C0urante commented on code in PR #15893:
URL: https://github.com/apache/kafka/pull/15893#discussion_r1598401120


##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java:
##########
@@ -202,6 +204,225 @@ public Object valueFrom(Map<String, Object> map) {
         return current.get(lastStep());
     }
 
+    /**
+     * Access {@code Map} fields and apply functions to update field values.
+     *
+     * @param originalValue schema-based data value
+     * @param whenFound     function to apply when path is found
+     * @param whenNotFound  function to apply when path is not found
+     * @param whenOther     function to apply on fields not matched by path
+     * @return updated data value
+     */
+    public Map<String, Object> updateValueFrom(
+        Map<String, Object> originalValue,
+        MapValueUpdater whenFound,
+        MapValueUpdater whenNotFound,
+        MapValueUpdater whenOther
+    ) {
+        return updateValue(originalValue, 0, whenFound, whenNotFound, 
whenOther);
+    }
+
+    @SuppressWarnings("unchecked")
+    private Map<String, Object> updateValue(
+        Map<String, Object> originalValue,
+        int step,
+        MapValueUpdater whenFound,
+        MapValueUpdater whenNotFound,
+        MapValueUpdater whenOther
+    ) {
+        if (originalValue == null) return null;
+        Map<String, Object> updatedParent = new 
HashMap<>(originalValue.size());
+        boolean found = false;
+        for (Map.Entry<String, Object> entry : originalValue.entrySet()) {
+            String fieldName = entry.getKey();
+            Object fieldValue = entry.getValue();
+            if (steps.get(step).equals(fieldName)) {
+                found = true;
+                if (step < lastStepIndex()) {
+                    if (fieldValue instanceof Map) {
+                        Map<String, Object> updatedField = updateValue(
+                            (Map<String, Object>) fieldValue,
+                            step + 1,
+                            whenFound,
+                            whenNotFound,
+                            whenOther);
+                        updatedParent.put(fieldName, updatedField);
+                    } else {
+                        // add back to not found and apply others, as only 
leaf values are updated
+                        found = false;
+                        whenOther.apply(originalValue, updatedParent, null, 
fieldName);
+                    }
+                } else {
+                    whenFound.apply(originalValue, updatedParent, this, 
fieldName);
+                }
+            } else {
+                whenOther.apply(originalValue, updatedParent, null, fieldName);
+            }
+        }
+
+        if (!found) {
+            whenNotFound.apply(originalValue, updatedParent, this, 
steps.get(step));
+        }
+
+        return updatedParent;
+    }
+
+    /**
+     * Access {@code Struct} fields and apply functions to update field values.
+     *
+     * @param originalSchema original struct schema
+     * @param originalValue  schema-based data value
+     * @param updatedSchema  updated struct schema
+     * @param whenFound      function to apply when path is found
+     * @param whenNotFound   function to apply when path is not found
+     * @param whenOther      function to apply on fields not matched by path
+     * @return updated data value
+     */
+    public Struct updateValueFrom(
+        Schema originalSchema,
+        Struct originalValue,
+        Schema updatedSchema,
+        StructValueUpdater whenFound,
+        StructValueUpdater whenNotFound,
+        StructValueUpdater whenOther
+    ) {
+        return updateValue(originalSchema, originalValue, updatedSchema, 0, 
whenFound, whenNotFound, whenOther);
+    }
+
+    private Struct updateValue(
+        Schema originalSchema,
+        Struct originalValue,
+        Schema updateSchema,
+        int step,
+        StructValueUpdater whenFound,
+        StructValueUpdater whenNotFound,
+        StructValueUpdater whenOther
+    ) {
+        Struct updated = new Struct(updateSchema);
+        boolean found = false;
+        for (Field field : originalSchema.fields()) {
+            if (step < steps.size()) {
+                if (steps.get(step).equals(field.name())) {
+                    found = true;
+                    if (step == lastStepIndex()) {
+                        whenFound.apply(
+                            originalValue,
+                            field,
+                            updated,
+                            updateSchema.field(field.name()),
+                            this
+                        );
+                    } else {
+                        if (field.schema().type() == Schema.Type.STRUCT) {
+                            Struct fieldValue = updateValue(
+                                field.schema(),
+                                originalValue.getStruct(field.name()),
+                                updateSchema.field(field.name()).schema(),
+                                step + 1,
+                                whenFound,
+                                whenNotFound,
+                                whenOther
+                            );
+                            updated.put(field.name(), fieldValue);
+                        } else {
+                            // add back to not found and apply others, as only 
leaf values are updated
+                            found = false;
+                            whenOther.apply(originalValue, field, updated, 
null, this);
+                        }
+                    }
+                } else {
+                    whenOther.apply(originalValue, field, updated, null, this);
+                }
+            }
+        }
+        if (!found) {
+            whenNotFound.apply(
+                originalValue,
+                null,
+                updated,
+                updateSchema.field(steps.get(step)),
+                this);
+        }
+        return updated;
+    }

Review Comment:
   Just for the sake of the `TimestampConverter` SMT, I don't think we need all
   this complexity. We could get everything we need without custom interfaces and
   with fewer arguments by doing something like
   `public Struct updateStruct(Struct original, Schema updatedSchema, BiFunction<Object, Schema, Object> updateField)`
   (where the last argument accepts the original field value and that field's
   original schema, and returns the new field value).
   
   I've sketched out an implementation but feel free to do your own if you 
prefer:
   ```java
   public Struct updateStruct(
           Struct struct,
           Schema updatedSchema,
           BiFunction<Object, Schema, Object> update
   ) {
       return updateStruct(
               struct,
               updatedSchema,
               update,
               steps.get(0),
               steps.subList(1, steps.size())
       );
   }
   
   private static Struct updateStruct(
           Struct original,
           Schema updatedSchema,
           BiFunction<Object, Schema, Object> update,
           String currentStep,
           List<String> nextSteps
   ) {
       if (original == null)
           return null;
   
       Struct result = new Struct(updatedSchema);
       for (Field field : updatedSchema.fields()) {
           String fieldName = field.name();
   
           if (fieldName.equals(currentStep)) {
               final Object updatedField;
   
               // Modify this field
               if (nextSteps.isEmpty()) {
                   // This is a leaf node
                   Object originalField = original.get(fieldName);
                   updatedField = update.apply(
                           originalField,
                           original.schema().field(fieldName).schema()
                   );
               } else {
                   // We have to go deeper
               Struct originalField = requireStructOrNull(
                       original.get(fieldName), "nested field access");
                   updatedField = updateStruct(
                           originalField,
                           field.schema(),
                           update,
                           nextSteps.get(0),
                           nextSteps.subList(1, nextSteps.size())
                   );
               }
   
               result.put(fieldName, updatedField);
           } else {
               // Copy over all other fields from the original to the result
               result.put(fieldName, original.get(fieldName));
           }
       }
   
       return result;
   }
   ```



##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java:
##########
@@ -202,6 +204,225 @@ public Object valueFrom(Map<String, Object> map) {
         return current.get(lastStep());
     }
 
+    /**
+     * Access {@code Map} fields and apply functions to update field values.
+     *
+     * @param originalValue schema-based data value
+     * @param whenFound     function to apply when path is found
+     * @param whenNotFound  function to apply when path is not found
+     * @param whenOther     function to apply on fields not matched by path
+     * @return updated data value
+     */
+    public Map<String, Object> updateValueFrom(
+        Map<String, Object> originalValue,
+        MapValueUpdater whenFound,
+        MapValueUpdater whenNotFound,
+        MapValueUpdater whenOther
+    ) {
+        return updateValue(originalValue, 0, whenFound, whenNotFound, 
whenOther);
+    }
+
+    @SuppressWarnings("unchecked")
+    private Map<String, Object> updateValue(
+        Map<String, Object> originalValue,
+        int step,
+        MapValueUpdater whenFound,
+        MapValueUpdater whenNotFound,
+        MapValueUpdater whenOther
+    ) {
+        if (originalValue == null) return null;
+        Map<String, Object> updatedParent = new 
HashMap<>(originalValue.size());
+        boolean found = false;
+        for (Map.Entry<String, Object> entry : originalValue.entrySet()) {
+            String fieldName = entry.getKey();
+            Object fieldValue = entry.getValue();
+            if (steps.get(step).equals(fieldName)) {
+                found = true;
+                if (step < lastStepIndex()) {
+                    if (fieldValue instanceof Map) {
+                        Map<String, Object> updatedField = updateValue(
+                            (Map<String, Object>) fieldValue,
+                            step + 1,
+                            whenFound,
+                            whenNotFound,
+                            whenOther);
+                        updatedParent.put(fieldName, updatedField);
+                    } else {
+                        // add back to not found and apply others, as only 
leaf values are updated
+                        found = false;
+                        whenOther.apply(originalValue, updatedParent, null, 
fieldName);
+                    }
+                } else {
+                    whenFound.apply(originalValue, updatedParent, this, 
fieldName);
+                }
+            } else {
+                whenOther.apply(originalValue, updatedParent, null, fieldName);
+            }
+        }
+
+        if (!found) {
+            whenNotFound.apply(originalValue, updatedParent, this, 
steps.get(step));
+        }
+
+        return updatedParent;
+    }
+
+    /**
+     * Access {@code Struct} fields and apply functions to update field values.
+     *
+     * @param originalSchema original struct schema
+     * @param originalValue  schema-based data value
+     * @param updatedSchema  updated struct schema
+     * @param whenFound      function to apply when path is found
+     * @param whenNotFound   function to apply when path is not found
+     * @param whenOther      function to apply on fields not matched by path
+     * @return updated data value
+     */
+    public Struct updateValueFrom(
+        Schema originalSchema,
+        Struct originalValue,
+        Schema updatedSchema,
+        StructValueUpdater whenFound,
+        StructValueUpdater whenNotFound,
+        StructValueUpdater whenOther
+    ) {
+        return updateValue(originalSchema, originalValue, updatedSchema, 0, 
whenFound, whenNotFound, whenOther);
+    }
+
+    private Struct updateValue(
+        Schema originalSchema,
+        Struct originalValue,
+        Schema updateSchema,
+        int step,
+        StructValueUpdater whenFound,
+        StructValueUpdater whenNotFound,
+        StructValueUpdater whenOther
+    ) {
+        Struct updated = new Struct(updateSchema);
+        boolean found = false;
+        for (Field field : originalSchema.fields()) {
+            if (step < steps.size()) {
+                if (steps.get(step).equals(field.name())) {
+                    found = true;
+                    if (step == lastStepIndex()) {
+                        whenFound.apply(
+                            originalValue,
+                            field,
+                            updated,
+                            updateSchema.field(field.name()),
+                            this
+                        );
+                    } else {
+                        if (field.schema().type() == Schema.Type.STRUCT) {
+                            Struct fieldValue = updateValue(
+                                field.schema(),
+                                originalValue.getStruct(field.name()),
+                                updateSchema.field(field.name()).schema(),
+                                step + 1,
+                                whenFound,
+                                whenNotFound,
+                                whenOther
+                            );
+                            updated.put(field.name(), fieldValue);
+                        } else {
+                            // add back to not found and apply others, as only 
leaf values are updated
+                            found = false;
+                            whenOther.apply(originalValue, field, updated, 
null, this);
+                        }
+                    }
+                } else {
+                    whenOther.apply(originalValue, field, updated, null, this);
+                }
+            }
+        }
+        if (!found) {
+            whenNotFound.apply(
+                originalValue,
+                null,
+                updated,
+                updateSchema.field(steps.get(step)),
+                this);
+        }
+        return updated;
+    }
+
+    /**
+     * Prepares a new schema based on an original one, and applies an update 
function
+     * when the current path(s) is found.
+     *
+     * <p>If path is not found, no function is applied, and the path is 
ignored.
+     *
+     * <p>Other fields are copied from original schema.
+     *
+     * @param originalSchema        baseline schema
+     * @param baselineSchemaBuilder baseline schema build, if changes to the 
baseline
+     *                              are required before copying original
+     * @param whenFound             function to apply when current path(s) 
is/are found.
+     * @return an updated schema. Resulting schemas are usually cached for 
further access.
+     */
+    public Schema updateSchemaFrom(
+        Schema originalSchema,
+        SchemaBuilder baselineSchemaBuilder,
+        StructSchemaUpdater whenFound,
+        StructSchemaUpdater whenNotFound,
+        StructSchemaUpdater whenOther
+    ) {
+        return updateSchema(originalSchema, baselineSchemaBuilder, 0, 
whenFound, whenNotFound, whenOther);
+    }

Review Comment:
   Similar thoughts on the method signature here: we don't need custom 
interfaces (I think something like a `Function<Schema, Schema>` would suffice), 
and we don't need the `whenNotFound` and `whenOther` arguments (IMO `whenFound` 
can be named something simpler like `update`).



##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java:
##########
@@ -202,6 +204,225 @@ public Object valueFrom(Map<String, Object> map) {
         return current.get(lastStep());
     }
 
+    /**
+     * Access {@code Map} fields and apply functions to update field values.
+     *
+     * @param originalValue schema-based data value
+     * @param whenFound     function to apply when path is found
+     * @param whenNotFound  function to apply when path is not found
+     * @param whenOther     function to apply on fields not matched by path
+     * @return updated data value
+     */
+    public Map<String, Object> updateValueFrom(
+        Map<String, Object> originalValue,
+        MapValueUpdater whenFound,
+        MapValueUpdater whenNotFound,
+        MapValueUpdater whenOther
+    ) {
+        return updateValue(originalValue, 0, whenFound, whenNotFound, 
whenOther);
+    }
+
+    @SuppressWarnings("unchecked")
+    private Map<String, Object> updateValue(
+        Map<String, Object> originalValue,
+        int step,
+        MapValueUpdater whenFound,
+        MapValueUpdater whenNotFound,
+        MapValueUpdater whenOther
+    ) {
+        if (originalValue == null) return null;
+        Map<String, Object> updatedParent = new 
HashMap<>(originalValue.size());
+        boolean found = false;
+        for (Map.Entry<String, Object> entry : originalValue.entrySet()) {
+            String fieldName = entry.getKey();
+            Object fieldValue = entry.getValue();
+            if (steps.get(step).equals(fieldName)) {
+                found = true;
+                if (step < lastStepIndex()) {
+                    if (fieldValue instanceof Map) {
+                        Map<String, Object> updatedField = updateValue(
+                            (Map<String, Object>) fieldValue,
+                            step + 1,
+                            whenFound,
+                            whenNotFound,
+                            whenOther);
+                        updatedParent.put(fieldName, updatedField);
+                    } else {
+                        // add back to not found and apply others, as only 
leaf values are updated
+                        found = false;
+                        whenOther.apply(originalValue, updatedParent, null, 
fieldName);
+                    }
+                } else {
+                    whenFound.apply(originalValue, updatedParent, this, 
fieldName);
+                }
+            } else {
+                whenOther.apply(originalValue, updatedParent, null, fieldName);
+            }
+        }
+
+        if (!found) {
+            whenNotFound.apply(originalValue, updatedParent, this, 
steps.get(step));
+        }
+
+        return updatedParent;
+    }

Review Comment:
   This seems a bit complex for the `TimestampConverter` use case. It seems 
like we could get everything we wanted with a simpler, less-flexible approach:
   
   ```java
       public Map<String, Object> updateMap(
               Map<String, Object> map,
               Function<Object, Object> update
       ) {
           if (map == null) return null;
           Map<String, Object> result = new HashMap<>(map);
   
           Map<String, Object> parent = result;
           Map<String, Object> child;
           for (String step : stepsWithoutLast()) {
               child = requireMapOrNull(parent.get(step), "nested field access");
               if (child == null) return null;
               child = new HashMap<>(child);
               parent.put(step, child);
               parent = child;
           }
   
           Object original = parent.get(lastStep());
           if (original == null) return null;
           Object updated = update.apply(original);
           parent.put(lastStep(), updated);
           return result;
       }
   ```
   
   which could be used in `TimestampConverter` like this:
   
   ```java
       private R applySchemaless(R record) {
           Object rawValue = operatingValue(record);
           if (rawValue == null || config.field.isEmpty()) {
               return newRecord(record, null, convertTimestamp(rawValue));
           } else {
               final Map<String, Object> value = requireMap(rawValue, PURPOSE);
               final Map<String, Object> updatedValue = config.field.updateMap(
                       value,
                       this::convertTimestamp
               );
   
               if (updatedValue == null) {
                   return newRecord(record, null, value);
               } else {
                   return newRecord(record, null, updatedValue);
               }
           }
       }
   ```



##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java:
##########
@@ -202,6 +204,225 @@ public Object valueFrom(Map<String, Object> map) {
         return current.get(lastStep());
     }
 
+    /**
+     * Access {@code Map} fields and apply functions to update field values.
+     *
+     * @param originalValue schema-based data value
+     * @param whenFound     function to apply when path is found
+     * @param whenNotFound  function to apply when path is not found
+     * @param whenOther     function to apply on fields not matched by path
+     * @return updated data value
+     */
+    public Map<String, Object> updateValueFrom(
+        Map<String, Object> originalValue,
+        MapValueUpdater whenFound,
+        MapValueUpdater whenNotFound,
+        MapValueUpdater whenOther
+    ) {
+        return updateValue(originalValue, 0, whenFound, whenNotFound, 
whenOther);
+    }
+
+    @SuppressWarnings("unchecked")
+    private Map<String, Object> updateValue(
+        Map<String, Object> originalValue,
+        int step,
+        MapValueUpdater whenFound,
+        MapValueUpdater whenNotFound,
+        MapValueUpdater whenOther
+    ) {
+        if (originalValue == null) return null;
+        Map<String, Object> updatedParent = new 
HashMap<>(originalValue.size());
+        boolean found = false;
+        for (Map.Entry<String, Object> entry : originalValue.entrySet()) {
+            String fieldName = entry.getKey();
+            Object fieldValue = entry.getValue();
+            if (steps.get(step).equals(fieldName)) {
+                found = true;
+                if (step < lastStepIndex()) {
+                    if (fieldValue instanceof Map) {
+                        Map<String, Object> updatedField = updateValue(
+                            (Map<String, Object>) fieldValue,
+                            step + 1,
+                            whenFound,
+                            whenNotFound,
+                            whenOther);
+                        updatedParent.put(fieldName, updatedField);
+                    } else {
+                        // add back to not found and apply others, as only 
leaf values are updated
+                        found = false;
+                        whenOther.apply(originalValue, updatedParent, null, 
fieldName);
+                    }
+                } else {
+                    whenFound.apply(originalValue, updatedParent, this, 
fieldName);
+                }
+            } else {
+                whenOther.apply(originalValue, updatedParent, null, fieldName);
+            }
+        }
+
+        if (!found) {
+            whenNotFound.apply(originalValue, updatedParent, this, 
steps.get(step));
+        }
+
+        return updatedParent;
+    }
+
+    /**
+     * Access {@code Struct} fields and apply functions to update field values.
+     *
+     * @param originalSchema original struct schema
+     * @param originalValue  schema-based data value
+     * @param updatedSchema  updated struct schema
+     * @param whenFound      function to apply when path is found
+     * @param whenNotFound   function to apply when path is not found
+     * @param whenOther      function to apply on fields not matched by path
+     * @return updated data value
+     */
+    public Struct updateValueFrom(
+        Schema originalSchema,
+        Struct originalValue,
+        Schema updatedSchema,
+        StructValueUpdater whenFound,
+        StructValueUpdater whenNotFound,
+        StructValueUpdater whenOther
+    ) {
+        return updateValue(originalSchema, originalValue, updatedSchema, 0, 
whenFound, whenNotFound, whenOther);
+    }
+
+    private Struct updateValue(
+        Schema originalSchema,
+        Struct originalValue,
+        Schema updateSchema,
+        int step,
+        StructValueUpdater whenFound,
+        StructValueUpdater whenNotFound,
+        StructValueUpdater whenOther
+    ) {
+        Struct updated = new Struct(updateSchema);
+        boolean found = false;
+        for (Field field : originalSchema.fields()) {
+            if (step < steps.size()) {
+                if (steps.get(step).equals(field.name())) {
+                    found = true;
+                    if (step == lastStepIndex()) {
+                        whenFound.apply(
+                            originalValue,
+                            field,
+                            updated,
+                            updateSchema.field(field.name()),
+                            this
+                        );
+                    } else {
+                        if (field.schema().type() == Schema.Type.STRUCT) {
+                            Struct fieldValue = updateValue(
+                                field.schema(),
+                                originalValue.getStruct(field.name()),
+                                updateSchema.field(field.name()).schema(),
+                                step + 1,
+                                whenFound,
+                                whenNotFound,
+                                whenOther
+                            );
+                            updated.put(field.name(), fieldValue);
+                        } else {
+                            // add back to not found and apply others, as only 
leaf values are updated
+                            found = false;
+                            whenOther.apply(originalValue, field, updated, 
null, this);
+                        }
+                    }
+                } else {
+                    whenOther.apply(originalValue, field, updated, null, this);
+                }
+            }
+        }
+        if (!found) {
+            whenNotFound.apply(
+                originalValue,
+                null,
+                updated,
+                updateSchema.field(steps.get(step)),
+                this);
+        }
+        return updated;
+    }
+
+    /**
+     * Prepares a new schema based on an original one, and applies an update 
function
+     * when the current path(s) is found.
+     *
+     * <p>If path is not found, no function is applied, and the path is 
ignored.
+     *
+     * <p>Other fields are copied from original schema.
+     *
+     * @param originalSchema        baseline schema
+     * @param baselineSchemaBuilder baseline schema build, if changes to the 
baseline
+     *                              are required before copying original
+     * @param whenFound             function to apply when current path(s) 
is/are found.
+     * @return an updated schema. Resulting schemas are usually cached for 
further access.
+     */
+    public Schema updateSchemaFrom(
+        Schema originalSchema,
+        SchemaBuilder baselineSchemaBuilder,
+        StructSchemaUpdater whenFound,
+        StructSchemaUpdater whenNotFound,
+        StructSchemaUpdater whenOther
+    ) {
+        return updateSchema(originalSchema, baselineSchemaBuilder, 0, 
whenFound, whenNotFound, whenOther);
+    }
+
+    // Recursive implementation to update schema at different steps.
+    // Consider that resulting schemas are usually cached.
+    private Schema updateSchema(
+        Schema operatingSchema,
+        SchemaBuilder builder,
+        int step,
+        StructSchemaUpdater matching,
+        StructSchemaUpdater notFound,
+        StructSchemaUpdater others
+    ) {
+        if (operatingSchema.isOptional()) {
+            builder.optional();
+        }
+        if (operatingSchema.defaultValue() != null) {
+            builder.defaultValue(operatingSchema.defaultValue());
+        }

Review Comment:
   Is this correct? What if the schema we're modifying is for a `Struct`? 
Wouldn't this cause the new schema to have a default `Struct` value that uses 
the old schema?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to