This is an automated email from the ASF dual-hosted git repository.

chriss pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 74bd798097 NIFI-12773: Added join and anchored RecordPath function
74bd798097 is described below

commit 74bd798097e15d54b871ac3ef7654a0d3433f99a
Author: Mark Payne <marka...@hotmail.com>
AuthorDate: Fri Feb 9 16:58:09 2024 -0500

    NIFI-12773: Added join and anchored RecordPath function
    
    Signed-off-by: Chris Sampson <chris.sampso...@gmail.com>
    
    This closes #8391
---
 nifi-commons/nifi-record-path/pom.xml              |  4 ++
 .../nifi/record/path/functions/Anchored.java       | 83 +++++++++++++++++++++
 .../apache/nifi/record/path/functions/Join.java    | 84 ++++++++++++++++++++++
 .../nifi/record/path/paths/RecordPathCompiler.java | 24 +++++++
 .../apache/nifi/record/path/TestRecordPath.java    | 83 +++++++++++++++++++++
 nifi-docs/src/main/asciidoc/record-path-guide.adoc | 42 +++++++++++
 6 files changed, 320 insertions(+)

diff --git a/nifi-commons/nifi-record-path/pom.xml 
b/nifi-commons/nifi-record-path/pom.xml
index 17d9e762e2..ead5937227 100644
--- a/nifi-commons/nifi-record-path/pom.xml
+++ b/nifi-commons/nifi-record-path/pom.xml
@@ -103,6 +103,10 @@
             <artifactId>nifi-uuid5</artifactId>
             <version>2.0.0-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-property-utils</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.antlr</groupId>
             <artifactId>antlr-runtime</artifactId>
diff --git 
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Anchored.java
 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Anchored.java
new file mode 100644
index 0000000000..2874295d79
--- /dev/null
+++ 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Anchored.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.functions;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.StandardRecordPathEvaluationContext;
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+import org.apache.nifi.serialization.record.Record;
+
+import java.util.Arrays;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+/**
+ * Implements the {@code anchored(anchorPath, evaluationPath)} RecordPath function.
+ * <p>
+ * The anchor path is evaluated against the incoming context. Each Record it selects becomes the root of a new
+ * evaluation context, and the evaluation path is evaluated against that root. When the anchor path selects an
+ * array or an {@link Iterable}, every Record element is used as a root in turn and the results are concatenated
+ * in element order. Null values, non-Record values, and non-Record elements produce no results.
+ * </p>
+ */
+public class Anchored extends RecordPathSegment {
+
+    private final RecordPathSegment anchorPath;
+    private final RecordPathSegment evaluationPath;
+
+    /**
+     * @param anchorPath path selecting the Record(s) to use as the root for evaluation
+     * @param evaluationPath path evaluated relative to each anchored Record
+     * @param absolute passed through to {@link RecordPathSegment}
+     */
+    public Anchored(final RecordPathSegment anchorPath, final RecordPathSegment evaluationPath, final boolean absolute) {
+        super("anchored", null, absolute);
+
+        this.anchorPath = anchorPath;
+        this.evaluationPath = evaluationPath;
+    }
+
+    @Override
+    public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
+        return anchorPath.evaluate(context)
+            .flatMap(fieldValue -> evaluateFieldValue(fieldValue.getValue()));
+    }
+
+    /**
+     * Evaluates the evaluation path against every Record contained in the given anchor value.
+     */
+    private Stream<FieldValue> evaluateFieldValue(final Object value) {
+        if (value instanceof final Record anchorRecord) {
+            return evaluateAtRoot(anchorRecord);
+        }
+
+        // Array field values are not necessarily typed as Record[]; an Object[] holding Records must be
+        // handled as well. Record[] is itself an Object[], so this single check covers both cases.
+        if (value instanceof final Object[] array) {
+            return Arrays.stream(array).flatMap(this::evaluateElement);
+        }
+
+        if (value instanceof final Iterable<?> iterable) {
+            return StreamSupport.stream(iterable.spliterator(), false).flatMap(this::evaluateElement);
+        }
+
+        // null or a scalar value: there is no Record to anchor to
+        return Stream.of();
+    }
+
+    /**
+     * Evaluates the evaluation path against a single array/Iterable element, skipping nulls and non-Record elements.
+     */
+    private Stream<FieldValue> evaluateElement(final Object element) {
+        if (element instanceof final Record elementRecord) {
+            return evaluateAtRoot(elementRecord);
+        }
+        return Stream.of();
+    }
+
+    /**
+     * Evaluates the evaluation path in a fresh context whose root is the given Record.
+     */
+    private Stream<FieldValue> evaluateAtRoot(final Record root) {
+        final RecordPathEvaluationContext rootContext = new StandardRecordPathEvaluationContext(root);
+        return evaluationPath.evaluate(rootContext);
+    }
+}
diff --git 
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Join.java
 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Join.java
new file mode 100644
index 0000000000..da9cee488f
--- /dev/null
+++ 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Join.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.functions;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.StandardFieldValue;
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+import org.apache.nifi.record.path.util.RecordPathUtils;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Stream;
+
+/**
+ * Implements the {@code join(delimiter, value1, value2, ...)} RecordPath function.
+ * <p>
+ * The first String value selected by the delimiter path is used as the separator (an empty string when it
+ * resolves to {@code null}). Every value selected by each value path is converted to a String, in argument
+ * order, and the results are joined into a single String field named {@code join}. Arrays and
+ * {@link Iterable}s are flattened recursively, and a {@code null} value contributes the literal text "null".
+ * </p>
+ */
+public class Join extends RecordPathSegment {
+    private final RecordPathSegment delimiterPath;
+    private final RecordPathSegment[] valuePaths;
+
+    public Join(final RecordPathSegment delimiterPath, final RecordPathSegment[] valuePaths, final boolean absolute) {
+        super("join", null, absolute);
+        this.delimiterPath = delimiterPath;
+        this.valuePaths = valuePaths;
+    }
+
+    @Override
+    public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
+        final String configuredDelimiter = RecordPathUtils.getFirstStringValue(delimiterPath, context);
+        final String separator = configuredDelimiter == null ? "" : configuredDelimiter;
+
+        final List<String> parts = new ArrayList<>();
+        for (final RecordPathSegment valuePath : valuePaths) {
+            valuePath.evaluate(context)
+                .map(FieldValue::getValue)
+                .forEach(value -> addStringValue(value, parts));
+        }
+
+        final String joined = String.join(separator, parts);
+        final RecordField joinField = new RecordField("join", RecordFieldType.STRING.getDataType());
+        return Stream.of(new StandardFieldValue(joined, joinField, null));
+    }
+
+    /**
+     * Appends the String form of {@code value} to {@code parts}, recursing into arrays and Iterables.
+     */
+    private void addStringValue(final Object value, final List<String> parts) {
+        if (value == null) {
+            parts.add("null");
+            return;
+        }
+
+        if (value instanceof final Object[] elements) {
+            for (final Object element : elements) {
+                addStringValue(element, parts);
+            }
+            return;
+        }
+
+        if (value instanceof final Iterable<?> elements) {
+            for (final Object element : elements) {
+                addStringValue(element, parts);
+            }
+            return;
+        }
+
+        parts.add(DataTypeUtils.toString(value, null));
+    }
+}
diff --git 
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java
 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java
index c6f23b145b..9061014295 100644
--- 
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java
+++ 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java
@@ -35,6 +35,7 @@ import org.apache.nifi.record.path.filter.NotEqualsFilter;
 import org.apache.nifi.record.path.filter.NotFilter;
 import org.apache.nifi.record.path.filter.RecordPathFilter;
 import org.apache.nifi.record.path.filter.StartsWith;
+import org.apache.nifi.record.path.functions.Anchored;
 import org.apache.nifi.record.path.functions.Base64Decode;
 import org.apache.nifi.record.path.functions.Base64Encode;
 import org.apache.nifi.record.path.functions.Coalesce;
@@ -45,6 +46,7 @@ import org.apache.nifi.record.path.functions.FieldName;
 import org.apache.nifi.record.path.functions.FilterFunction;
 import org.apache.nifi.record.path.functions.Format;
 import org.apache.nifi.record.path.functions.Hash;
+import org.apache.nifi.record.path.functions.Join;
 import org.apache.nifi.record.path.functions.MapOf;
 import org.apache.nifi.record.path.functions.PadLeft;
 import org.apache.nifi.record.path.functions.PadRight;
@@ -129,6 +131,10 @@ public class RecordPathCompiler {
             }
             case CHILD_REFERENCE: {
                 final Tree childTree = tree.getChild(0);
+                if (childTree == null) {
+                    return new RootPath();
+                }
+
                 final int childTreeType = childTree.getType();
                 if (childTreeType == FIELD_NAME) {
                     final String childName = childTree.getChild(0).getText();
@@ -404,6 +410,24 @@ public class RecordPathCompiler {
                         final RecordPathSegment[] args = 
getArgPaths(argumentListTree, 1, functionName, absolute);
                         return new Count(args[0], absolute);
                     }
+                    case "join": {
+                        final int numArgs = argumentListTree.getChildCount();
+                        if (numArgs < 2) {
+                            throw new RecordPathException("Invalid number of 
arguments: " + functionName + " function takes 2 or more arguments but got " + 
numArgs);
+                        }
+
+                        final RecordPathSegment[] joinPaths = new 
RecordPathSegment[numArgs - 1];
+                        for (int i = 0; i < numArgs - 1; i++) {
+                            joinPaths[i] = 
buildPath(argumentListTree.getChild(i + 1), null, absolute);
+                        }
+
+                        final RecordPathSegment delimiterPath = 
buildPath(argumentListTree.getChild(0), null, absolute);
+                        return new Join(delimiterPath, joinPaths, absolute);
+                    }
+                    case "anchored": {
+                        final RecordPathSegment[] args = 
getArgPaths(argumentListTree, 2, functionName, absolute);
+                        return new Anchored(args[0], args[1], absolute);
+                    }
                     case "not":
                     case "contains":
                     case "containsRegex":
diff --git 
a/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java
 
b/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java
index 2b6341124c..f374325528 100644
--- 
a/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java
+++ 
b/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java
@@ -1241,6 +1241,89 @@ public class TestRecordPath {
         assertEquals("John Doe: 48", RecordPath.compile("concat(/firstName, ' 
', /lastName, ': ', 
48)").evaluate(record).getSelectedFields().findFirst().get().getValue());
     }
 
+    /**
+     * Verifies that join() concatenates two scalar fields with a multi-character delimiter, in argument order.
+     */
+    @Test
+    public void testJoinWithTwoFields() {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("lastName", RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("firstName", RecordFieldType.STRING.getDataType()));
+
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final Map<String, Object> values = new HashMap<>();
+        values.put("lastName", "Doe");
+        values.put("firstName", "John");
+        final Record record = new MapRecord(schema, values);
+
+        assertEquals("Doe, John", RecordPath.compile("join(', ', /lastName, /firstName)").evaluate(record).getSelectedFields().findFirst().get().getValue());
+    }
+
+    @Test
+    public void testJoinWithArray() {
+        // A single array-valued argument is flattened: each element becomes one joined value
+        final RecordField namesField = new RecordField("names", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType()));
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(namesField);
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final Map<String, Object> values = new HashMap<>();
+        values.put("names", new String[] {"John", "Jane", "Jacob", "Judy"});
+        final Record record = new MapRecord(schema, values);
+
+        final RecordPath joinPath = RecordPath.compile("join(',', /names)");
+        final Object joined = joinPath.evaluate(record).getSelectedFields().findFirst().get().getValue();
+        assertEquals("John,Jane,Jacob,Judy", joined);
+    }
+
+    @Test
+    public void testJoinWithArrayAndMultipleFields() {
+        // Scalar arguments and a sliced array argument are joined in argument order; friends[1..2] selects Jane and Jacob
+        final List<RecordField> personFields = new ArrayList<>();
+        personFields.add(new RecordField("lastName", RecordFieldType.STRING.getDataType()));
+        personFields.add(new RecordField("firstName", RecordFieldType.STRING.getDataType()));
+        personFields.add(new RecordField("friends", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType())));
+        final RecordSchema personSchema = new SimpleRecordSchema(personFields);
+
+        final Map<String, Object> values = new HashMap<>();
+        values.put("lastName", "Doe");
+        values.put("firstName", "John");
+        values.put("friends", new String[] {"John", "Jane", "Jacob", "Judy"});
+        final Record person = new MapRecord(personSchema, values);
+
+        final String expected = "Doe\nJohn\nJane\nJacob";
+        final RecordPath joinPath = RecordPath.compile("join('\\n', /lastName, /firstName, /friends[1..2])");
+        assertEquals(expected, joinPath.evaluate(person).getSelectedFields().findFirst().get().getValue());
+    }
+
+    @Test
+    public void testAnchored() {
+        final List<RecordField> personFields = new ArrayList<>();
+        personFields.add(new RecordField("lastName", RecordFieldType.STRING.getDataType()));
+        personFields.add(new RecordField("firstName", RecordFieldType.STRING.getDataType()));
+        final RecordSchema personSchema = new SimpleRecordSchema(personFields);
+
+        final List<RecordField> employeeFields = new ArrayList<>();
+        employeeFields.add(new RecordField("self", RecordFieldType.RECORD.getRecordDataType(personSchema)));
+        employeeFields.add(new RecordField("manager", RecordFieldType.RECORD.getRecordDataType(personSchema)));
+        employeeFields.add(new RecordField("directReports", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(personSchema))));
+        final RecordSchema employeeSchema = new SimpleRecordSchema(employeeFields);
+
+        final Record[] directReports = {
+            createPerson("John", "Doe", personSchema),
+            createPerson("John", "Jingleheimer", personSchema),
+            createPerson("John", "Jacob", personSchema)
+        };
+        final Record employee = new MapRecord(employeeSchema, Map.of(
+            "self", createPerson("John", "Schmidt", personSchema),
+            "manager", createPerson("Jane", "Smith", personSchema),
+            "directReports", directReports
+        ));
+
+        // Anchoring to a single array element evaluates the path against that one Record
+        final RecordPath firstReportName = RecordPath.compile("anchored(/directReports[0], /firstName)");
+        assertEquals("John", firstReportName.evaluate(employee).getSelectedFields().findFirst().get().getValue());
+
+        // Anchoring to the whole array evaluates the path once per element
+        final RecordPath allReportNames = RecordPath.compile("anchored(/directReports, /firstName)");
+        assertEquals(List.of("John", "John", "John"), allReportNames.evaluate(employee).getSelectedFields().map(FieldValue::getValue).toList());
+
+        // Anchoring to a non-Record value yields no results
+        final RecordPath anchoredToString = RecordPath.compile("anchored(/self/lastName, / )");
+        assertEquals(List.of(), anchoredToString.evaluate(employee).getSelectedFields().map(FieldValue::getValue).toList());
+    }
+
+    /**
+     * Creates a person Record holding the given first and last name under the supplied schema.
+     */
+    private Record createPerson(final String firstName, final String lastName, final RecordSchema schema) {
+        return new MapRecord(schema, Map.of("firstName", firstName, "lastName", lastName));
+    }
+
+
     @Test
     public void testMapOf() {
         final List<RecordField> fields = new ArrayList<>();
diff --git a/nifi-docs/src/main/asciidoc/record-path-guide.adoc 
b/nifi-docs/src/main/asciidoc/record-path-guide.adoc
index a4d0323c4c..fae0538d64 100644
--- a/nifi-docs/src/main/asciidoc/record-path-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/record-path-guide.adoc
@@ -456,6 +456,48 @@ Concatenates all the arguments together.
 |==========================================================
 
 
+=== join
+
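+Joins together multiple values with a separator. The first argument is the separator; each remaining argument may select a single value or an array, and array elements are joined individually. A `null` value is written as the text `null`.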
+
+|==========================================================
+| RecordPath | Return value
+| `join(', ', /workAddress/* )` | 123, 5th Avenue, New York, NY, 10020
+|==========================================================
+
+
+=== anchored
+
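+Evaluates a RecordPath with the root context anchored to a child Record. The first argument selects the Record to use as the root, and the second argument is the RecordPath to evaluate against it.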
+
+|==========================================================
+| RecordPath | Return value
+| `anchored(/homeAddress, /city)` | Jersey City
+|==========================================================
+
+This function can also be used with arrays of Records, in which case the second RecordPath is evaluated once for each Record in the array. For example, given the following record:
+
+----
+{
+    "id": "1234",
+    "elements": [{
+        "name": "book",
+        "color": "red"
+    }, {
+        "name": "computer",
+        "color": "black"
+    }]
+}
+----
+
+We can evaluate the following RecordPaths:
+
+|==========================================================
+| RecordPath | Return value
+| `anchored(/elements, /name)` | Two values: `book` and `computer`
+| `anchored(/elements, concat(/name, ': ', /color))` | Two values: `book: red` and `computer: black`
+|==========================================================
+
+
 === fieldName
 
 Normally, when a path is given to a particular field in a Record, what is 
returned is the value of that field. It

Reply via email to