Re: [PR] KAFKA-16691: [connect:transform] Support nested field paths on TimestampConverter [kafka]
github-actions[bot] commented on PR #15893:
URL: https://github.com/apache/kafka/pull/15893#issuecomment-2330528656

This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch).

If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org
C0urante commented on code in PR #15893:
URL: https://github.com/apache/kafka/pull/15893#discussion_r1629805844

## connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java:

```diff
@@ -202,6 +204,225 @@ public Object valueFrom(Map<String, Object> map) {
         return current.get(lastStep());
     }
 
+    /**
+     * Access {@code Map} fields and apply functions to update field values.
+     *
+     * @param originalValue schema-based data value
+     * @param whenFound function to apply when path is found
+     * @param whenNotFound function to apply when path is not found
+     * @param whenOther function to apply on fields not matched by path
+     * @return updated data value
+     */
+    public Map<String, Object> updateValueFrom(
+        Map<String, Object> originalValue,
+        MapValueUpdater whenFound,
+        MapValueUpdater whenNotFound,
+        MapValueUpdater whenOther
+    ) {
+        return updateValue(originalValue, 0, whenFound, whenNotFound, whenOther);
+    }
+
+    @SuppressWarnings("unchecked")
+    private Map<String, Object> updateValue(
+        Map<String, Object> originalValue,
+        int step,
+        MapValueUpdater whenFound,
+        MapValueUpdater whenNotFound,
+        MapValueUpdater whenOther
+    ) {
+        if (originalValue == null) return null;
+        Map<String, Object> updatedParent = new HashMap<>(originalValue.size());
+        boolean found = false;
+        for (Map.Entry<String, Object> entry : originalValue.entrySet()) {
+            String fieldName = entry.getKey();
+            Object fieldValue = entry.getValue();
+            if (steps.get(step).equals(fieldName)) {
+                found = true;
+                if (step < lastStepIndex()) {
+                    if (fieldValue instanceof Map) {
+                        Map<String, Object> updatedField = updateValue(
+                            (Map<String, Object>) fieldValue,
+                            step + 1,
+                            whenFound,
+                            whenNotFound,
+                            whenOther);
+                        updatedParent.put(fieldName, updatedField);
+                    } else {
+                        // add back to not found and apply others, as only leaf values are updated
+                        found = false;
+                        whenOther.apply(originalValue, updatedParent, null, fieldName);
+                    }
+                } else {
+                    whenFound.apply(originalValue, updatedParent, this, fieldName);
+                }
+            } else {
+                whenOther.apply(originalValue, updatedParent, null, fieldName);
+            }
+        }
+
+        if (!found) {
+            whenNotFound.apply(originalValue, updatedParent, this, steps.get(step));
+        }
+
+        return updatedParent;
+    }
+
+    /**
+     * Access {@code Struct} fields and apply functions to update field values.
+     *
+     * @param originalSchema original struct schema
+     * @param originalValue schema-based data value
+     * @param updatedSchema updated struct schema
+     * @param whenFound function to apply when path is found
+     * @param whenNotFound function to apply when path is not found
+     * @param whenOther function to apply on fields not matched by path
+     * @return updated data value
+     */
+    public Struct updateValueFrom(
+        Schema originalSchema,
+        Struct originalValue,
+        Schema updatedSchema,
+        StructValueUpdater whenFound,
+        StructValueUpdater whenNotFound,
+        StructValueUpdater whenOther
+    ) {
+        return updateValue(originalSchema, originalValue, updatedSchema, 0, whenFound, whenNotFound, whenOther);
+    }
+
+    private Struct updateValue(
+        Schema originalSchema,
+        Struct originalValue,
+        Schema updateSchema,
+        int step,
+        StructValueUpdater whenFound,
+        StructValueUpdater whenNotFound,
+        StructValueUpdater whenOther
+    ) {
+        Struct updated = new Struct(updateSchema);
+        boolean found = false;
+        for (Field field : originalSchema.fields()) {
+            if (step < steps.size()) {
+                if (steps.get(step).equals(field.name())) {
+                    found = true;
+                    if (step == lastStepIndex()) {
+                        whenFound.apply(
+                            originalValue,
+                            field,
+                            updated,
+                            updateSchema.field(field.name()),
+                            this
+                        );
+                    } else {
+                        if (field.schema().type() == Schema.Type.STRUCT) {
+                            Struct fieldValue = updateValue(
+                                field.schema(),
+                                originalValue.getStruct(field.name()),
+                                updateSchema.field(field.name()).schema(),
+                                step + 1,
```
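To make the recursion in the `Map`-based `updateValue` easier to follow, here is a minimal standalone sketch using only `java.util`. It is not the PR's code: the real `MapValueUpdater` signature is not shown in this excerpt, so the sketch assumes a simplified, hypothetical `Updater` callback and passes the path as a `List<String>` of steps. The branching mirrors the diff: fields off the path go to `whenOther`, the leaf goes to `whenFound`, nested maps are recursed into, and `whenNotFound` runs at the level where the path stops matching.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class NestedMapUpdateSketch {

    // Hypothetical callback shape, loosely modeled on the MapValueUpdater calls in the diff.
    @FunctionalInterface
    interface Updater {
        void apply(Map<String, Object> original, Map<String, Object> updated, String fieldName);
    }

    // Walks the path steps (e.g. ["event", "ts"] for "event.ts") and returns an updated copy
    // of `original`; the sketch itself never modifies the input maps.
    @SuppressWarnings("unchecked")
    static Map<String, Object> updateValue(
            List<String> steps,
            Map<String, Object> original,
            int step,
            Updater whenFound,
            Updater whenNotFound,
            Updater whenOther) {
        if (original == null) return null;
        Map<String, Object> updated = new HashMap<>(original.size());
        boolean found = false;
        for (Map.Entry<String, Object> entry : original.entrySet()) {
            String name = entry.getKey();
            Object value = entry.getValue();
            if (!steps.get(step).equals(name)) {
                whenOther.apply(original, updated, name);         // field is not on the path
            } else if (step == steps.size() - 1) {
                found = true;
                whenFound.apply(original, updated, name);         // leaf at the end of the path
            } else if (value instanceof Map) {
                found = true;                                     // recurse one level deeper
                updated.put(name, updateValue(steps, (Map<String, Object>) value, step + 1,
                        whenFound, whenNotFound, whenOther));
            } else {
                whenOther.apply(original, updated, name);         // path continues past a non-map value
            }
        }
        if (!found) {
            whenNotFound.apply(original, updated, steps.get(step));
        }
        return updated;
    }

    public static void main(String[] args) {
        Map<String, Object> event = new HashMap<>();
        event.put("ts", 1_700_000_000_000L);
        event.put("source", "sensor-1");
        Map<String, Object> root = new HashMap<>();
        root.put("event", event);
        root.put("id", 7);

        Updater copy = (orig, upd, name) -> upd.put(name, orig.get(name));
        Updater millisToSeconds = (orig, upd, name) -> upd.put(name, (Long) orig.get(name) / 1000);
        Updater ignoreMissing = (orig, upd, name) -> { };

        Map<String, Object> result =
                updateValue(List.of("event", "ts"), root, 0, millisToSeconds, ignoreMissing, copy);
        System.out.println(((Map<?, ?>) result.get("event")).get("ts")); // 1700000000
        System.out.println(event.get("ts"));                             // 1700000000000 (input unchanged)
    }
}
```

Passing the three roles as separate callbacks keeps the traversal generic: the caller decides how unrelated fields are carried over and what, if anything, to write when the path is missing.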
jeqo commented on code in PR #15893:
URL: https://github.com/apache/kafka/pull/15893#discussion_r1612459168

## connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java:
jeqo commented on code in PR #15893:
URL: https://github.com/apache/kafka/pull/15893#discussion_r1612458345

## connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java:

```diff
@@ -202,6 +204,225 @@ public Object valueFrom(Map<String, Object> map) {
         return current.get(lastStep());
     }
 
+    /**
+     * Access {@code Map} fields and apply functions to update field values.
+     *
+     * @param originalValue schema-based data value
+     * @param whenFound function to apply when path is found
+     * @param whenNotFound function to apply when path is not found
+     * @param whenOther function to apply on fields not matched by path
+     * @return updated data value
+     */
+    public Map<String, Object> updateValueFrom(
+        Map<String, Object> originalValue,
+        MapValueUpdater whenFound,
+        MapValueUpdater whenNotFound,
+        MapValueUpdater whenOther
+    ) {
+        return updateValue(originalValue, 0, whenFound, whenNotFound, whenOther);
+    }
+
+    @SuppressWarnings("unchecked")
+    private Map<String, Object> updateValue(
+        Map<String, Object> originalValue,
+        int step,
+        MapValueUpdater whenFound,
+        MapValueUpdater whenNotFound,
+        MapValueUpdater whenOther
+    ) {
+        if (originalValue == null) return null;
+        Map<String, Object> updatedParent = new HashMap<>(originalValue.size());
+        boolean found = false;
+        for (Map.Entry<String, Object> entry : originalValue.entrySet()) {
+            String fieldName = entry.getKey();
+            Object fieldValue = entry.getValue();
+            if (steps.get(step).equals(fieldName)) {
+                found = true;
+                if (step < lastStepIndex()) {
+                    if (fieldValue instanceof Map) {
+                        Map<String, Object> updatedField = updateValue(
+                            (Map<String, Object>) fieldValue,
+                            step + 1,
+                            whenFound,
+                            whenNotFound,
+                            whenOther);
+                        updatedParent.put(fieldName, updatedField);
+                    } else {
+                        // add back to not found and apply others, as only leaf values are updated
+                        found = false;
+                        whenOther.apply(originalValue, updatedParent, null, fieldName);
+                    }
+                } else {
+                    whenFound.apply(originalValue, updatedParent, this, fieldName);
+                }
+            } else {
+                whenOther.apply(originalValue, updatedParent, null, fieldName);
+            }
+        }
+
+        if (!found) {
+            whenNotFound.apply(originalValue, updatedParent, this, steps.get(step));
+        }
+
+        return updatedParent;
+    }
```

Review Comment:
Thanks! I like the proposed approach. The only change I'm considering adding is to return the map intact if the value is not found (the proposal is to return `null`). This should make the behavior consistent with the `Struct` and `Schema` updates, which do not make this distinction.
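The suggestion above is about what the `Map` path should hand back when the path does not resolve. A minimal sketch of the "return the map intact" option, using only `java.util`; `updateOrKeep` is a hypothetical helper rather than the PR's `updateValueFrom`, and for brevity it takes a single conversion function instead of the three updater callbacks.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

public class IntactOnMissSketch {

    // Hypothetical helper: applies `convert` to the value at `path`. When the path cannot be
    // resolved, the input map itself is returned unchanged (not null).
    @SuppressWarnings("unchecked")
    static Map<String, Object> updateOrKeep(
            Map<String, Object> value, List<String> path, UnaryOperator<Object> convert) {
        if (value == null) return null;                          // null in, null out
        String head = path.get(0);
        if (!value.containsKey(head)) return value;              // step missing: keep intact
        Object current = value.get(head);
        Object replacement;
        if (path.size() == 1) {
            replacement = convert.apply(current);                // leaf: apply the conversion
        } else if (current instanceof Map) {
            Map<String, Object> child = (Map<String, Object>) current;
            Map<String, Object> updatedChild = updateOrKeep(child, path.subList(1, path.size()), convert);
            if (updatedChild == child) return value;             // miss further down: keep intact
            replacement = updatedChild;
        } else {
            return value;                                        // path runs into a non-map value
        }
        Map<String, Object> copy = new HashMap<>(value);         // copy only the level that changes
        copy.put(head, replacement);
        return copy;
    }

    public static void main(String[] args) {
        Map<String, Object> event = new HashMap<>();
        event.put("ts", 5000L);
        Map<String, Object> root = new HashMap<>();
        root.put("event", event);

        Map<String, Object> miss = updateOrKeep(root, List.of("event", "missing"), v -> (Long) v / 1000);
        Map<String, Object> hit = updateOrKeep(root, List.of("event", "ts"), v -> (Long) v / 1000);

        System.out.println(miss == root);                              // true
        System.out.println(((Map<?, ?>) hit.get("event")).get("ts"));  // 5
    }
}
```

Returning the same instance on a miss means callers never need a null check, while a caller that does care about the distinction can still detect it with `result == input`.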
C0urante commented on code in PR #15893:
URL: https://github.com/apache/kafka/pull/15893#discussion_r1598401120

## connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java:
jeqo commented on PR #15893:
URL: https://github.com/apache/kafka/pull/15893#issuecomment-2104551065

@C0urante thanks! Agree, we should leave this change of behavior out of the scope of this PR/KIP. I have returned to the previous behavior, PTAL.
C0urante commented on PR #15893:
URL: https://github.com/apache/kafka/pull/15893#issuecomment-2102926693

I think this leads to a change in behavior. Right now this test (surprisingly!) passes on trunk:

```java
public class TimestampConverterTest {
    // ...

    @Test
    public void testWithSchemaFieldWithDefaultValue() {
        Map<String, String> config = new HashMap<>();
        config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp");
        config.put(TimestampConverter.FIELD_CONFIG, "timestamp_field");
        xformValue.configure(config);

        java.util.Date defaultFieldValue = new java.util.Date();
        Schema schema = SchemaBuilder.struct()
            .field(
                "timestamp_field",
                Timestamp.builder()
                    .defaultValue(defaultFieldValue)
                    .build()
            );
        Struct value = new Struct(schema)
            .put("timestamp_field", DATE_PLUS_TIME.getTime());

        SourceRecord transformed = xformValue.apply(createRecordWithSchema(schema, value));

        assertEquals(Schema.Type.STRUCT, transformed.valueSchema().type());
        Struct transformedValue = (Struct) transformed.value();
        assertEquals(DATE_PLUS_TIME.getTime(), transformedValue.get("timestamp_field"));
        assertNull(transformedValue.schema().field("timestamp_field").schema().defaultValue());
    }
}
```

We don't propagate default values for the fields we convert. Instead, if the field has no value, we automatically substitute its default value. This is surprising behavior and has led to things like [KIP-581](https://cwiki.apache.org/confluence/display/KAFKA/KIP-581%3A+Value+of+optional+null+field+which+has+default+value) and [KIP-1040](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=303794677), but I don't think we should change it in this KIP because it's out of scope and, if necessary, can be addressed in KIP-1040 (which is in discussion at the moment).
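The default-value behavior described above can be modeled in a few lines of plain Java. This is a simplified model, not Kafka's `Struct`/`Schema` classes: `Row` and `convertField` are hypothetical names, and `Row.get` imitates the way `Struct.get` falls back to the field schema's default when no value is set.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

public class DefaultSubstitutionModel {

    // Hypothetical stand-in for a Connect Struct. `defaults` maps every field name to its
    // schema default (null = no default), so its key set doubles as the field list.
    static final class Row {
        final Map<String, Object> defaults;
        final Map<String, Object> values = new HashMap<>();

        Row(Map<String, Object> defaults) {
            this.defaults = defaults;
        }

        Row put(String field, Object value) {
            values.put(field, value);
            return this;
        }

        // Imitates Struct.get: an unset (null) value falls back to the schema default.
        Object get(String field) {
            Object v = values.get(field);
            return v != null ? v : defaults.get(field);
        }
    }

    // Models the conversion described in the comment: the target field is read through get(),
    // so an unset value becomes the (converted) default, and the new schema has no default.
    static Row convertField(Row in, String target, UnaryOperator<Object> convert) {
        Map<String, Object> outDefaults = new HashMap<>(in.defaults);
        outDefaults.put(target, null);                             // default is not propagated
        Row out = new Row(outDefaults);
        for (String field : in.defaults.keySet()) {
            Object v = in.get(field);
            out.put(field, field.equals(target) && v != null ? convert.apply(v) : v);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> defaults = new HashMap<>();
        defaults.put("ts", 1000L);
        UnaryOperator<Object> toSeconds = v -> (Long) v / 1000;

        Row explicit = convertField(new Row(defaults).put("ts", 5000L), "ts", toSeconds);
        Row unset = convertField(new Row(defaults), "ts", toSeconds);

        System.out.println(explicit.get("ts"));        // 5
        System.out.println(unset.get("ts"));           // 1 (the old default, converted)
        System.out.println(unset.defaults.get("ts"));  // null (no default on the new schema)
    }
}
```

The `unset` case is the surprising one: the output carries a concrete value where the input had none, and the output schema no longer records that a default ever existed.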