jeqo commented on code in PR #12637:
URL: https://github.com/apache/kafka/pull/12637#discussion_r975537150


##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/FieldPath.java:
##########
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
+
+/**
+ * Represents a path to a field within a structure within a Connect key/value 
(e.g. Struct or
+ * Map<String, Object>).
+ * <ul>
+ * <li>It follows a dotted notation to represent nested values.</li>
+ * <li>If field names contain dots, can be escaped by wrapping field names 
with backticks.</li>
+ * <li>If field names contain dots at wrapping positions (beginning or end of 
path, before or after dots), then backticks need to be
+ * escaped by backslash.</li>
+ * </ul>
+ * Paths are calculated once and cached for further access.
+ */
+public class FieldPath {
+
+    private static final String BACKTICK = "`";
+    private static final String DOT = ".";
+    public static final char BACKTICK_CHAR = '`';
+    public static final char DOT_CHAR = '.';
+    public static final char BACKSLASH_CHAR = '\\';
+
+    private static final Map<String, FieldPath> PATHS_CACHE = new HashMap<>();
+
+    private final String[] path;
+
+    public static FieldPath ofV1(String field) {
+        return of(field, FieldSyntaxVersion.V1);
+    }
+
+    public static FieldPath ofV2(String field) {
+        return of(field, FieldSyntaxVersion.V2);
+    }
+
+    /**
+     * If version is V2, then paths are cached for further access.
+     *
+     * @param field field path expression
+     * @param version  field syntax version
+     */
+    public static FieldPath of(String field, FieldSyntaxVersion version) {
+        if (field == null || field.isEmpty() || 
version.equals(FieldSyntaxVersion.V1)) {
+            return new FieldPath(field, version);
+        } else {
+            if (PATHS_CACHE.containsKey(field)) {
+                return PATHS_CACHE.get(field);
+            } else {
+                final FieldPath fieldPath = new FieldPath(field, version);
+                PATHS_CACHE.put(field, fieldPath);
+                return fieldPath;
+            }
+        }
+    }
+
+    FieldPath(String path, FieldSyntaxVersion version) {
+        if (path == null || path.isEmpty()) { // empty path
+            this.path = new String[] {};
+        } else {
+            switch (version) {
+                case V1: // backward compatibility
+                    this.path = new String[] {path};
+                    break;
+                case V2:
+                    // if no dots or wrapping backticks are used, then return 
path with single step
+                    if (!path.contains(DOT)
+                        && !(path.startsWith(BACKTICK) && path.endsWith(
+                        BACKTICK))) {
+                        this.path = new String[] {path};
+                    } else {
+                        // prepare for tracking path steps
+                        final List<String> steps = new ArrayList<>();
+                        final StringBuilder s = new StringBuilder(
+                            path); // avoid creating new string on changes
+
+                        while (s.length() > 0) { // until path is traverse
+                            // process backtick pair if any
+                            if (s.charAt(0) == BACKTICK_CHAR) {
+                                s.deleteCharAt(0);
+
+                                // find backtick pair
+                                int idx = 0;
+                                while (idx >= 0) {
+                                    idx = s.indexOf(BACKTICK, idx);
+                                    if (idx == -1) {
+                                        throw new IllegalArgumentException(
+                                            "Incomplete backtick pair at 
[...]`" + s);
+                                    }
+                                    if (idx != s.length() - 1) { // non-global 
backtick
+                                        if (s.charAt(idx + 1) != DOT_CHAR
+                                            || s.charAt(idx - 1)
+                                            == BACKSLASH_CHAR) { // not 
wrapped or escaped
+                                            idx++; // move index forward and 
keep searching
+                                        } else { // it's end pair
+                                            steps.add(
+                                                
checkIncompleteBacktickPair(s.substring(0, idx)));
+                                            s.delete(0, idx + 2); // rm 
backtick and dot
+                                            break;
+                                        }
+                                    } else { // global backtick
+                                        
steps.add(checkIncompleteBacktickPair(s.substring(0, idx)));
+                                        s.delete(0, s.length());
+                                        break;
+                                    }
+                                }
+                            } else { // process path dots
+                                final int atDot = s.indexOf(DOT);
+                                if (atDot > 0) { // get step and move forward
+                                    
steps.add(checkIncompleteBacktickPair(s.substring(0, atDot)));
+                                    s.delete(0, atDot + 1);
+                                } else { // add all
+                                    
steps.add(checkIncompleteBacktickPair(s.toString()));
+                                    s.delete(0, s.length());
+                                }
+                            }
+                        }
+
+                        this.path = steps.toArray(new String[0]);
+                    }
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unknown syntax 
version: " + version);
+            }
+        }
+    }
+
+    private String checkIncompleteBacktickPair(String field) {

Review Comment:
   Good call. I have renamed it to `escapeBackticks` and add docs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to