C0urante commented on code in PR #15893: URL: https://github.com/apache/kafka/pull/15893#discussion_r1598401120
########## connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java: ########## @@ -202,6 +204,225 @@ public Object valueFrom(Map<String, Object> map) { return current.get(lastStep()); } + /** + * Access {@code Map} fields and apply functions to update field values. + * + * @param originalValue schema-based data value + * @param whenFound function to apply when path is found + * @param whenNotFound function to apply when path is not found + * @param whenOther function to apply on fields not matched by path + * @return updated data value + */ + public Map<String, Object> updateValueFrom( + Map<String, Object> originalValue, + MapValueUpdater whenFound, + MapValueUpdater whenNotFound, + MapValueUpdater whenOther + ) { + return updateValue(originalValue, 0, whenFound, whenNotFound, whenOther); + } + + @SuppressWarnings("unchecked") + private Map<String, Object> updateValue( + Map<String, Object> originalValue, + int step, + MapValueUpdater whenFound, + MapValueUpdater whenNotFound, + MapValueUpdater whenOther + ) { + if (originalValue == null) return null; + Map<String, Object> updatedParent = new HashMap<>(originalValue.size()); + boolean found = false; + for (Map.Entry<String, Object> entry : originalValue.entrySet()) { + String fieldName = entry.getKey(); + Object fieldValue = entry.getValue(); + if (steps.get(step).equals(fieldName)) { + found = true; + if (step < lastStepIndex()) { + if (fieldValue instanceof Map) { + Map<String, Object> updatedField = updateValue( + (Map<String, Object>) fieldValue, + step + 1, + whenFound, + whenNotFound, + whenOther); + updatedParent.put(fieldName, updatedField); + } else { + // add back to not found and apply others, as only leaf values are updated + found = false; + whenOther.apply(originalValue, updatedParent, null, fieldName); + } + } else { + whenFound.apply(originalValue, updatedParent, this, fieldName); + } + } else { + whenOther.apply(originalValue, updatedParent, null, 
fieldName); + } + } + + if (!found) { + whenNotFound.apply(originalValue, updatedParent, this, steps.get(step)); + } + + return updatedParent; + } + + /** + * Access {@code Struct} fields and apply functions to update field values. + * + * @param originalSchema original struct schema + * @param originalValue schema-based data value + * @param updatedSchema updated struct schema + * @param whenFound function to apply when path is found + * @param whenNotFound function to apply when path is not found + * @param whenOther function to apply on fields not matched by path + * @return updated data value + */ + public Struct updateValueFrom( + Schema originalSchema, + Struct originalValue, + Schema updatedSchema, + StructValueUpdater whenFound, + StructValueUpdater whenNotFound, + StructValueUpdater whenOther + ) { + return updateValue(originalSchema, originalValue, updatedSchema, 0, whenFound, whenNotFound, whenOther); + } + + private Struct updateValue( + Schema originalSchema, + Struct originalValue, + Schema updateSchema, + int step, + StructValueUpdater whenFound, + StructValueUpdater whenNotFound, + StructValueUpdater whenOther + ) { + Struct updated = new Struct(updateSchema); + boolean found = false; + for (Field field : originalSchema.fields()) { + if (step < steps.size()) { + if (steps.get(step).equals(field.name())) { + found = true; + if (step == lastStepIndex()) { + whenFound.apply( + originalValue, + field, + updated, + updateSchema.field(field.name()), + this + ); + } else { + if (field.schema().type() == Schema.Type.STRUCT) { + Struct fieldValue = updateValue( + field.schema(), + originalValue.getStruct(field.name()), + updateSchema.field(field.name()).schema(), + step + 1, + whenFound, + whenNotFound, + whenOther + ); + updated.put(field.name(), fieldValue); + } else { + // add back to not found and apply others, as only leaf values are updated + found = false; + whenOther.apply(originalValue, field, updated, null, this); + } + } + } else { + 
whenOther.apply(originalValue, field, updated, null, this); + } + } + } + if (!found) { + whenNotFound.apply( + originalValue, + null, + updated, + updateSchema.field(steps.get(step)), + this); + } + return updated; + } Review Comment: Just for the sake of the `TimestampConverter` SMT, I don't think we need all this complexity. We could get everything we need without custom interfaces and with fewer arguments by doing something like `public Struct updateStruct(Struct original, Schema updatedSchema, BiFunction<Object, Schema, Object> updateField)` (where the last argument accepts the original field and original schema, and returns the new field). I've sketched out an implementation but feel free to do your own if you prefer: ```java public Struct updateStruct( Struct struct, Schema updatedSchema, BiFunction<Object, Schema, Object> update ) { return updateStruct( struct, updatedSchema, update, steps.get(0), steps.subList(1, steps.size()) ); } private static Struct updateStruct( Struct original, Schema updatedSchema, BiFunction<Object, Schema, Object> update, String currentStep, List<String> nextSteps ) { if (original == null) return null; Struct result = new Struct(updatedSchema); for (Field field : updatedSchema.fields()) { String fieldName = field.name(); if (fieldName.equals(currentStep)) { final Object updatedField; // Modify this field if (nextSteps.isEmpty()) { // This is a leaf node Object originalField = original.get(fieldName); updatedField = update.apply( originalField, original.schema().field(fieldName).schema() ); } else { // We have to go deeper Struct originalField = requireStructOrNull(original.get(fieldName), "nested field access"); updatedField = updateStruct( originalField, field.schema(), update, nextSteps.get(0), nextSteps.subList(1, nextSteps.size()) ); } result.put(fieldName, updatedField); } else { // Copy over all other fields from the original to the result result.put(fieldName, original.get(fieldName)); } } return result; } ``` ########## 
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java: ########## @@ -202,6 +204,225 @@ public Object valueFrom(Map<String, Object> map) { return current.get(lastStep()); } + /** + * Access {@code Map} fields and apply functions to update field values. + * + * @param originalValue schema-based data value + * @param whenFound function to apply when path is found + * @param whenNotFound function to apply when path is not found + * @param whenOther function to apply on fields not matched by path + * @return updated data value + */ + public Map<String, Object> updateValueFrom( + Map<String, Object> originalValue, + MapValueUpdater whenFound, + MapValueUpdater whenNotFound, + MapValueUpdater whenOther + ) { + return updateValue(originalValue, 0, whenFound, whenNotFound, whenOther); + } + + @SuppressWarnings("unchecked") + private Map<String, Object> updateValue( + Map<String, Object> originalValue, + int step, + MapValueUpdater whenFound, + MapValueUpdater whenNotFound, + MapValueUpdater whenOther + ) { + if (originalValue == null) return null; + Map<String, Object> updatedParent = new HashMap<>(originalValue.size()); + boolean found = false; + for (Map.Entry<String, Object> entry : originalValue.entrySet()) { + String fieldName = entry.getKey(); + Object fieldValue = entry.getValue(); + if (steps.get(step).equals(fieldName)) { + found = true; + if (step < lastStepIndex()) { + if (fieldValue instanceof Map) { + Map<String, Object> updatedField = updateValue( + (Map<String, Object>) fieldValue, + step + 1, + whenFound, + whenNotFound, + whenOther); + updatedParent.put(fieldName, updatedField); + } else { + // add back to not found and apply others, as only leaf values are updated + found = false; + whenOther.apply(originalValue, updatedParent, null, fieldName); + } + } else { + whenFound.apply(originalValue, updatedParent, this, fieldName); + } + } else { + whenOther.apply(originalValue, updatedParent, null, fieldName); + } + } + 
+ if (!found) { + whenNotFound.apply(originalValue, updatedParent, this, steps.get(step)); + } + + return updatedParent; + } + + /** + * Access {@code Struct} fields and apply functions to update field values. + * + * @param originalSchema original struct schema + * @param originalValue schema-based data value + * @param updatedSchema updated struct schema + * @param whenFound function to apply when path is found + * @param whenNotFound function to apply when path is not found + * @param whenOther function to apply on fields not matched by path + * @return updated data value + */ + public Struct updateValueFrom( + Schema originalSchema, + Struct originalValue, + Schema updatedSchema, + StructValueUpdater whenFound, + StructValueUpdater whenNotFound, + StructValueUpdater whenOther + ) { + return updateValue(originalSchema, originalValue, updatedSchema, 0, whenFound, whenNotFound, whenOther); + } + + private Struct updateValue( + Schema originalSchema, + Struct originalValue, + Schema updateSchema, + int step, + StructValueUpdater whenFound, + StructValueUpdater whenNotFound, + StructValueUpdater whenOther + ) { + Struct updated = new Struct(updateSchema); + boolean found = false; + for (Field field : originalSchema.fields()) { + if (step < steps.size()) { + if (steps.get(step).equals(field.name())) { + found = true; + if (step == lastStepIndex()) { + whenFound.apply( + originalValue, + field, + updated, + updateSchema.field(field.name()), + this + ); + } else { + if (field.schema().type() == Schema.Type.STRUCT) { + Struct fieldValue = updateValue( + field.schema(), + originalValue.getStruct(field.name()), + updateSchema.field(field.name()).schema(), + step + 1, + whenFound, + whenNotFound, + whenOther + ); + updated.put(field.name(), fieldValue); + } else { + // add back to not found and apply others, as only leaf values are updated + found = false; + whenOther.apply(originalValue, field, updated, null, this); + } + } + } else { + whenOther.apply(originalValue, 
field, updated, null, this); + } + } + } + if (!found) { + whenNotFound.apply( + originalValue, + null, + updated, + updateSchema.field(steps.get(step)), + this); + } + return updated; + } + + /** + * Prepares a new schema based on an original one, and applies an update function + * when the current path(s) is found. + * + * <p>If path is not found, no function is applied, and the path is ignored. + * + * <p>Other fields are copied from original schema. + * + * @param originalSchema baseline schema + * @param baselineSchemaBuilder baseline schema build, if changes to the baseline + * are required before copying original + * @param whenFound function to apply when current path(s) is/are found. + * @return an updated schema. Resulting schemas are usually cached for further access. + */ + public Schema updateSchemaFrom( + Schema originalSchema, + SchemaBuilder baselineSchemaBuilder, + StructSchemaUpdater whenFound, + StructSchemaUpdater whenNotFound, + StructSchemaUpdater whenOther + ) { + return updateSchema(originalSchema, baselineSchemaBuilder, 0, whenFound, whenNotFound, whenOther); + } Review Comment: Similar thoughts on the method signature here: we don't need custom interfaces (I think something like a `Function<Schema, Schema>` would suffice), and we don't need the `whenNotFound` and `whenOther` arguments (IMO `whenFound` can be named something simpler like `update`). ########## connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java: ########## @@ -202,6 +204,225 @@ public Object valueFrom(Map<String, Object> map) { return current.get(lastStep()); } + /** + * Access {@code Map} fields and apply functions to update field values. 
+ * + * @param originalValue schema-based data value + * @param whenFound function to apply when path is found + * @param whenNotFound function to apply when path is not found + * @param whenOther function to apply on fields not matched by path + * @return updated data value + */ + public Map<String, Object> updateValueFrom( + Map<String, Object> originalValue, + MapValueUpdater whenFound, + MapValueUpdater whenNotFound, + MapValueUpdater whenOther + ) { + return updateValue(originalValue, 0, whenFound, whenNotFound, whenOther); + } + + @SuppressWarnings("unchecked") + private Map<String, Object> updateValue( + Map<String, Object> originalValue, + int step, + MapValueUpdater whenFound, + MapValueUpdater whenNotFound, + MapValueUpdater whenOther + ) { + if (originalValue == null) return null; + Map<String, Object> updatedParent = new HashMap<>(originalValue.size()); + boolean found = false; + for (Map.Entry<String, Object> entry : originalValue.entrySet()) { + String fieldName = entry.getKey(); + Object fieldValue = entry.getValue(); + if (steps.get(step).equals(fieldName)) { + found = true; + if (step < lastStepIndex()) { + if (fieldValue instanceof Map) { + Map<String, Object> updatedField = updateValue( + (Map<String, Object>) fieldValue, + step + 1, + whenFound, + whenNotFound, + whenOther); + updatedParent.put(fieldName, updatedField); + } else { + // add back to not found and apply others, as only leaf values are updated + found = false; + whenOther.apply(originalValue, updatedParent, null, fieldName); + } + } else { + whenFound.apply(originalValue, updatedParent, this, fieldName); + } + } else { + whenOther.apply(originalValue, updatedParent, null, fieldName); + } + } + + if (!found) { + whenNotFound.apply(originalValue, updatedParent, this, steps.get(step)); + } + + return updatedParent; + } Review Comment: This seems a bit complex for the `TimestampConverter` use case. 
It seems like we could get everything we wanted with a simpler, less-flexible approach: ```java public Map<String, Object> updateMap( Map<String, Object> map, Function<Object, Object> update ) { if (map == null) return null; Map<String, Object> result = new HashMap<>(map); Map<String, Object> parent = result; Map<String, Object> child; for (String step : stepsWithoutLast()) { child = requireMapOrNull(parent.get(step), "nested field access"); if (child == null) return null; child = new HashMap<>(child); parent.put(step, child); parent = child; } Object original = parent.get(lastStep()); if (original == null) return null; Object updated = update.apply(original); parent.put(lastStep(), updated); return result; } ``` which could be used in `TimestampConverter` like this: ```java private R applySchemaless(R record) { Object rawValue = operatingValue(record); if (rawValue == null || config.field.isEmpty()) { return newRecord(record, null, convertTimestamp(rawValue)); } else { final Map<String, Object> value = requireMap(rawValue, PURPOSE); final Map<String, Object> updatedValue = config.field.updateMap( value, this::convertTimestamp ); if (updatedValue == null) { return newRecord(record, null, value); } else { return newRecord(record, null, updatedValue); } } } ``` ########## connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java: ########## @@ -202,6 +204,225 @@ public Object valueFrom(Map<String, Object> map) { return current.get(lastStep()); } + /** + * Access {@code Map} fields and apply functions to update field values. 
+ * + * @param originalValue schema-based data value + * @param whenFound function to apply when path is found + * @param whenNotFound function to apply when path is not found + * @param whenOther function to apply on fields not matched by path + * @return updated data value + */ + public Map<String, Object> updateValueFrom( + Map<String, Object> originalValue, + MapValueUpdater whenFound, + MapValueUpdater whenNotFound, + MapValueUpdater whenOther + ) { + return updateValue(originalValue, 0, whenFound, whenNotFound, whenOther); + } + + @SuppressWarnings("unchecked") + private Map<String, Object> updateValue( + Map<String, Object> originalValue, + int step, + MapValueUpdater whenFound, + MapValueUpdater whenNotFound, + MapValueUpdater whenOther + ) { + if (originalValue == null) return null; + Map<String, Object> updatedParent = new HashMap<>(originalValue.size()); + boolean found = false; + for (Map.Entry<String, Object> entry : originalValue.entrySet()) { + String fieldName = entry.getKey(); + Object fieldValue = entry.getValue(); + if (steps.get(step).equals(fieldName)) { + found = true; + if (step < lastStepIndex()) { + if (fieldValue instanceof Map) { + Map<String, Object> updatedField = updateValue( + (Map<String, Object>) fieldValue, + step + 1, + whenFound, + whenNotFound, + whenOther); + updatedParent.put(fieldName, updatedField); + } else { + // add back to not found and apply others, as only leaf values are updated + found = false; + whenOther.apply(originalValue, updatedParent, null, fieldName); + } + } else { + whenFound.apply(originalValue, updatedParent, this, fieldName); + } + } else { + whenOther.apply(originalValue, updatedParent, null, fieldName); + } + } + + if (!found) { + whenNotFound.apply(originalValue, updatedParent, this, steps.get(step)); + } + + return updatedParent; + } + + /** + * Access {@code Struct} fields and apply functions to update field values. 
+ * + * @param originalSchema original struct schema + * @param originalValue schema-based data value + * @param updatedSchema updated struct schema + * @param whenFound function to apply when path is found + * @param whenNotFound function to apply when path is not found + * @param whenOther function to apply on fields not matched by path + * @return updated data value + */ + public Struct updateValueFrom( + Schema originalSchema, + Struct originalValue, + Schema updatedSchema, + StructValueUpdater whenFound, + StructValueUpdater whenNotFound, + StructValueUpdater whenOther + ) { + return updateValue(originalSchema, originalValue, updatedSchema, 0, whenFound, whenNotFound, whenOther); + } + + private Struct updateValue( + Schema originalSchema, + Struct originalValue, + Schema updateSchema, + int step, + StructValueUpdater whenFound, + StructValueUpdater whenNotFound, + StructValueUpdater whenOther + ) { + Struct updated = new Struct(updateSchema); + boolean found = false; + for (Field field : originalSchema.fields()) { + if (step < steps.size()) { + if (steps.get(step).equals(field.name())) { + found = true; + if (step == lastStepIndex()) { + whenFound.apply( + originalValue, + field, + updated, + updateSchema.field(field.name()), + this + ); + } else { + if (field.schema().type() == Schema.Type.STRUCT) { + Struct fieldValue = updateValue( + field.schema(), + originalValue.getStruct(field.name()), + updateSchema.field(field.name()).schema(), + step + 1, + whenFound, + whenNotFound, + whenOther + ); + updated.put(field.name(), fieldValue); + } else { + // add back to not found and apply others, as only leaf values are updated + found = false; + whenOther.apply(originalValue, field, updated, null, this); + } + } + } else { + whenOther.apply(originalValue, field, updated, null, this); + } + } + } + if (!found) { + whenNotFound.apply( + originalValue, + null, + updated, + updateSchema.field(steps.get(step)), + this); + } + return updated; + } + + /** + * Prepares a 
new schema based on an original one, and applies an update function + * when the current path(s) is found. + * + * <p>If path is not found, no function is applied, and the path is ignored. + * + * <p>Other fields are copied from original schema. + * + * @param originalSchema baseline schema + * @param baselineSchemaBuilder baseline schema builder, if changes to the baseline + * are required before copying original + * @param whenFound function to apply when current path(s) is/are found. + * @return an updated schema. Resulting schemas are usually cached for further access. + */ + public Schema updateSchemaFrom( + Schema originalSchema, + SchemaBuilder baselineSchemaBuilder, + StructSchemaUpdater whenFound, + StructSchemaUpdater whenNotFound, + StructSchemaUpdater whenOther + ) { + return updateSchema(originalSchema, baselineSchemaBuilder, 0, whenFound, whenNotFound, whenOther); + } + + // Recursive implementation to update schema at different steps. + // Consider that resulting schemas are usually cached. + private Schema updateSchema( + Schema operatingSchema, + SchemaBuilder builder, + int step, + StructSchemaUpdater matching, + StructSchemaUpdater notFound, + StructSchemaUpdater others + ) { + if (operatingSchema.isOptional()) { + builder.optional(); + } + if (operatingSchema.defaultValue() != null) { + builder.defaultValue(operatingSchema.defaultValue()); + } Review Comment: Is this correct? What if the schema we're modifying is for a `Struct`? Wouldn't this cause the new schema to have a default `Struct` value that uses the old schema? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org