C0urante commented on code in PR #15379: URL: https://github.com/apache/kafka/pull/15379#discussion_r1583252760
########## connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java: ########## @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.transforms.field; + +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import static org.apache.kafka.connect.transforms.util.Requirements.requireMapOrNull; + +/** + * A SingleFieldPath is composed of one or more field names, known as path steps, + * to access values within a data object (either {@code Struct} or {@code Map<String, Object>}). + * + * <p>The field path semantics are defined by the {@link FieldSyntaxVersion syntax version}. 
+ * + * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-821%3A+Connect+Transforms+support+for+nested+structures">KIP-821</a> + * @see FieldSyntaxVersion + */ +public class SingleFieldPath { + // Invariants: + // - A field path can contain one or more steps + private static final char BACKTICK = '`'; + private static final char DOT = '.'; + private static final char BACKSLASH = '\\'; + + private final FieldSyntaxVersion version; + private final List<String> steps; + + public SingleFieldPath(String pathText, FieldSyntaxVersion version) { + this.version = version; + switch (version) { + case V1: // backward compatibility + this.steps = Collections.singletonList(pathText); + break; + case V2: + this.steps = buildFieldPathV2(pathText); + break; + default: + throw new IllegalArgumentException("Unknown syntax version: " + version); + } + } + + private static List<String> buildFieldPathV2(String path) { + final List<String> steps = new ArrayList<>(); + // path character index to track backticks and dots and break path into steps + int idx = 0; + while (idx < path.length() && idx >= 0) { + if (path.charAt(idx) != BACKTICK) { + final int start = idx; + idx = path.indexOf(String.valueOf(DOT), idx); + if (idx >= 0) { // get path step and move forward + String field = path.substring(start, idx); + steps.add(field); + idx++; + } else { // add all + String field = path.substring(start); + steps.add(field); + } + } else { // has backtick + int backtickAt = idx; + idx++; + StringBuilder field = new StringBuilder(); + int start = idx; + while (true) { + // find closing backtick + idx = path.indexOf(String.valueOf(BACKTICK), idx); + if (idx == -1) { // if not found, then fail + failWhenIncompleteBacktickPair(path, backtickAt); + } + + // backtick escaped if right after backslash + boolean escaped = path.charAt(idx - 1) == BACKSLASH; + + if (idx >= path.length() - 1) { // at the end of path + if (escaped) { // but escaped, then fail + 
failWhenIncompleteBacktickPair(path, backtickAt); + } + field.append(path, start, idx); + // we've reached the end of the path, and the last character is the backtick + steps.add(field.toString()); + idx++; + break; + } + + if (path.charAt(idx + 1) != DOT) { // not followed by a dot + // this backtick isn't followed by a dot; include it in the field name, but continue + // looking for a matching backtick that is followed by a dot + idx++; + continue; + } + + if (escaped) { + // this backtick was escaped; include it in the field name, but continue + // looking for an unescaped matching backtick + field.append(path, start, idx - 1) + .append(BACKTICK); + + idx++; + start = idx; + continue; + } + + // we've found our matching backtick + field.append(path, start, idx); + steps.add(field.toString()); + idx += 2; // increment by two to include the backtick and the dot after it + break; + } + } + } + // add last step if last char is a dot + if (!path.isEmpty() && path.charAt(path.length() - 1) == DOT) + steps.add(""); + return Collections.unmodifiableList(steps); + } + + private static void failWhenIncompleteBacktickPair(String path, int backtickAt) { + throw new ConfigException("Incomplete backtick pair in path: [" + path + "]," + + " consider adding a backslash before backtick at position " + backtickAt + + " to escape it"); + } + + /** + * Access a {@code Field} at the current path within a schema {@code Schema} + * If field is not found, then {@code null} is returned. + */ + public Field fieldFrom(Schema schema) { + if (schema == null) return null; + + Schema current = schema; + for (String pathSegment : stepsWithoutLast()) { + final Field field = current.field(pathSegment); + if (field != null) { + current = field.schema(); + } else { + return null; + } + } + return current.field(lastStep()); + } + + /** + * Access a value at the current path within a schema-based {@code Struct} + * If object is not found, then {@code null} is returned. 
+ */ + public Object valueFrom(Struct struct) { + if (struct == null) return null; + + Struct current = struct; + for (String pathSegment : stepsWithoutLast()) { + // Check to see if the field actually exists + if (current.schema().field(pathSegment) == null) { + return null; + } + current = current.getStruct(pathSegment); Review Comment: Nit: we could have slightly friendlier error messages here: ```suggestion Object subValue = current.get(pathSegment); current = requireStructOrNull(subValue, "nested field access"); ``` ########## connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java: ########## @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.transforms.field; + +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import static org.apache.kafka.connect.transforms.util.Requirements.requireMapOrNull; + +/** + * A SingleFieldPath is composed of one or more field names, known as path steps, + * to access values within a data object (either {@code Struct} or {@code Map<String, Object>}). + * + * <p>The field path semantics are defined by the {@link FieldSyntaxVersion syntax version}. + * + * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-821%3A+Connect+Transforms+support+for+nested+structures">KIP-821</a> + * @see FieldSyntaxVersion + */ +public class SingleFieldPath { + // Invariants: + // - A field path can contain one or more steps + private static final char BACKTICK = '`'; + private static final char DOT = '.'; + private static final char BACKSLASH = '\\'; + + private final FieldSyntaxVersion version; + private final List<String> steps; + + public SingleFieldPath(String pathText, FieldSyntaxVersion version) { + this.version = version; + switch (version) { + case V1: // backward compatibility + this.steps = Collections.singletonList(pathText); + break; + case V2: + this.steps = buildFieldPathV2(pathText); + break; + default: + throw new IllegalArgumentException("Unknown syntax version: " + version); + } + } + + private static List<String> buildFieldPathV2(String path) { + final List<String> steps = new ArrayList<>(); + // path character index to track backticks and dots and break path into steps + int idx = 0; + while (idx < path.length() && idx >= 0) { + if (path.charAt(idx) != BACKTICK) { + final int start = idx; + idx = path.indexOf(String.valueOf(DOT), idx); + if (idx 
>= 0) { // get path step and move forward + String field = path.substring(start, idx); + steps.add(field); + idx++; + } else { // add all + String field = path.substring(start); + steps.add(field); + } + } else { // has backtick + int backtickAt = idx; + idx++; + StringBuilder field = new StringBuilder(); + int start = idx; + while (true) { + // find closing backtick + idx = path.indexOf(String.valueOf(BACKTICK), idx); + if (idx == -1) { // if not found, then fail + failWhenIncompleteBacktickPair(path, backtickAt); + } + + // backtick escaped if right after backslash + boolean escaped = path.charAt(idx - 1) == BACKSLASH; + + if (idx >= path.length() - 1) { // at the end of path + if (escaped) { // but escaped, then fail + failWhenIncompleteBacktickPair(path, backtickAt); + } + field.append(path, start, idx); + // we've reached the end of the path, and the last character is the backtick + steps.add(field.toString()); + idx++; + break; + } + + if (path.charAt(idx + 1) != DOT) { // not followed by a dot + // this backtick isn't followed by a dot; include it in the field name, but continue + // looking for a matching backtick that is followed by a dot + idx++; + continue; + } + + if (escaped) { + // this backtick was escaped; include it in the field name, but continue + // looking for an unescaped matching backtick + field.append(path, start, idx - 1) + .append(BACKTICK); + + idx++; + start = idx; + continue; + } + + // we've found our matching backtick + field.append(path, start, idx); + steps.add(field.toString()); + idx += 2; // increment by two to include the backtick and the dot after it + break; + } + } + } + // add last step if last char is a dot + if (!path.isEmpty() && path.charAt(path.length() - 1) == DOT) + steps.add(""); + return Collections.unmodifiableList(steps); + } + + private static void failWhenIncompleteBacktickPair(String path, int backtickAt) { + throw new ConfigException("Incomplete backtick pair in path: [" + path + "]," + + " consider adding a 
backslash before backtick at position " + backtickAt + + " to escape it"); + } + + /** + * Access a {@code Field} at the current path within a schema {@code Schema} + * If field is not found, then {@code null} is returned. + */ + public Field fieldFrom(Schema schema) { + if (schema == null) return null; + + Schema current = schema; + for (String pathSegment : stepsWithoutLast()) { + final Field field = current.field(pathSegment); + if (field != null) { + current = field.schema(); + } else { + return null; + } + } + return current.field(lastStep()); + } + + /** + * Access a value at the current path within a schema-based {@code Struct} + * If object is not found, then {@code null} is returned. + */ + public Object valueFrom(Struct struct) { + if (struct == null) return null; + + Struct current = struct; + for (String pathSegment : stepsWithoutLast()) { + // Check to see if the field actually exists + if (current.schema().field(pathSegment) == null) { + return null; + } + current = current.getStruct(pathSegment); + if (current == null) return null; + } + + if (current.schema().field(lastStep()) != null) { + return current.get(lastStep()); + } else { + return null; + } + } + + List<String> stepsWithoutLast() { Review Comment: Nit: can be private ```suggestion private List<String> stepsWithoutLast() { ``` ########## connect/transforms/src/test/java/org/apache/kafka/connect/transforms/field/SingleFieldPathTest.java: ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.transforms.field; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +class SingleFieldPathTest { + + @Test void shouldFindField() { + SchemaBuilder barSchema = SchemaBuilder.struct().field("bar", Schema.INT32_SCHEMA); Review Comment: (This applies in several other places in this suite; I've only left one comment to avoid clutter.) ########## connect/transforms/src/test/java/org/apache/kafka/connect/transforms/field/SingleFieldPathTest.java: ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.transforms.field; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +class SingleFieldPathTest { + + @Test void shouldFindField() { + SchemaBuilder barSchema = SchemaBuilder.struct().field("bar", Schema.INT32_SCHEMA); + Schema schema = SchemaBuilder.struct().field("foo", barSchema).build(); + + assertEquals(barSchema.field("bar"), pathV2("foo.bar").fieldFrom(schema)); + assertEquals(schema.field("foo"), pathV2("foo").fieldFrom(schema)); + } + + @Test void shouldReturnNullFieldWhenFieldNotFound() { + SchemaBuilder barSchema = SchemaBuilder.struct().field("bar", Schema.INT32_SCHEMA); + Schema schema = SchemaBuilder.struct().field("foo", barSchema).build(); + + assertNull(pathV2("un.known").fieldFrom(schema)); + assertNull(pathV2("foo.unknown").fieldFrom(schema)); + assertNull(pathV2("unknown").fieldFrom(schema)); + assertNull(pathV2("test").fieldFrom(null)); + } + + @Test void shouldFindValueInMap() { + Map<String, Object> foo = new HashMap<>(); + foo.put("bar", 42); + foo.put("baz", null); + Map<String, Object> map = new HashMap<>(); + map.put("foo", foo); + + assertEquals(42, pathV2("foo.bar").valueFrom(map)); + assertNull(pathV2("foo.baz").valueFrom(map)); + } + + @Test void shouldReturnNullValueWhenFieldNotFoundInMap() { + Map<String, Object> foo = new HashMap<>(); + foo.put("bar", 42); + foo.put("baz", null); + Map<String, Object> map = new HashMap<>(); + map.put("foo", foo); + + assertNull(new SingleFieldPath("un.known", FieldSyntaxVersion.V2).valueFrom(map)); Review Comment: Don't we want to use `pathV2` (or 
possibly `pathV2Value`, proposed below) instead of manually instantiating the `SingleFieldPath`? (This applies to other places in the test suite as well.) ########## connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java: ########## @@ -111,7 +157,24 @@ public void nonExistentFieldWithSchemaShouldFail() { xform.apply(record); fail("Expected exception wasn't raised"); } catch (IllegalArgumentException iae) { - assertEquals("Unknown field: nonexistent", iae.getMessage()); + assertEquals("Unknown field: SingleFieldPath{version=V1, path=nonexistent}", iae.getMessage()); + } + } + + @Test + public void nonExistentNestedFieldWithSchemaShouldFail() { Review Comment: Don't we need to configure the transform to use field syntax V2? ########## connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java: ########## @@ -111,7 +157,24 @@ public void nonExistentFieldWithSchemaShouldFail() { xform.apply(record); fail("Expected exception wasn't raised"); } catch (IllegalArgumentException iae) { - assertEquals("Unknown field: nonexistent", iae.getMessage()); + assertEquals("Unknown field: SingleFieldPath{version=V1, path=nonexistent}", iae.getMessage()); Review Comment: This error message is less human-readable. Could we preserve the existing one? I don't think the syntax version is necessary; we can just use the path as specified by the user in the transform config. ########## connect/transforms/src/test/java/org/apache/kafka/connect/transforms/field/SingleFieldPathTest.java: ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.transforms.field; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +class SingleFieldPathTest { + + @Test void shouldFindField() { + SchemaBuilder barSchema = SchemaBuilder.struct().field("bar", Schema.INT32_SCHEMA); Review Comment: It's better practice to operate on fully-built `Schema` instances instead of `SchemaBuilder` objects: ```suggestion Schema barSchema = SchemaBuilder.struct().field("bar", Schema.INT32_SCHEMA).build(); ``` ########## connect/transforms/src/test/java/org/apache/kafka/connect/transforms/field/SingleFieldPathTest.java: ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.transforms.field; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +class SingleFieldPathTest { + + @Test void shouldFindField() { + SchemaBuilder barSchema = SchemaBuilder.struct().field("bar", Schema.INT32_SCHEMA); + Schema schema = SchemaBuilder.struct().field("foo", barSchema).build(); + + assertEquals(barSchema.field("bar"), pathV2("foo.bar").fieldFrom(schema)); + assertEquals(schema.field("foo"), pathV2("foo").fieldFrom(schema)); + } + + @Test void shouldReturnNullFieldWhenFieldNotFound() { + SchemaBuilder barSchema = SchemaBuilder.struct().field("bar", Schema.INT32_SCHEMA); + Schema schema = SchemaBuilder.struct().field("foo", barSchema).build(); + + assertNull(pathV2("un.known").fieldFrom(schema)); + assertNull(pathV2("foo.unknown").fieldFrom(schema)); + assertNull(pathV2("unknown").fieldFrom(schema)); + assertNull(pathV2("test").fieldFrom(null)); + } + + @Test void shouldFindValueInMap() { + Map<String, Object> foo = new HashMap<>(); + foo.put("bar", 42); + foo.put("baz", null); + Map<String, Object> map = new HashMap<>(); + map.put("foo", foo); + + assertEquals(42, pathV2("foo.bar").valueFrom(map)); + assertNull(pathV2("foo.baz").valueFrom(map)); + } + + @Test void 
shouldReturnNullValueWhenFieldNotFoundInMap() { + Map<String, Object> foo = new HashMap<>(); + foo.put("bar", 42); + foo.put("baz", null); + Map<String, Object> map = new HashMap<>(); + map.put("foo", foo); + + assertNull(new SingleFieldPath("un.known", FieldSyntaxVersion.V2).valueFrom(map)); + assertNull(new SingleFieldPath("foo.unknown", FieldSyntaxVersion.V2).valueFrom(map)); + assertNull(new SingleFieldPath("unknown", FieldSyntaxVersion.V2).valueFrom(map)); + assertNull(new SingleFieldPath("foo.baz", FieldSyntaxVersion.V2).valueFrom(map)); + assertNull(new SingleFieldPath("foo.baz.inner", FieldSyntaxVersion.V2).valueFrom(map)); + } + + @Test void shouldFindValueInStruct() { + SchemaBuilder bazSchema = SchemaBuilder.struct() + .field("inner", Schema.STRING_SCHEMA); + SchemaBuilder barSchema = SchemaBuilder.struct() + .field("bar", Schema.INT32_SCHEMA) + .field("baz", bazSchema.optional()); + Schema schema = SchemaBuilder.struct().field("foo", barSchema).build(); + Struct foo = new Struct(barSchema) + .put("bar", 42) + .put("baz", null); + Struct struct = new Struct(schema).put("foo", foo); + + assertEquals(42, pathV2("foo.bar").valueFrom(struct)); + assertNull(pathV2("foo.baz").valueFrom(struct)); + } + + @Test void shouldReturnNullValueWhenFieldNotFoundInStruct() { + SchemaBuilder bazSchema = SchemaBuilder.struct() + .field("inner", Schema.STRING_SCHEMA); + SchemaBuilder barSchema = SchemaBuilder.struct() + .field("bar", Schema.INT32_SCHEMA) + .field("baz", bazSchema.optional()); + Schema schema = SchemaBuilder.struct().field("foo", barSchema).build(); + Struct foo = new Struct(barSchema) + .put("bar", 42) + .put("baz", null); + Struct struct = new Struct(schema).put("foo", foo); + + assertNull(new SingleFieldPath("un.known", FieldSyntaxVersion.V2).valueFrom(struct)); + assertNull(new SingleFieldPath("foo.unknown", FieldSyntaxVersion.V2).valueFrom(struct)); + assertNull(new SingleFieldPath("unknown", FieldSyntaxVersion.V2).valueFrom(struct)); + assertNull(new 
SingleFieldPath("foo.baz", FieldSyntaxVersion.V2).valueFrom(struct)); + assertNull(new SingleFieldPath("foo.baz.inner", FieldSyntaxVersion.V2).valueFrom(struct)); + } + + private static SingleFieldPath pathV2(String path) { + return new SingleFieldPath(path, FieldSyntaxVersion.V2); + } Review Comment: We can simplify things even further by piggybacking off of this with a few extra utility methods: ```java private static Field pathV2Field(String path, Schema schema) { return pathV2(path).fieldFrom(schema); } private static Object pathV2Value(String path, Struct struct) { return pathV2(path).valueFrom(struct); } private static Object pathV2Value(String path, Map<String, Object> map) { return pathV2(path).valueFrom(map); } ``` I think we can replace all direct invocations of `pathV2` with these wrapper methods. ########## connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java: ########## @@ -111,7 +157,24 @@ public void nonExistentFieldWithSchemaShouldFail() { xform.apply(record); fail("Expected exception wasn't raised"); } catch (IllegalArgumentException iae) { - assertEquals("Unknown field: nonexistent", iae.getMessage()); + assertEquals("Unknown field: SingleFieldPath{version=V1, path=nonexistent}", iae.getMessage()); + } + } + + @Test + public void nonExistentNestedFieldWithSchemaShouldFail() { + xform.configure(Collections.singletonMap("field", "magic.nonexistent")); + + final Schema fooSchema = SchemaBuilder.struct().field("foo", Schema.INT32_SCHEMA).build(); + final Schema keySchema = SchemaBuilder.struct().field("magic", fooSchema).build(); + final Struct key = new Struct(keySchema).put("magic", new Struct(fooSchema).put("foo", 42)); + final SinkRecord record = new SinkRecord("test", 0, keySchema, key, null, null, 0); + + try { + xform.apply(record); + fail("Expected exception wasn't raised"); + } catch (IllegalArgumentException iae) { + assertEquals("Unknown field: SingleFieldPath{version=V1, path=magic.nonexistent}", 
iae.getMessage()); Review Comment: Like with `nonExistentFieldWithSchemaShouldFail`, I don't think the field syntax version needs to be included in the error message. But if it is included, shouldn't it be V2 here instead of V1? ########## connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java: ########## @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.transforms.field; + +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import static org.apache.kafka.connect.transforms.util.Requirements.requireMapOrNull; + +/** + * A SingleFieldPath is composed of one or more field names, known as path steps, + * to access values within a data object (either {@code Struct} or {@code Map<String, Object>}). + * + * <p>The field path semantics are defined by the {@link FieldSyntaxVersion syntax version}. 
+ * + * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-821%3A+Connect+Transforms+support+for+nested+structures">KIP-821</a> + * @see FieldSyntaxVersion + */ +public class SingleFieldPath { + // Invariants: + // - A field path can contain one or more steps + private static final char BACKTICK = '`'; + private static final char DOT = '.'; + private static final char BACKSLASH = '\\'; + + private final FieldSyntaxVersion version; + private final List<String> steps; + + public SingleFieldPath(String pathText, FieldSyntaxVersion version) { + this.version = version; + switch (version) { + case V1: // backward compatibility + this.steps = Collections.singletonList(pathText); + break; + case V2: + this.steps = buildFieldPathV2(pathText); + break; + default: + throw new IllegalArgumentException("Unknown syntax version: " + version); + } + } + + private static List<String> buildFieldPathV2(String path) { + final List<String> steps = new ArrayList<>(); + // path character index to track backticks and dots and break path into steps + int idx = 0; + while (idx < path.length() && idx >= 0) { + if (path.charAt(idx) != BACKTICK) { + final int start = idx; + idx = path.indexOf(String.valueOf(DOT), idx); + if (idx >= 0) { // get path step and move forward + String field = path.substring(start, idx); + steps.add(field); + idx++; + } else { // add all + String field = path.substring(start); + steps.add(field); + } + } else { // has backtick + int backtickAt = idx; + idx++; + StringBuilder field = new StringBuilder(); + int start = idx; + while (true) { + // find closing backtick + idx = path.indexOf(String.valueOf(BACKTICK), idx); + if (idx == -1) { // if not found, then fail + failWhenIncompleteBacktickPair(path, backtickAt); + } + + // backtick escaped if right after backslash + boolean escaped = path.charAt(idx - 1) == BACKSLASH; + + if (idx >= path.length() - 1) { // at the end of path + if (escaped) { // but escaped, then fail + 
failWhenIncompleteBacktickPair(path, backtickAt); + } + field.append(path, start, idx); + // we've reached the end of the path, and the last character is the backtick + steps.add(field.toString()); + idx++; + break; + } + + if (path.charAt(idx + 1) != DOT) { // not followed by a dot + // this backtick isn't followed by a dot; include it in the field name, but continue + // looking for a matching backtick that is followed by a dot + idx++; + continue; + } + + if (escaped) { + // this backtick was escaped; include it in the field name, but continue + // looking for an unescaped matching backtick + field.append(path, start, idx - 1) + .append(BACKTICK); + + idx++; + start = idx; + continue; + } + + // we've found our matching backtick + field.append(path, start, idx); + steps.add(field.toString()); + idx += 2; // increment by two to include the backtick and the dot after it + break; + } + } + } + // add last step if last char is a dot + if (!path.isEmpty() && path.charAt(path.length() - 1) == DOT) + steps.add(""); + return Collections.unmodifiableList(steps); + } + + private static void failWhenIncompleteBacktickPair(String path, int backtickAt) { + throw new ConfigException("Incomplete backtick pair in path: [" + path + "]," + + " consider adding a backslash before backtick at position " + backtickAt + + " to escape it"); + } + + /** + * Access a {@code Field} at the current path within a schema {@code Schema} + * If field is not found, then {@code null} is returned. + */ + public Field fieldFrom(Schema schema) { + if (schema == null) return null; + + Schema current = schema; + for (String pathSegment : stepsWithoutLast()) { + final Field field = current.field(pathSegment); + if (field != null) { + current = field.schema(); + } else { + return null; + } + } + return current.field(lastStep()); + } + + /** + * Access a value at the current path within a schema-based {@code Struct} + * If object is not found, then {@code null} is returned. 
+ */ + public Object valueFrom(Struct struct) { + if (struct == null) return null; + + Struct current = struct; + for (String pathSegment : stepsWithoutLast()) { + // Check to see if the field actually exists + if (current.schema().field(pathSegment) == null) { + return null; + } + current = current.getStruct(pathSegment); + if (current == null) return null; + } + + if (current.schema().field(lastStep()) != null) { + return current.get(lastStep()); + } else { + return null; + } + } + + List<String> stepsWithoutLast() { + return steps.subList(0, lastStepIndex()); + } + + /** + * Access a value at the current path within a schemaless {@code Map<String, Object>}. + * If object is not found, then {@code null} is returned. + */ + public Object valueFrom(Map<String, Object> map) { + if (map == null) return null; + + Map<String, Object> current = map; + for (String step : stepsWithoutLast()) { + current = requireMapOrNull(current.get(step), "nested field access"); + if (current == null) return null; + } + return current.get(lastStep()); + } + + // For testing + String[] path() { + return steps.toArray(new String[0]); + } + + String lastStep() { Review Comment: Nit: can be private ```suggestion private String lastStep() { ``` ########## connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java: ########## @@ -99,6 +133,18 @@ public void nonExistentFieldSchemalessShouldReturnNull() { assertNull(transformedRecord.key()); } + @Test + public void nonExistentNestedFieldSchemalessShouldReturnNull() { Review Comment: Don't we need to configure the transform to use field syntax V2? ########## connect/transforms/src/test/java/org/apache/kafka/connect/transforms/field/FieldSyntaxVersionTest.java: ########## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.transforms.field; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class FieldSyntaxVersionTest { + @Test + void shouldAppendConfigToDef() { + ConfigDef def = FieldSyntaxVersion.appendConfigTo(new ConfigDef()); + assertEquals(def.configKeys().size(), 1); + final ConfigDef.ConfigKey configKey = def.configKeys().get("field.syntax.version"); + assertEquals(configKey.name, "field.syntax.version"); + assertEquals(configKey.defaultValue, "V1"); + } + + @Test + void shouldFailWhenAppendConfigToDefAgain() { + ConfigDef def = FieldSyntaxVersion.appendConfigTo(new ConfigDef()); + assertEquals(def.configKeys().size(), 1); + ConfigException e = assertThrows(ConfigException.class, () -> FieldSyntaxVersion.appendConfigTo(def)); + assertEquals(e.getMessage(), "Configuration field.syntax.version is defined twice."); + } + + @ParameterizedTest + @CsvSource({"v1,V1", "v2,V2", 
"V1,V1", "V2,V2"}) + void shouldGetVersionFromConfig(String input, FieldSyntaxVersion version) { Review Comment: This is pretty slick. Nice! ########## connect/transforms/src/test/java/org/apache/kafka/connect/transforms/field/SingleFieldPathTest.java: ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.transforms.field; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +class SingleFieldPathTest { + + @Test void shouldFindField() { + SchemaBuilder barSchema = SchemaBuilder.struct().field("bar", Schema.INT32_SCHEMA); + Schema schema = SchemaBuilder.struct().field("foo", barSchema).build(); + + assertEquals(barSchema.field("bar"), pathV2("foo.bar").fieldFrom(schema)); + assertEquals(schema.field("foo"), pathV2("foo").fieldFrom(schema)); + } + + @Test void shouldReturnNullFieldWhenFieldNotFound() { + SchemaBuilder barSchema = SchemaBuilder.struct().field("bar", Schema.INT32_SCHEMA); + Schema schema = SchemaBuilder.struct().field("foo", barSchema).build(); + + assertNull(pathV2("un.known").fieldFrom(schema)); + assertNull(pathV2("foo.unknown").fieldFrom(schema)); + assertNull(pathV2("unknown").fieldFrom(schema)); + assertNull(pathV2("test").fieldFrom(null)); + } + + @Test void shouldFindValueInMap() { + Map<String, Object> foo = new HashMap<>(); + foo.put("bar", 42); + foo.put("baz", null); + Map<String, Object> map = new HashMap<>(); + map.put("foo", foo); + + assertEquals(42, pathV2("foo.bar").valueFrom(map)); + assertNull(pathV2("foo.baz").valueFrom(map)); + } + + @Test void shouldReturnNullValueWhenFieldNotFoundInMap() { + Map<String, Object> foo = new HashMap<>(); + foo.put("bar", 42); + foo.put("baz", null); + Map<String, Object> map = new HashMap<>(); + map.put("foo", foo); + + assertNull(new SingleFieldPath("un.known", FieldSyntaxVersion.V2).valueFrom(map)); + assertNull(new SingleFieldPath("foo.unknown", FieldSyntaxVersion.V2).valueFrom(map)); + assertNull(new SingleFieldPath("unknown", 
FieldSyntaxVersion.V2).valueFrom(map)); + assertNull(new SingleFieldPath("foo.baz", FieldSyntaxVersion.V2).valueFrom(map)); Review Comment: This assertion technically doesn't belong in this case, right? This value does appear in the map, it just happens to be explicitly null. It's also covered above in `shouldFindValueInMap`, so it should be safe to remove from here. (This same pattern applies in other places in the test suite too.) ########## connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java: ########## @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.transforms.field; + +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import static org.apache.kafka.connect.transforms.util.Requirements.requireMapOrNull; + +/** + * A SingleFieldPath is composed of one or more field names, known as path steps, + * to access values within a data object (either {@code Struct} or {@code Map<String, Object>}). + * + * <p>The field path semantics are defined by the {@link FieldSyntaxVersion syntax version}. + * + * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-821%3A+Connect+Transforms+support+for+nested+structures">KIP-821</a> + * @see FieldSyntaxVersion + */ +public class SingleFieldPath { + // Invariants: + // - A field path can contain one or more steps + private static final char BACKTICK = '`'; + private static final char DOT = '.'; + private static final char BACKSLASH = '\\'; + + private final FieldSyntaxVersion version; + private final List<String> steps; + + public SingleFieldPath(String pathText, FieldSyntaxVersion version) { + this.version = version; + switch (version) { + case V1: // backward compatibility + this.steps = Collections.singletonList(pathText); + break; + case V2: + this.steps = buildFieldPathV2(pathText); + break; + default: + throw new IllegalArgumentException("Unknown syntax version: " + version); + } + } + + private static List<String> buildFieldPathV2(String path) { + final List<String> steps = new ArrayList<>(); + // path character index to track backticks and dots and break path into steps + int idx = 0; + while (idx < path.length() && idx >= 0) { + if (path.charAt(idx) != BACKTICK) { + final int start = idx; + idx = path.indexOf(String.valueOf(DOT), idx); + if (idx 
>= 0) { // get path step and move forward + String field = path.substring(start, idx); + steps.add(field); + idx++; + } else { // add all + String field = path.substring(start); + steps.add(field); + } + } else { // has backtick + int backtickAt = idx; + idx++; + StringBuilder field = new StringBuilder(); + int start = idx; + while (true) { + // find closing backtick + idx = path.indexOf(String.valueOf(BACKTICK), idx); + if (idx == -1) { // if not found, then fail + failWhenIncompleteBacktickPair(path, backtickAt); + } + + // backtick escaped if right after backslash + boolean escaped = path.charAt(idx - 1) == BACKSLASH; + + if (idx >= path.length() - 1) { // at the end of path + if (escaped) { // but escaped, then fail + failWhenIncompleteBacktickPair(path, backtickAt); + } + field.append(path, start, idx); + // we've reached the end of the path, and the last character is the backtick + steps.add(field.toString()); + idx++; + break; + } + + if (path.charAt(idx + 1) != DOT) { // not followed by a dot + // this backtick isn't followed by a dot; include it in the field name, but continue + // looking for a matching backtick that is followed by a dot + idx++; + continue; + } + + if (escaped) { + // this backtick was escaped; include it in the field name, but continue + // looking for an unescaped matching backtick + field.append(path, start, idx - 1) + .append(BACKTICK); + + idx++; + start = idx; + continue; + } + + // we've found our matching backtick + field.append(path, start, idx); + steps.add(field.toString()); + idx += 2; // increment by two to include the backtick and the dot after it + break; + } + } + } + // add last step if last char is a dot + if (!path.isEmpty() && path.charAt(path.length() - 1) == DOT) + steps.add(""); + return Collections.unmodifiableList(steps); + } + + private static void failWhenIncompleteBacktickPair(String path, int backtickAt) { + throw new ConfigException("Incomplete backtick pair in path: [" + path + "]," + + " consider adding a 
backslash before backtick at position " + backtickAt + + " to escape it"); + } + + /** + * Access a {@code Field} at the current path within a schema {@code Schema} + * If field is not found, then {@code null} is returned. + */ + public Field fieldFrom(Schema schema) { + if (schema == null) return null; + + Schema current = schema; + for (String pathSegment : stepsWithoutLast()) { + final Field field = current.field(pathSegment); + if (field != null) { + current = field.schema(); + } else { + return null; + } + } + return current.field(lastStep()); + } + + /** + * Access a value at the current path within a schema-based {@code Struct} + * If object is not found, then {@code null} is returned. + */ + public Object valueFrom(Struct struct) { + if (struct == null) return null; + + Struct current = struct; + for (String pathSegment : stepsWithoutLast()) { + // Check to see if the field actually exists + if (current.schema().field(pathSegment) == null) { + return null; + } + current = current.getStruct(pathSegment); + if (current == null) return null; + } + + if (current.schema().field(lastStep()) != null) { + return current.get(lastStep()); + } else { + return null; + } + } + + List<String> stepsWithoutLast() { Review Comment: Also nit: would be nice to move this lower to be next to the other private/should-be-private methods like `lastStep` and `lastStepIndex`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org