Re: [PR] KAFKA-16691: [connect:transform] Support nested field paths on TimestampConverter [kafka]

2024-06-06 Thread via GitHub


C0urante commented on code in PR #15893:
URL: https://github.com/apache/kafka/pull/15893#discussion_r1629805844


##
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java:
##
@@ -202,6 +204,225 @@ public Object valueFrom(Map map) {
 return current.get(lastStep());
 }
 
+/**
+ * Access {@code Map} fields and apply functions to update field values.
+ *
+ * @param originalValue schema-based data value
+ * @param whenFound function to apply when path is found
+ * @param whenNotFound  function to apply when path is not found
+ * @param whenOther function to apply on fields not matched by path
+ * @return updated data value
+ */
+public Map updateValueFrom(
+Map originalValue,
+MapValueUpdater whenFound,
+MapValueUpdater whenNotFound,
+MapValueUpdater whenOther
+) {
+return updateValue(originalValue, 0, whenFound, whenNotFound, 
whenOther);
+}
+
+@SuppressWarnings("unchecked")
+private Map updateValue(
+Map originalValue,
+int step,
+MapValueUpdater whenFound,
+MapValueUpdater whenNotFound,
+MapValueUpdater whenOther
+) {
+if (originalValue == null) return null;
+Map updatedParent = new 
HashMap<>(originalValue.size());
+boolean found = false;
+for (Map.Entry entry : originalValue.entrySet()) {
+String fieldName = entry.getKey();
+Object fieldValue = entry.getValue();
+if (steps.get(step).equals(fieldName)) {
+found = true;
+if (step < lastStepIndex()) {
+if (fieldValue instanceof Map) {
+Map updatedField = updateValue(
+(Map) fieldValue,
+step + 1,
+whenFound,
+whenNotFound,
+whenOther);
+updatedParent.put(fieldName, updatedField);
+} else {
+// add back to not found and apply others, as only 
leaf values are updated
+found = false;
+whenOther.apply(originalValue, updatedParent, null, 
fieldName);
+}
+} else {
+whenFound.apply(originalValue, updatedParent, this, 
fieldName);
+}
+} else {
+whenOther.apply(originalValue, updatedParent, null, fieldName);
+}
+}
+
+if (!found) {
+whenNotFound.apply(originalValue, updatedParent, this, 
steps.get(step));
+}
+
+return updatedParent;
+}
+
+/**
+ * Access {@code Struct} fields and apply functions to update field values.
+ *
+ * @param originalSchema original struct schema
+ * @param originalValue  schema-based data value
+ * @param updatedSchema  updated struct schema
+ * @param whenFound  function to apply when path is found
+ * @param whenNotFound   function to apply when path is not found
+ * @param whenOther  function to apply on fields not matched by path
+ * @return updated data value
+ */
+public Struct updateValueFrom(
+Schema originalSchema,
+Struct originalValue,
+Schema updatedSchema,
+StructValueUpdater whenFound,
+StructValueUpdater whenNotFound,
+StructValueUpdater whenOther
+) {
+return updateValue(originalSchema, originalValue, updatedSchema, 0, 
whenFound, whenNotFound, whenOther);
+}
+
+private Struct updateValue(
+Schema originalSchema,
+Struct originalValue,
+Schema updateSchema,
+int step,
+StructValueUpdater whenFound,
+StructValueUpdater whenNotFound,
+StructValueUpdater whenOther
+) {
+Struct updated = new Struct(updateSchema);
+boolean found = false;
+for (Field field : originalSchema.fields()) {
+if (step < steps.size()) {
+if (steps.get(step).equals(field.name())) {
+found = true;
+if (step == lastStepIndex()) {
+whenFound.apply(
+originalValue,
+field,
+updated,
+updateSchema.field(field.name()),
+this
+);
+} else {
+if (field.schema().type() == Schema.Type.STRUCT) {
+Struct fieldValue = updateValue(
+field.schema(),
+originalValue.getStruct(field.name()),
+updateSchema.field(field.name()).schema(),
+step + 1,
+

Re: [PR] KAFKA-16691: [connect:transform] Support nested field paths on TimestampConverter [kafka]

2024-05-23 Thread via GitHub


jeqo commented on code in PR #15893:
URL: https://github.com/apache/kafka/pull/15893#discussion_r1612459168


##
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java:
##
@@ -202,6 +204,225 @@ public Object valueFrom(Map map) {
 return current.get(lastStep());
 }
 
+/**
+ * Access {@code Map} fields and apply functions to update field values.
+ *
+ * @param originalValue schema-based data value
+ * @param whenFound function to apply when path is found
+ * @param whenNotFound  function to apply when path is not found
+ * @param whenOther function to apply on fields not matched by path
+ * @return updated data value
+ */
+public Map updateValueFrom(
+Map originalValue,
+MapValueUpdater whenFound,
+MapValueUpdater whenNotFound,
+MapValueUpdater whenOther
+) {
+return updateValue(originalValue, 0, whenFound, whenNotFound, 
whenOther);
+}
+
+@SuppressWarnings("unchecked")
+private Map updateValue(
+Map originalValue,
+int step,
+MapValueUpdater whenFound,
+MapValueUpdater whenNotFound,
+MapValueUpdater whenOther
+) {
+if (originalValue == null) return null;
+Map updatedParent = new 
HashMap<>(originalValue.size());
+boolean found = false;
+for (Map.Entry entry : originalValue.entrySet()) {
+String fieldName = entry.getKey();
+Object fieldValue = entry.getValue();
+if (steps.get(step).equals(fieldName)) {
+found = true;
+if (step < lastStepIndex()) {
+if (fieldValue instanceof Map) {
+Map updatedField = updateValue(
+(Map) fieldValue,
+step + 1,
+whenFound,
+whenNotFound,
+whenOther);
+updatedParent.put(fieldName, updatedField);
+} else {
+// add back to not found and apply others, as only 
leaf values are updated
+found = false;
+whenOther.apply(originalValue, updatedParent, null, 
fieldName);
+}
+} else {
+whenFound.apply(originalValue, updatedParent, this, 
fieldName);
+}
+} else {
+whenOther.apply(originalValue, updatedParent, null, fieldName);
+}
+}
+
+if (!found) {
+whenNotFound.apply(originalValue, updatedParent, this, 
steps.get(step));
+}
+
+return updatedParent;
+}
+
+/**
+ * Access {@code Struct} fields and apply functions to update field values.
+ *
+ * @param originalSchema original struct schema
+ * @param originalValue  schema-based data value
+ * @param updatedSchema  updated struct schema
+ * @param whenFound  function to apply when path is found
+ * @param whenNotFound   function to apply when path is not found
+ * @param whenOther  function to apply on fields not matched by path
+ * @return updated data value
+ */
+public Struct updateValueFrom(
+Schema originalSchema,
+Struct originalValue,
+Schema updatedSchema,
+StructValueUpdater whenFound,
+StructValueUpdater whenNotFound,
+StructValueUpdater whenOther
+) {
+return updateValue(originalSchema, originalValue, updatedSchema, 0, 
whenFound, whenNotFound, whenOther);
+}
+
+private Struct updateValue(
+Schema originalSchema,
+Struct originalValue,
+Schema updateSchema,
+int step,
+StructValueUpdater whenFound,
+StructValueUpdater whenNotFound,
+StructValueUpdater whenOther
+) {
+Struct updated = new Struct(updateSchema);
+boolean found = false;
+for (Field field : originalSchema.fields()) {
+if (step < steps.size()) {
+if (steps.get(step).equals(field.name())) {
+found = true;
+if (step == lastStepIndex()) {
+whenFound.apply(
+originalValue,
+field,
+updated,
+updateSchema.field(field.name()),
+this
+);
+} else {
+if (field.schema().type() == Schema.Type.STRUCT) {
+Struct fieldValue = updateValue(
+field.schema(),
+originalValue.getStruct(field.name()),
+updateSchema.field(field.name()).schema(),
+step + 1,
+

Re: [PR] KAFKA-16691: [connect:transform] Support nested field paths on TimestampConverter [kafka]

2024-05-23 Thread via GitHub


jeqo commented on code in PR #15893:
URL: https://github.com/apache/kafka/pull/15893#discussion_r1612458345


##
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java:
##
@@ -202,6 +204,225 @@ public Object valueFrom(Map map) {
 return current.get(lastStep());
 }
 
+/**
+ * Access {@code Map} fields and apply functions to update field values.
+ *
+ * @param originalValue schema-based data value
+ * @param whenFound function to apply when path is found
+ * @param whenNotFound  function to apply when path is not found
+ * @param whenOther function to apply on fields not matched by path
+ * @return updated data value
+ */
+public Map updateValueFrom(
+Map originalValue,
+MapValueUpdater whenFound,
+MapValueUpdater whenNotFound,
+MapValueUpdater whenOther
+) {
+return updateValue(originalValue, 0, whenFound, whenNotFound, 
whenOther);
+}
+
+@SuppressWarnings("unchecked")
+private Map updateValue(
+Map originalValue,
+int step,
+MapValueUpdater whenFound,
+MapValueUpdater whenNotFound,
+MapValueUpdater whenOther
+) {
+if (originalValue == null) return null;
+Map updatedParent = new 
HashMap<>(originalValue.size());
+boolean found = false;
+for (Map.Entry entry : originalValue.entrySet()) {
+String fieldName = entry.getKey();
+Object fieldValue = entry.getValue();
+if (steps.get(step).equals(fieldName)) {
+found = true;
+if (step < lastStepIndex()) {
+if (fieldValue instanceof Map) {
+Map updatedField = updateValue(
+(Map) fieldValue,
+step + 1,
+whenFound,
+whenNotFound,
+whenOther);
+updatedParent.put(fieldName, updatedField);
+} else {
+// add back to not found and apply others, as only 
leaf values are updated
+found = false;
+whenOther.apply(originalValue, updatedParent, null, 
fieldName);
+}
+} else {
+whenFound.apply(originalValue, updatedParent, this, 
fieldName);
+}
+} else {
+whenOther.apply(originalValue, updatedParent, null, fieldName);
+}
+}
+
+if (!found) {
+whenNotFound.apply(originalValue, updatedParent, this, 
steps.get(step));
+}
+
+return updatedParent;
+}

Review Comment:
   Thanks! I like the proposed approach. The only change I'm considering adding 
is to return the map intact if the value is not found (the proposal is to return 
null).
   This should make the behavior consistent with the Struct and Schema updates, 
which do not make this distinction.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16691: [connect:transform] Support nested field paths on TimestampConverter [kafka]

2024-05-13 Thread via GitHub


C0urante commented on code in PR #15893:
URL: https://github.com/apache/kafka/pull/15893#discussion_r1598401120


##
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java:
##
@@ -202,6 +204,225 @@ public Object valueFrom(Map map) {
 return current.get(lastStep());
 }
 
+/**
+ * Access {@code Map} fields and apply functions to update field values.
+ *
+ * @param originalValue schema-based data value
+ * @param whenFound function to apply when path is found
+ * @param whenNotFound  function to apply when path is not found
+ * @param whenOther function to apply on fields not matched by path
+ * @return updated data value
+ */
+public Map updateValueFrom(
+Map originalValue,
+MapValueUpdater whenFound,
+MapValueUpdater whenNotFound,
+MapValueUpdater whenOther
+) {
+return updateValue(originalValue, 0, whenFound, whenNotFound, 
whenOther);
+}
+
+@SuppressWarnings("unchecked")
+private Map updateValue(
+Map originalValue,
+int step,
+MapValueUpdater whenFound,
+MapValueUpdater whenNotFound,
+MapValueUpdater whenOther
+) {
+if (originalValue == null) return null;
+Map updatedParent = new 
HashMap<>(originalValue.size());
+boolean found = false;
+for (Map.Entry entry : originalValue.entrySet()) {
+String fieldName = entry.getKey();
+Object fieldValue = entry.getValue();
+if (steps.get(step).equals(fieldName)) {
+found = true;
+if (step < lastStepIndex()) {
+if (fieldValue instanceof Map) {
+Map updatedField = updateValue(
+(Map) fieldValue,
+step + 1,
+whenFound,
+whenNotFound,
+whenOther);
+updatedParent.put(fieldName, updatedField);
+} else {
+// add back to not found and apply others, as only 
leaf values are updated
+found = false;
+whenOther.apply(originalValue, updatedParent, null, 
fieldName);
+}
+} else {
+whenFound.apply(originalValue, updatedParent, this, 
fieldName);
+}
+} else {
+whenOther.apply(originalValue, updatedParent, null, fieldName);
+}
+}
+
+if (!found) {
+whenNotFound.apply(originalValue, updatedParent, this, 
steps.get(step));
+}
+
+return updatedParent;
+}
+
+/**
+ * Access {@code Struct} fields and apply functions to update field values.
+ *
+ * @param originalSchema original struct schema
+ * @param originalValue  schema-based data value
+ * @param updatedSchema  updated struct schema
+ * @param whenFound  function to apply when path is found
+ * @param whenNotFound   function to apply when path is not found
+ * @param whenOther  function to apply on fields not matched by path
+ * @return updated data value
+ */
+public Struct updateValueFrom(
+Schema originalSchema,
+Struct originalValue,
+Schema updatedSchema,
+StructValueUpdater whenFound,
+StructValueUpdater whenNotFound,
+StructValueUpdater whenOther
+) {
+return updateValue(originalSchema, originalValue, updatedSchema, 0, 
whenFound, whenNotFound, whenOther);
+}
+
+private Struct updateValue(
+Schema originalSchema,
+Struct originalValue,
+Schema updateSchema,
+int step,
+StructValueUpdater whenFound,
+StructValueUpdater whenNotFound,
+StructValueUpdater whenOther
+) {
+Struct updated = new Struct(updateSchema);
+boolean found = false;
+for (Field field : originalSchema.fields()) {
+if (step < steps.size()) {
+if (steps.get(step).equals(field.name())) {
+found = true;
+if (step == lastStepIndex()) {
+whenFound.apply(
+originalValue,
+field,
+updated,
+updateSchema.field(field.name()),
+this
+);
+} else {
+if (field.schema().type() == Schema.Type.STRUCT) {
+Struct fieldValue = updateValue(
+field.schema(),
+originalValue.getStruct(field.name()),
+updateSchema.field(field.name()).schema(),
+step + 1,
+

Re: [PR] KAFKA-16691: [connect:transform] Support nested field paths on TimestampConverter [kafka]

2024-05-10 Thread via GitHub


jeqo commented on PR #15893:
URL: https://github.com/apache/kafka/pull/15893#issuecomment-2104551065

   @C0urante thanks! Agreed, we should leave this change of behavior out of the 
scope of this PR/KIP. I have reverted to the previous behavior, PTAL.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16691: [connect:transform] Support nested field paths on TimestampConverter [kafka]

2024-05-09 Thread via GitHub


C0urante commented on PR #15893:
URL: https://github.com/apache/kafka/pull/15893#issuecomment-2102926693

   I think this leads to a change in behavior. Right now this test 
(surprisingly!) passes on trunk:
   
   ```java
   public class TimestampConverterTest {
   // ...
   
   @Test
   public void testWithSchemaFieldWithDefaultValue() {
   Map config = new HashMap<>();
   config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp");
   config.put(TimestampConverter.FIELD_CONFIG, "timestamp_field");
   xformValue.configure(config);
   
   java.util.Date defaultFieldValue = new java.util.Date();
   Schema schema = SchemaBuilder.struct()
   .field(
   "timestamp_field",
   Timestamp.builder()
   .defaultValue(defaultFieldValue)
   .build()
   );
   Struct value = new Struct(schema)
   .put("timestamp_field", DATE_PLUS_TIME.getTime());
   
   SourceRecord transformed = 
xformValue.apply(createRecordWithSchema(schema, value));
   
   assertEquals(Schema.Type.STRUCT, transformed.valueSchema().type());
   Struct transformedValue = (Struct) transformed.value();
   assertEquals(DATE_PLUS_TIME.getTime(), 
transformedValue.get("timestamp_field"));
   
assertNull(transformedValue.schema().field("timestamp_field").schema().defaultValue());
   }
   }
   ```
   
   We don't propagate default values for the fields we convert. Instead, we 
automatically substitute in the default value if none is found. This is 
surprising behavior, and has led to things like 
[KIP-581](https://cwiki.apache.org/confluence/display/KAFKA/KIP-581%3A+Value+of+optional+null+field+which+has+default+value)
 and 
[KIP-1040](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=303794677),
 but I don't think we should change it in this KIP because it's out of scope 
and, if necessary, can be touched on in KIP-1040 (which is in discussion at the 
moment).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org