jeqo commented on code in PR #15893: URL: https://github.com/apache/kafka/pull/15893#discussion_r1612459168
########## connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java: ########## @@ -202,6 +204,225 @@ public Object valueFrom(Map<String, Object> map) { return current.get(lastStep()); } + /** + * Access {@code Map} fields and apply functions to update field values. + * + * @param originalValue schema-based data value + * @param whenFound function to apply when path is found + * @param whenNotFound function to apply when path is not found + * @param whenOther function to apply on fields not matched by path + * @return updated data value + */ + public Map<String, Object> updateValueFrom( + Map<String, Object> originalValue, + MapValueUpdater whenFound, + MapValueUpdater whenNotFound, + MapValueUpdater whenOther + ) { + return updateValue(originalValue, 0, whenFound, whenNotFound, whenOther); + } + + @SuppressWarnings("unchecked") + private Map<String, Object> updateValue( + Map<String, Object> originalValue, + int step, + MapValueUpdater whenFound, + MapValueUpdater whenNotFound, + MapValueUpdater whenOther + ) { + if (originalValue == null) return null; + Map<String, Object> updatedParent = new HashMap<>(originalValue.size()); + boolean found = false; + for (Map.Entry<String, Object> entry : originalValue.entrySet()) { + String fieldName = entry.getKey(); + Object fieldValue = entry.getValue(); + if (steps.get(step).equals(fieldName)) { + found = true; + if (step < lastStepIndex()) { + if (fieldValue instanceof Map) { + Map<String, Object> updatedField = updateValue( + (Map<String, Object>) fieldValue, + step + 1, + whenFound, + whenNotFound, + whenOther); + updatedParent.put(fieldName, updatedField); + } else { + // add back to not found and apply others, as only leaf values are updated + found = false; + whenOther.apply(originalValue, updatedParent, null, fieldName); + } + } else { + whenFound.apply(originalValue, updatedParent, this, fieldName); + } + } else { + whenOther.apply(originalValue, updatedParent, null, 
fieldName); + } + } + + if (!found) { + whenNotFound.apply(originalValue, updatedParent, this, steps.get(step)); + } + + return updatedParent; + } + + /** + * Access {@code Struct} fields and apply functions to update field values. + * + * @param originalSchema original struct schema + * @param originalValue schema-based data value + * @param updatedSchema updated struct schema + * @param whenFound function to apply when path is found + * @param whenNotFound function to apply when path is not found + * @param whenOther function to apply on fields not matched by path + * @return updated data value + */ + public Struct updateValueFrom( + Schema originalSchema, + Struct originalValue, + Schema updatedSchema, + StructValueUpdater whenFound, + StructValueUpdater whenNotFound, + StructValueUpdater whenOther + ) { + return updateValue(originalSchema, originalValue, updatedSchema, 0, whenFound, whenNotFound, whenOther); + } + + private Struct updateValue( + Schema originalSchema, + Struct originalValue, + Schema updateSchema, + int step, + StructValueUpdater whenFound, + StructValueUpdater whenNotFound, + StructValueUpdater whenOther + ) { + Struct updated = new Struct(updateSchema); + boolean found = false; + for (Field field : originalSchema.fields()) { + if (step < steps.size()) { + if (steps.get(step).equals(field.name())) { + found = true; + if (step == lastStepIndex()) { + whenFound.apply( + originalValue, + field, + updated, + updateSchema.field(field.name()), + this + ); + } else { + if (field.schema().type() == Schema.Type.STRUCT) { + Struct fieldValue = updateValue( + field.schema(), + originalValue.getStruct(field.name()), + updateSchema.field(field.name()).schema(), + step + 1, + whenFound, + whenNotFound, + whenOther + ); + updated.put(field.name(), fieldValue); + } else { + // add back to not found and apply others, as only leaf values are updated + found = false; + whenOther.apply(originalValue, field, updated, null, this); + } + } + } else { + 
whenOther.apply(originalValue, field, updated, null, this); + } + } + } + if (!found) { + whenNotFound.apply( + originalValue, + null, + updated, + updateSchema.field(steps.get(step)), + this); + } + return updated; + } + + /** + * Prepares a new schema based on an original one, and applies an update function + * when the current path(s) is found. + * + * <p>If path is not found, no function is applied, and the path is ignored. + * + * <p>Other fields are copied from original schema. + * + * @param originalSchema baseline schema + * @param baselineSchemaBuilder baseline schema build, if changes to the baseline + * are required before copying original + * @param whenFound function to apply when current path(s) is/are found. + * @return an updated schema. Resulting schemas are usually cached for further access. + */ + public Schema updateSchemaFrom( + Schema originalSchema, + SchemaBuilder baselineSchemaBuilder, + StructSchemaUpdater whenFound, + StructSchemaUpdater whenNotFound, + StructSchemaUpdater whenOther + ) { + return updateSchema(originalSchema, baselineSchemaBuilder, 0, whenFound, whenNotFound, whenOther); + } + + // Recursive implementation to update schema at different steps. + // Consider that resulting schemas are usually cached. + private Schema updateSchema( + Schema operatingSchema, + SchemaBuilder builder, + int step, + StructSchemaUpdater matching, + StructSchemaUpdater notFound, + StructSchemaUpdater others + ) { + if (operatingSchema.isOptional()) { + builder.optional(); + } + if (operatingSchema.defaultValue() != null) { + builder.defaultValue(operatingSchema.defaultValue()); + } Review Comment: Good catch. I'm removing this for the current PR -- we can evaluate if this is needed in following PRs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org