yashmayya commented on code in PR #12637:
URL: https://github.com/apache/kafka/pull/12637#discussion_r974127639


##########
connect/api/src/main/java/org/apache/kafka/connect/transforms/Transformation.java:
##########
@@ -24,20 +24,31 @@
 
 /**
  * Single message transformation for Kafka Connect record types.
- *
+ * <br/>
  * Connectors can be configured with transformations to make lightweight 
message-at-a-time modifications.
  */
 public interface Transformation<R extends ConnectRecord<R>> extends 
Configurable, Closeable {
 
+    String FIELD_SYNTAX_VERSION_CONFIG = "field.syntax.version";
+    String FIELD_SYNTAX_VERSION_DOC = "Defines the version of the syntax to 
access fields. "
+        + "If set to `V1`, then the field paths are limited to access the 
elements at the root level of the struct or map."
+        + "If set to `V2`, the syntax will support accessing nested elements. 
o access nested elements, "
+        + "dotted notation is used. If dots are already included in the field 
name, then backtick pairs "
+        + "can be used to wrap field names containing dots. "
+        + "e.g. to access elements from a struct/map named \"foo.bar\", "
+        + "the following format can be used to access its elements: 
\"`foo.bar`.baz\".";

Review Comment:
   I think this should be `from a field in a struct/map named ...`  instead?



##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java:
##########
@@ -39,33 +41,35 @@
     private static final String FIELD_CONFIG = "field";
 
     public static final ConfigDef CONFIG_DEF = new ConfigDef()
-            .define(FIELD_CONFIG, ConfigDef.Type.STRING, 
ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM, "Field name to 
extract.");
+            .define(FIELD_CONFIG, ConfigDef.Type.STRING, 
ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM, "Field name to 
extract.")
+            .define(FIELD_SYNTAX_VERSION_CONFIG, ConfigDef.Type.STRING, 
FIELD_SYNTAX_VERSION_DEFAULT_VALUE, ConfigDef.Importance.HIGH, 
FIELD_SYNTAX_VERSION_DOC);

Review Comment:
   Could we add a validator here to ensure that `FIELD_SYNTAX_VERSION_CONFIG` 
belongs to `FieldSyntaxVersion.values()` (maybe case-insensitive match if 
desired)?



##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/FieldPath.java:
##########
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
+
+/**
+ * Represents a path to a field within a structure within a Connect key/value 
(e.g. Struct or
+ * Map<String, Object>).
+ * <ul>
+ * <li>It follows a dotted notation to represent nested values.</li>
+ * <li>If field names contain dots, can be escaped by wrapping field names 
with backticks.</li>
+ * <li>If field names contain dots at wrapping positions (beginning or end of 
path, before or after dots), then backticks need to be
+ * escaped by backslash.</li>
+ * </ul>
+ * Paths are calculated once and cached for further access.
+ */
+public class FieldPath {
+
+    private static final String BACKTICK = "`";
+    private static final String DOT = ".";
+    public static final char BACKTICK_CHAR = '`';
+    public static final char DOT_CHAR = '.';
+    public static final char BACKSLASH_CHAR = '\\';
+
+    private static final Map<String, FieldPath> PATHS_CACHE = new HashMap<>();
+
+    private final String[] path;
+
+    public static FieldPath ofV1(String field) {
+        return of(field, FieldSyntaxVersion.V1);
+    }
+
+    public static FieldPath ofV2(String field) {
+        return of(field, FieldSyntaxVersion.V2);
+    }
+
+    /**
+     * If version is V2, then paths are cached for further access.
+     *
+     * @param field field path expression
+     * @param version  field syntax version
+     */
+    public static FieldPath of(String field, FieldSyntaxVersion version) {
+        if (field == null || field.isEmpty() || 
version.equals(FieldSyntaxVersion.V1)) {
+            return new FieldPath(field, version);
+        } else {
+            if (PATHS_CACHE.containsKey(field)) {
+                return PATHS_CACHE.get(field);
+            } else {
+                final FieldPath fieldPath = new FieldPath(field, version);
+                PATHS_CACHE.put(field, fieldPath);
+                return fieldPath;
+            }
+        }
+    }
+
+    FieldPath(String path, FieldSyntaxVersion version) {
+        if (path == null || path.isEmpty()) { // empty path
+            this.path = new String[] {};
+        } else {
+            switch (version) {
+                case V1: // backward compatibility
+                    this.path = new String[] {path};
+                    break;
+                case V2:
+                    // if no dots or wrapping backticks are used, then return 
path with single step
+                    if (!path.contains(DOT)
+                        && !(path.startsWith(BACKTICK) && path.endsWith(
+                        BACKTICK))) {
+                        this.path = new String[] {path};
+                    } else {
+                        // prepare for tracking path steps
+                        final List<String> steps = new ArrayList<>();
+                        final StringBuilder s = new StringBuilder(
+                            path); // avoid creating new string on changes

Review Comment:
   ```suggestion
                           final StringBuilder s = new StringBuilder(path); // 
avoid creating new string on changes
   ```
   
   nit: unnecessary newline



##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/FieldPath.java:
##########
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
+
+/**
+ * Represents a path to a field within a structure within a Connect key/value 
(e.g. Struct or
+ * Map<String, Object>).
+ * <ul>
+ * <li>It follows a dotted notation to represent nested values.</li>
+ * <li>If field names contain dots, can be escaped by wrapping field names 
with backticks.</li>
+ * <li>If field names contain dots at wrapping positions (beginning or end of 
path, before or after dots), then backticks need to be
+ * escaped by backslash.</li>
+ * </ul>
+ * Paths are calculated once and cached for further access.
+ */
+public class FieldPath {
+
+    private static final String BACKTICK = "`";
+    private static final String DOT = ".";
+    public static final char BACKTICK_CHAR = '`';
+    public static final char DOT_CHAR = '.';
+    public static final char BACKSLASH_CHAR = '\\';
+
+    private static final Map<String, FieldPath> PATHS_CACHE = new HashMap<>();
+
+    private final String[] path;
+
+    public static FieldPath ofV1(String field) {
+        return of(field, FieldSyntaxVersion.V1);
+    }
+
+    public static FieldPath ofV2(String field) {
+        return of(field, FieldSyntaxVersion.V2);
+    }
+
+    /**
+     * If version is V2, then paths are cached for further access.
+     *
+     * @param field field path expression
+     * @param version  field syntax version
+     */
+    public static FieldPath of(String field, FieldSyntaxVersion version) {
+        if (field == null || field.isEmpty() || 
version.equals(FieldSyntaxVersion.V1)) {
+            return new FieldPath(field, version);
+        } else {
+            if (PATHS_CACHE.containsKey(field)) {
+                return PATHS_CACHE.get(field);
+            } else {
+                final FieldPath fieldPath = new FieldPath(field, version);
+                PATHS_CACHE.put(field, fieldPath);
+                return fieldPath;
+            }
+        }
+    }
+
+    FieldPath(String path, FieldSyntaxVersion version) {
+        if (path == null || path.isEmpty()) { // empty path
+            this.path = new String[] {};
+        } else {
+            switch (version) {
+                case V1: // backward compatibility
+                    this.path = new String[] {path};
+                    break;
+                case V2:
+                    // if no dots or wrapping backticks are used, then return 
path with single step
+                    if (!path.contains(DOT)
+                        && !(path.startsWith(BACKTICK) && path.endsWith(
+                        BACKTICK))) {

Review Comment:
   Why do we need this check?



##########
connect/api/src/main/java/org/apache/kafka/connect/transforms/Transformation.java:
##########
@@ -24,20 +24,31 @@
 
 /**
  * Single message transformation for Kafka Connect record types.
- *
+ * <br/>
  * Connectors can be configured with transformations to make lightweight 
message-at-a-time modifications.
  */
 public interface Transformation<R extends ConnectRecord<R>> extends 
Configurable, Closeable {
 
+    String FIELD_SYNTAX_VERSION_CONFIG = "field.syntax.version";
+    String FIELD_SYNTAX_VERSION_DOC = "Defines the version of the syntax to 
access fields. "
+        + "If set to `V1`, then the field paths are limited to access the 
elements at the root level of the struct or map."
+        + "If set to `V2`, the syntax will support accessing nested elements. 
o access nested elements, "
+        + "dotted notation is used. If dots are already included in the field 
name, then backtick pairs "
+        + "can be used to wrap field names containing dots. "
+        + "e.g. to access elements from a struct/map named \"foo.bar\", "
+        + "the following format can be used to access its elements: 
\"`foo.bar`.baz\".";
+
+    String FIELD_SYNTAX_VERSION_DEFAULT_VALUE = "V1";

Review Comment:
   I think this (and the above config too) can be moved to `FieldPath` or 
`FieldSyntaxVersion` in the `connect:transforms` subproject since they don't 
need to be added to the public interface? And then you can avoid using the 
string form "V1" directly here. 
   
   Or else, if the intention *is* to add it to the public interface and allow 
custom SMT builders to also use this new notation, then should we consider 
moving the `FieldPath` and related classes to the `connect:api` subproject?



##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/FieldPath.java:
##########
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
+
+/**
+ * Represents a path to a field within a structure within a Connect key/value 
(e.g. Struct or
+ * Map<String, Object>).
+ * <ul>
+ * <li>It follows a dotted notation to represent nested values.</li>
+ * <li>If field names contain dots, can be escaped by wrapping field names 
with backticks.</li>
+ * <li>If field names contain dots at wrapping positions (beginning or end of 
path, before or after dots), then backticks need to be

Review Comment:
   ```suggestion
    * <li>If field names contain backticks at wrapping positions (beginning or 
end of path, before or after dots), then backticks need to be
   ```



##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/FieldPath.java:
##########
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
+
+/**
+ * Represents a path to a field within a structure within a Connect key/value 
(e.g. Struct or
+ * Map<String, Object>).
+ * <ul>
+ * <li>It follows a dotted notation to represent nested values.</li>
+ * <li>If field names contain dots, can be escaped by wrapping field names 
with backticks.</li>
+ * <li>If field names contain dots at wrapping positions (beginning or end of 
path, before or after dots), then backticks need to be
+ * escaped by backslash.</li>
+ * </ul>
+ * Paths are calculated once and cached for further access.
+ */
+public class FieldPath {
+
+    private static final String BACKTICK = "`";
+    private static final String DOT = ".";
+    public static final char BACKTICK_CHAR = '`';
+    public static final char DOT_CHAR = '.';
+    public static final char BACKSLASH_CHAR = '\\';
+
+    private static final Map<String, FieldPath> PATHS_CACHE = new HashMap<>();
+
+    private final String[] path;
+
+    public static FieldPath ofV1(String field) {
+        return of(field, FieldSyntaxVersion.V1);
+    }
+
+    public static FieldPath ofV2(String field) {
+        return of(field, FieldSyntaxVersion.V2);
+    }
+
+    /**
+     * If version is V2, then paths are cached for further access.
+     *
+     * @param field field path expression
+     * @param version  field syntax version
+     */
+    public static FieldPath of(String field, FieldSyntaxVersion version) {
+        if (field == null || field.isEmpty() || 
version.equals(FieldSyntaxVersion.V1)) {
+            return new FieldPath(field, version);
+        } else {
+            if (PATHS_CACHE.containsKey(field)) {
+                return PATHS_CACHE.get(field);
+            } else {
+                final FieldPath fieldPath = new FieldPath(field, version);
+                PATHS_CACHE.put(field, fieldPath);
+                return fieldPath;
+            }
+        }
+    }
+
+    FieldPath(String path, FieldSyntaxVersion version) {
+        if (path == null || path.isEmpty()) { // empty path
+            this.path = new String[] {};
+        } else {
+            switch (version) {
+                case V1: // backward compatibility
+                    this.path = new String[] {path};
+                    break;
+                case V2:
+                    // if no dots or wrapping backticks are used, then return 
path with single step
+                    if (!path.contains(DOT)
+                        && !(path.startsWith(BACKTICK) && path.endsWith(
+                        BACKTICK))) {
+                        this.path = new String[] {path};
+                    } else {
+                        // prepare for tracking path steps
+                        final List<String> steps = new ArrayList<>();
+                        final StringBuilder s = new StringBuilder(
+                            path); // avoid creating new string on changes
+
+                        while (s.length() > 0) { // until path is traverse
+                            // process backtick pair if any
+                            if (s.charAt(0) == BACKTICK_CHAR) {
+                                s.deleteCharAt(0);
+
+                                // find backtick pair
+                                int idx = 0;
+                                while (idx >= 0) {
+                                    idx = s.indexOf(BACKTICK, idx);
+                                    if (idx == -1) {
+                                        throw new IllegalArgumentException(
+                                            "Incomplete backtick pair at 
[...]`" + s);
+                                    }
+                                    if (idx != s.length() - 1) { // non-global 
backtick
+                                        if (s.charAt(idx + 1) != DOT_CHAR
+                                            || s.charAt(idx - 1)
+                                            == BACKSLASH_CHAR) { // not 
wrapped or escaped
+                                            idx++; // move index forward and 
keep searching
+                                        } else { // it's end pair
+                                            steps.add(
+                                                
checkIncompleteBacktickPair(s.substring(0, idx)));
+                                            s.delete(0, idx + 2); // rm 
backtick and dot
+                                            break;
+                                        }
+                                    } else { // global backtick

Review Comment:
   Is a "global" backtick referring to a pair of backticks that wrap the whole 
field path? I didn't get why that requires special handling?



##########
connect/api/src/main/java/org/apache/kafka/connect/transforms/Transformation.java:
##########
@@ -24,20 +24,31 @@
 
 /**
  * Single message transformation for Kafka Connect record types.
- *
+ * <br/>
  * Connectors can be configured with transformations to make lightweight 
message-at-a-time modifications.
  */
 public interface Transformation<R extends ConnectRecord<R>> extends 
Configurable, Closeable {
 
+    String FIELD_SYNTAX_VERSION_CONFIG = "field.syntax.version";
+    String FIELD_SYNTAX_VERSION_DOC = "Defines the version of the syntax to 
access fields. "
+        + "If set to `V1`, then the field paths are limited to access the 
elements at the root level of the struct or map."
+        + "If set to `V2`, the syntax will support accessing nested elements. 
o access nested elements, "

Review Comment:
   ```suggestion
           + "If set to `V2`, the syntax will support accessing nested 
elements. To access nested elements, "
   ```



##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/FieldPath.java:
##########
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
+
+/**
+ * Represents a path to a field within a structure within a Connect key/value 
(e.g. Struct or
+ * Map<String, Object>).
+ * <ul>
+ * <li>It follows a dotted notation to represent nested values.</li>
+ * <li>If field names contain dots, can be escaped by wrapping field names 
with backticks.</li>
+ * <li>If field names contain dots at wrapping positions (beginning or end of 
path, before or after dots), then backticks need to be
+ * escaped by backslash.</li>
+ * </ul>
+ * Paths are calculated once and cached for further access.
+ */
+public class FieldPath {
+
+    private static final String BACKTICK = "`";
+    private static final String DOT = ".";
+    public static final char BACKTICK_CHAR = '`';
+    public static final char DOT_CHAR = '.';
+    public static final char BACKSLASH_CHAR = '\\';
+
+    private static final Map<String, FieldPath> PATHS_CACHE = new HashMap<>();
+
+    private final String[] path;
+
+    public static FieldPath ofV1(String field) {
+        return of(field, FieldSyntaxVersion.V1);
+    }
+
+    public static FieldPath ofV2(String field) {
+        return of(field, FieldSyntaxVersion.V2);
+    }

Review Comment:
   nit: can be package-private if only visible for testing



##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/FieldPath.java:
##########
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
+
+/**
+ * Represents a path to a field within a structure within a Connect key/value 
(e.g. Struct or
+ * Map<String, Object>).
+ * <ul>
+ * <li>It follows a dotted notation to represent nested values.</li>
+ * <li>If field names contain dots, can be escaped by wrapping field names 
with backticks.</li>
+ * <li>If field names contain dots at wrapping positions (beginning or end of 
path, before or after dots), then backticks need to be
+ * escaped by backslash.</li>
+ * </ul>
+ * Paths are calculated once and cached for further access.
+ */
+public class FieldPath {
+
+    private static final String BACKTICK = "`";
+    private static final String DOT = ".";
+    public static final char BACKTICK_CHAR = '`';
+    public static final char DOT_CHAR = '.';
+    public static final char BACKSLASH_CHAR = '\\';
+
+    private static final Map<String, FieldPath> PATHS_CACHE = new HashMap<>();
+
+    private final String[] path;
+
+    public static FieldPath ofV1(String field) {
+        return of(field, FieldSyntaxVersion.V1);
+    }
+
+    public static FieldPath ofV2(String field) {
+        return of(field, FieldSyntaxVersion.V2);
+    }
+
+    /**
+     * If version is V2, then paths are cached for further access.
+     *
+     * @param field field path expression
+     * @param version  field syntax version
+     */
+    public static FieldPath of(String field, FieldSyntaxVersion version) {
+        if (field == null || field.isEmpty() || 
version.equals(FieldSyntaxVersion.V1)) {
+            return new FieldPath(field, version);
+        } else {
+            if (PATHS_CACHE.containsKey(field)) {
+                return PATHS_CACHE.get(field);
+            } else {
+                final FieldPath fieldPath = new FieldPath(field, version);
+                PATHS_CACHE.put(field, fieldPath);
+                return fieldPath;
+            }
+        }
+    }
+
+    FieldPath(String path, FieldSyntaxVersion version) {
+        if (path == null || path.isEmpty()) { // empty path
+            this.path = new String[] {};
+        } else {
+            switch (version) {
+                case V1: // backward compatibility
+                    this.path = new String[] {path};
+                    break;
+                case V2:
+                    // if no dots or wrapping backticks are used, then return 
path with single step
+                    if (!path.contains(DOT)
+                        && !(path.startsWith(BACKTICK) && path.endsWith(
+                        BACKTICK))) {
+                        this.path = new String[] {path};
+                    } else {
+                        // prepare for tracking path steps
+                        final List<String> steps = new ArrayList<>();
+                        final StringBuilder s = new StringBuilder(
+                            path); // avoid creating new string on changes
+
+                        while (s.length() > 0) { // until path is traverse
+                            // process backtick pair if any
+                            if (s.charAt(0) == BACKTICK_CHAR) {
+                                s.deleteCharAt(0);
+
+                                // find backtick pair
+                                int idx = 0;
+                                while (idx >= 0) {
+                                    idx = s.indexOf(BACKTICK, idx);
+                                    if (idx == -1) {
+                                        throw new IllegalArgumentException(
+                                            "Incomplete backtick pair at 
[...]`" + s);
+                                    }
+                                    if (idx != s.length() - 1) { // non-global 
backtick
+                                        if (s.charAt(idx + 1) != DOT_CHAR
+                                            || s.charAt(idx - 1)
+                                            == BACKSLASH_CHAR) { // not 
wrapped or escaped
+                                            idx++; // move index forward and 
keep searching
+                                        } else { // it's end pair
+                                            steps.add(
+                                                
checkIncompleteBacktickPair(s.substring(0, idx)));
+                                            s.delete(0, idx + 2); // rm 
backtick and dot
+                                            break;
+                                        }
+                                    } else { // global backtick
+                                        
steps.add(checkIncompleteBacktickPair(s.substring(0, idx)));
+                                        s.delete(0, s.length());
+                                        break;
+                                    }
+                                }
+                            } else { // process path dots
+                                final int atDot = s.indexOf(DOT);
+                                if (atDot > 0) { // get step and move forward
+                                    
steps.add(checkIncompleteBacktickPair(s.substring(0, atDot)));
+                                    s.delete(0, atDot + 1);
+                                } else { // add all
+                                    
steps.add(checkIncompleteBacktickPair(s.toString()));
+                                    s.delete(0, s.length());
+                                }
+                            }
+                        }
+
+                        this.path = steps.toArray(new String[0]);
+                    }
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unknown syntax 
version: " + version);
+            }
+        }
+    }
+
+    private String checkIncompleteBacktickPair(String field) {

Review Comment:
   Would it be possible to add a doc comment for this method? Also 
`checkIncompleteBacktickPair` seems like a misnomer for a method that is 
returning a String



##########
connect/transforms/src/test/java/org/apache/kafka/connect/transforms/field/FieldPathTest.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.Collections;
+import java.util.Map;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+import org.junit.jupiter.api.Test;
+
+class FieldPathTest {
+    final static String[] EMPTY_PATH = new String[]{};
+
+    @Test void shouldBuildV1WithDotsAndBacktickPair() {
+        assertArrayEquals(new String[] {"foo.bar.baz"}, 
FieldPath.ofV1("foo.bar.baz").path());
+        assertArrayEquals(new String[] {"foo.`bar.baz`"}, 
FieldPath.ofV1("foo.`bar.baz`").path());
+    }
+
+    @Test void shouldBuildV2WithEmptyPath() {
+        assertArrayEquals(EMPTY_PATH, FieldPath.of("", 
FieldSyntaxVersion.V2).path());
+    }
+
+    @Test void shouldBuildV2WithNullPath() {
+        assertArrayEquals(EMPTY_PATH, FieldPath.of(null, 
FieldSyntaxVersion.V2).path());
+    }
+
+    @Test void shouldBuildV2WithoutDots() {
+        assertArrayEquals(new String[] {"foobarbaz"}, 
FieldPath.of("foobarbaz", FieldSyntaxVersion.V2).path());
+    }
+    @Test void shouldBuildV2WithoutWrappingBackticks() {
+        assertArrayEquals(new String[] {"foo`bar`baz"}, 
FieldPath.of("foo`bar`baz", FieldSyntaxVersion.V2).path());
+    }
+
+    @Test void shouldBuildV2WhenIncludesDots() {
+        assertArrayEquals(new String[] {"foo", "bar", "baz"}, 
FieldPath.of("foo.bar.baz", FieldSyntaxVersion.V2).path());
+    }
+
+    @Test void shouldBuildV2WhenIncludesDotsAndBacktickPair() {
+        assertArrayEquals(new String[] {"foo", "bar.baz"}, 
FieldPath.of("foo.`bar.baz`", FieldSyntaxVersion.V2).path());
+        assertArrayEquals(new String[] {"foo", "bar", "baz"}, 
FieldPath.of("foo.`bar`.baz", FieldSyntaxVersion.V2).path());
+    }
+
+    @Test void shouldBuildV2AndIgnoreBackticksThatAreNotWrapping() {
+        assertArrayEquals(new String[] {"foo", "ba`r.baz"}, 
FieldPath.of("foo.`ba`r.baz`", FieldSyntaxVersion.V2).path());
+        assertArrayEquals(new String[] {"foo", "ba`r", "baz"}, 
FieldPath.of("foo.ba`r.baz", FieldSyntaxVersion.V2).path());
+    }
+
+    @Test void shouldBuildV2AndEscapeBackticks() {
+        assertArrayEquals(new String[] {"foo", "bar`.`baz"}, 
FieldPath.of("foo.`bar\\`.\\`baz`", FieldSyntaxVersion.V2).path());
+        assertArrayEquals(new String[] {"foo", "bar\\`.`baz"}, 
FieldPath.of("foo.`bar\\\\`.\\`baz`", FieldSyntaxVersion.V2).path());
+    }
+
+    @Test void shouldBuildV2WithBackticksWrappingBackticks() {

Review Comment:
   The rules defined in the KIP don't seem to include this case (although there 
is an example in the subsequent section)? 
   
   
https://cwiki.apache.org/confluence/display/KAFKA/KIP-821%3A+Connect+Transforms+support+for+nested+structures#KIP821:ConnectTransformssupportfornestedstructures-Rules



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to