This is an automated email from the ASF dual-hosted git repository.
chriss pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 74bd798097 NIFI-12773: Added join and anchored RecordPath function
74bd798097 is described below
commit 74bd798097e15d54b871ac3ef7654a0d3433f99a
Author: Mark Payne <[email protected]>
AuthorDate: Fri Feb 9 16:58:09 2024 -0500
NIFI-12773: Added join and anchored RecordPath function
Signed-off-by: Chris Sampson <[email protected]>
This closes #8391
---
nifi-commons/nifi-record-path/pom.xml | 4 ++
.../nifi/record/path/functions/Anchored.java | 83 +++++++++++++++++++++
.../apache/nifi/record/path/functions/Join.java | 84 ++++++++++++++++++++++
.../nifi/record/path/paths/RecordPathCompiler.java | 24 +++++++
.../apache/nifi/record/path/TestRecordPath.java | 83 +++++++++++++++++++++
nifi-docs/src/main/asciidoc/record-path-guide.adoc | 42 +++++++++++
6 files changed, 320 insertions(+)
diff --git a/nifi-commons/nifi-record-path/pom.xml
b/nifi-commons/nifi-record-path/pom.xml
index 17d9e762e2..ead5937227 100644
--- a/nifi-commons/nifi-record-path/pom.xml
+++ b/nifi-commons/nifi-record-path/pom.xml
@@ -103,6 +103,10 @@
<artifactId>nifi-uuid5</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-property-utils</artifactId>
+ </dependency>
<dependency>
<groupId>org.antlr</groupId>
<artifactId>antlr-runtime</artifactId>
diff --git
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Anchored.java
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Anchored.java
new file mode 100644
index 0000000000..2874295d79
--- /dev/null
+++
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Anchored.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.functions;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.StandardRecordPathEvaluationContext;
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+import org.apache.nifi.serialization.record.Record;
+
+import java.util.Arrays;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+/**
+ * RecordPath function {@code anchored(anchorPath, evaluationPath)}.
+ * <p>
+ * Evaluates {@code evaluationPath} once for every Record selected by {@code anchorPath}, using that Record as the
+ * root of a fresh evaluation context. The anchor may select a single Record, an array, or an Iterable; elements of
+ * arrays/Iterables that are not Records (including {@code null}) are skipped, and any other value (scalars,
+ * {@code null}) produces no results.
+ * </p>
+ */
+public class Anchored extends RecordPathSegment {
+
+    private final RecordPathSegment anchorPath;
+    private final RecordPathSegment evaluationPath;
+
+    /**
+     * @param anchorPath path whose selected value(s) provide the root Record(s) for {@code evaluationPath}
+     * @param evaluationPath path evaluated relative to each anchored Record
+     * @param absolute passed through unchanged to {@link RecordPathSegment}
+     */
+    public Anchored(final RecordPathSegment anchorPath, final RecordPathSegment evaluationPath, final boolean absolute) {
+        super("anchored", null, absolute);
+
+        this.anchorPath = anchorPath;
+        this.evaluationPath = evaluationPath;
+    }
+
+    @Override
+    public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
+        return anchorPath.evaluate(context)
+                .flatMap(anchoredValue -> evaluateFieldValue(anchoredValue.getValue()));
+    }
+
+    /**
+     * Evaluates the evaluation path against every Record contained in the given anchored value.
+     * Arrays are matched as {@code Object[]} rather than only {@code Record[]}: an array field is not guaranteed
+     * to be strongly typed, and an {@code Object[]} holding Records must still be anchored element by element.
+     */
+    private Stream<FieldValue> evaluateFieldValue(final Object value) {
+        if (value instanceof final Record rootRecord) {
+            return evaluateAtRoot(rootRecord);
+        }
+
+        // Record[] is a subtype of Object[], so strongly typed Record arrays are handled here as well.
+        if (value instanceof final Object[] array) {
+            return Arrays.stream(array).flatMap(this::evaluateElement);
+        }
+
+        if (value instanceof final Iterable<?> iterable) {
+            return StreamSupport.stream(iterable.spliterator(), false).flatMap(this::evaluateElement);
+        }
+
+        // null or scalar values cannot act as a root Record
+        return Stream.empty();
+    }
+
+    /**
+     * Evaluates the evaluation path rooted at a single array/Iterable element, skipping non-Record and null elements.
+     */
+    private Stream<FieldValue> evaluateElement(final Object element) {
+        if (element instanceof final Record elementRecord) {
+            return evaluateAtRoot(elementRecord);
+        }
+        return Stream.empty();
+    }
+
+    private Stream<FieldValue> evaluateAtRoot(final Record root) {
+        final RecordPathEvaluationContext recordPathEvaluateContext = new StandardRecordPathEvaluationContext(root);
+        return evaluationPath.evaluate(recordPathEvaluateContext);
+    }
+}
diff --git
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Join.java
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Join.java
new file mode 100644
index 0000000000..da9cee488f
--- /dev/null
+++
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Join.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.functions;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.StandardFieldValue;
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+import org.apache.nifi.record.path.util.RecordPathUtils;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Stream;
+
+/**
+ * RecordPath function {@code join(delimiter, value1, value2, ...)}.
+ * <p>
+ * Produces a single STRING field named {@code join} containing the string form of every value selected by the value
+ * paths, in order, separated by the delimiter. Array and Iterable values are flattened element by element,
+ * {@code null} values are rendered as the literal text {@code "null"}, and a {@code null} delimiter is treated as
+ * the empty string.
+ * </p>
+ */
+public class Join extends RecordPathSegment {
+    private final RecordPathSegment delimiterPath;
+    private final RecordPathSegment[] valuePaths;
+
+    public Join(final RecordPathSegment delimiterPath, final RecordPathSegment[] valuePaths, final boolean absolute) {
+        super("join", null, absolute);
+        this.delimiterPath = delimiterPath;
+        this.valuePaths = valuePaths;
+    }
+
+    @Override
+    public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
+        final String configuredDelimiter = RecordPathUtils.getFirstStringValue(delimiterPath, context);
+        final String separator = configuredDelimiter == null ? "" : configuredDelimiter;
+
+        final List<String> parts = new ArrayList<>();
+        for (final RecordPathSegment path : valuePaths) {
+            path.evaluate(context).forEach(selected -> appendFlattened(selected.getValue(), parts));
+        }
+
+        final String joinedText = String.join(separator, parts);
+        final RecordField joinField = new RecordField("join", RecordFieldType.STRING.getDataType());
+        return Stream.of(new StandardFieldValue(joinedText, joinField, null));
+    }
+
+    /**
+     * Appends the string form of {@code value} to {@code parts}, recursing into arrays and Iterables.
+     */
+    private void appendFlattened(final Object value, final List<String> parts) {
+        if (value instanceof final Object[] elements) {
+            for (final Object element : elements) {
+                appendFlattened(element, parts);
+            }
+        } else if (value instanceof final Iterable<?> elements) {
+            for (final Object element : elements) {
+                appendFlattened(element, parts);
+            }
+        } else {
+            // null never matches instanceof, so it lands here and is rendered literally
+            parts.add(value == null ? "null" : DataTypeUtils.toString(value, null));
+        }
+    }
+}
diff --git
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java
index c6f23b145b..9061014295 100644
---
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java
+++
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java
@@ -35,6 +35,7 @@ import org.apache.nifi.record.path.filter.NotEqualsFilter;
import org.apache.nifi.record.path.filter.NotFilter;
import org.apache.nifi.record.path.filter.RecordPathFilter;
import org.apache.nifi.record.path.filter.StartsWith;
+import org.apache.nifi.record.path.functions.Anchored;
import org.apache.nifi.record.path.functions.Base64Decode;
import org.apache.nifi.record.path.functions.Base64Encode;
import org.apache.nifi.record.path.functions.Coalesce;
@@ -45,6 +46,7 @@ import org.apache.nifi.record.path.functions.FieldName;
import org.apache.nifi.record.path.functions.FilterFunction;
import org.apache.nifi.record.path.functions.Format;
import org.apache.nifi.record.path.functions.Hash;
+import org.apache.nifi.record.path.functions.Join;
import org.apache.nifi.record.path.functions.MapOf;
import org.apache.nifi.record.path.functions.PadLeft;
import org.apache.nifi.record.path.functions.PadRight;
@@ -129,6 +131,10 @@ public class RecordPathCompiler {
}
case CHILD_REFERENCE: {
final Tree childTree = tree.getChild(0);
+ if (childTree == null) {
+ return new RootPath();
+ }
+
final int childTreeType = childTree.getType();
if (childTreeType == FIELD_NAME) {
final String childName = childTree.getChild(0).getText();
@@ -404,6 +410,24 @@ public class RecordPathCompiler {
final RecordPathSegment[] args =
getArgPaths(argumentListTree, 1, functionName, absolute);
return new Count(args[0], absolute);
}
+ case "join": {
+ final int numArgs = argumentListTree.getChildCount();
+ if (numArgs < 2) {
+ throw new RecordPathException("Invalid number of
arguments: " + functionName + " function takes 2 or more arguments but got " +
numArgs);
+ }
+
+ final RecordPathSegment[] joinPaths = new
RecordPathSegment[numArgs - 1];
+ for (int i = 0; i < numArgs - 1; i++) {
+ joinPaths[i] =
buildPath(argumentListTree.getChild(i + 1), null, absolute);
+ }
+
+ final RecordPathSegment delimiterPath =
buildPath(argumentListTree.getChild(0), null, absolute);
+ return new Join(delimiterPath, joinPaths, absolute);
+ }
+ case "anchored": {
+ final RecordPathSegment[] args =
getArgPaths(argumentListTree, 2, functionName, absolute);
+ return new Anchored(args[0], args[1], absolute);
+ }
case "not":
case "contains":
case "containsRegex":
diff --git
a/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java
b/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java
index 2b6341124c..f374325528 100644
---
a/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java
+++
b/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java
@@ -1241,6 +1241,89 @@ public class TestRecordPath {
assertEquals("John Doe: 48", RecordPath.compile("concat(/firstName, '
', /lastName, ': ',
48)").evaluate(record).getSelectedFields().findFirst().get().getValue());
}
+    @Test
+    public void testJoinWithTwoFields() {
+        // Schema declares exactly the two String fields the record actually holds
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("lastName", RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("firstName", RecordFieldType.STRING.getDataType()));
+
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final Map<String, Object> values = new HashMap<>();
+        values.put("lastName", "Doe");
+        values.put("firstName", "John");
+        final Record record = new MapRecord(schema, values);
+
+        assertEquals("Doe, John", RecordPath.compile("join(', ', /lastName, /firstName)").evaluate(record).getSelectedFields().findFirst().get().getValue());
+    }
+
+    @Test
+    public void testJoinWithArray() {
+        // Each element of an array field becomes its own joined part
+        final List<RecordField> schemaFields = new ArrayList<>();
+        schemaFields.add(new RecordField("names", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType())));
+
+        final Map<String, Object> fieldValues = new HashMap<>();
+        fieldValues.put("names", new String[] {"John", "Jane", "Jacob", "Judy"});
+
+        final Record record = new MapRecord(new SimpleRecordSchema(schemaFields), fieldValues);
+
+        final Object joined = RecordPath.compile("join(',', /names)")
+                .evaluate(record)
+                .getSelectedFields()
+                .findFirst()
+                .get()
+                .getValue();
+        assertEquals("John,Jane,Jacob,Judy", joined);
+    }
+
+    @Test
+    public void testJoinWithArrayAndMultipleFields() {
+        // Scalar fields and an array slice are mixed in a single join, using an escaped newline delimiter
+        final List<RecordField> schemaFields = new ArrayList<>();
+        schemaFields.add(new RecordField("lastName", RecordFieldType.STRING.getDataType()));
+        schemaFields.add(new RecordField("firstName", RecordFieldType.STRING.getDataType()));
+        schemaFields.add(new RecordField("friends", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType())));
+
+        final Map<String, Object> person = new HashMap<>();
+        person.put("lastName", "Doe");
+        person.put("firstName", "John");
+        person.put("friends", new String[] {"John", "Jane", "Jacob", "Judy"});
+
+        final Record record = new MapRecord(new SimpleRecordSchema(schemaFields), person);
+
+        final Object joined = RecordPath.compile("join('\\n', /lastName, /firstName, /friends[1..2])")
+                .evaluate(record)
+                .getSelectedFields()
+                .findFirst()
+                .get()
+                .getValue();
+        assertEquals("Doe\nJohn\nJane\nJacob", joined);
+    }
+
+    @Test
+    public void testAnchored() {
+        final List<RecordField> personSchemaFields = new ArrayList<>();
+        personSchemaFields.add(new RecordField("lastName", RecordFieldType.STRING.getDataType()));
+        personSchemaFields.add(new RecordField("firstName", RecordFieldType.STRING.getDataType()));
+        final RecordSchema personSchema = new SimpleRecordSchema(personSchemaFields);
+
+        final List<RecordField> employeeSchemaFields = new ArrayList<>();
+        employeeSchemaFields.add(new RecordField("self", RecordFieldType.RECORD.getRecordDataType(personSchema)));
+        employeeSchemaFields.add(new RecordField("manager", RecordFieldType.RECORD.getRecordDataType(personSchema)));
+        employeeSchemaFields.add(new RecordField("directReports", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(personSchema))));
+        final RecordSchema employeeSchema = new SimpleRecordSchema(employeeSchemaFields);
+
+        final Record[] directReports = {
+            createPerson("John", "Doe", personSchema),
+            createPerson("John", "Jingleheimer", personSchema),
+            createPerson("John", "Jacob", personSchema)
+        };
+        final Record employee = new MapRecord(employeeSchema, Map.of(
+            "self", createPerson("John", "Schmidt", personSchema),
+            "manager", createPerson("Jane", "Smith", personSchema),
+            "directReports", directReports));
+
+        // Anchoring to a single array element evaluates the inner path against that one Record
+        final Object firstReportName = RecordPath.compile("anchored(/directReports[0], /firstName)")
+                .evaluate(employee).getSelectedFields().findFirst().get().getValue();
+        assertEquals("John", firstReportName);
+
+        // Anchoring to the whole array evaluates the inner path once per element
+        final List<Object> allReportNames = RecordPath.compile("anchored(/directReports, /firstName)")
+                .evaluate(employee).getSelectedFields().map(FieldValue::getValue).toList();
+        assertEquals(List.of("John", "John", "John"), allReportNames);
+
+        // A non-Record anchor (a String field) yields no results
+        final List<Object> scalarAnchorResults = RecordPath.compile("anchored(/self/lastName, /)")
+                .evaluate(employee).getSelectedFields().map(FieldValue::getValue).toList();
+        assertEquals(List.of(), scalarAnchorResults);
+    }
+
+    /**
+     * Builds a person Record with the given first and last name against the supplied schema.
+     */
+    private Record createPerson(final String firstName, final String lastName, final RecordSchema schema) {
+        return new MapRecord(schema, Map.of("firstName", firstName, "lastName", lastName));
+    }
+
+
@Test
public void testMapOf() {
final List<RecordField> fields = new ArrayList<>();
diff --git a/nifi-docs/src/main/asciidoc/record-path-guide.adoc
b/nifi-docs/src/main/asciidoc/record-path-guide.adoc
index a4d0323c4c..fae0538d64 100644
--- a/nifi-docs/src/main/asciidoc/record-path-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/record-path-guide.adoc
@@ -456,6 +456,48 @@ Concatenates all the arguments together.
|==========================================================
+=== join
+
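+Joins together multiple values, separated by the delimiter given as the first argument. Array values are flattened, so each element is joined individually.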
+
+|==========================================================
+| RecordPath | Return value
+| `join(', ', /workAddress/* )` | 123, 5th Avenue, New York, NY, 10020
+|==========================================================
+
+
+=== anchored
+
+Evaluates the RecordPath given as the second argument once for each Record selected by the first argument, using that Record as the root.
+
+|==========================================================
+| RecordPath | Return value
+| `anchored(/homeAddress, /city)` | Jersey City
+|==========================================================
+
+Additionally, this can be used in conjunction with arrays. For example, given the following record:
+
+----
+{
+ "id": "1234",
+ "elements": [{
+ "name": "book",
+ "color": "red"
+ }, {
+ "name": "computer",
+ "color": "black"
+ }]
+}
+----
+
+We can evaluate the following RecordPaths:
+
+|==========================================================
+| RecordPath | Return value
+| `anchored(/elements, /name)` | Two results, one per array element: `book` and `computer`
+| `anchored(/elements, concat(/name, ': ', /color))` | Two results, one per array element: `book: red` and `computer: black`
+|==========================================================
+
+
=== fieldName
Normally, when a path is given to a particular field in a Record, what is
returned is the value of that field. It