Repository: nifi
Updated Branches:
  refs/heads/master 57ae9b65a -> 5a84d650c


NIFI-5449: Added Base64 Encode/Decode functions to RecordPath
NIFI-5449: Incorporated review comments

This closes #2920

Signed-off-by: Mike Thomsen <mikerthom...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/5a84d650
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/5a84d650
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/5a84d650

Branch: refs/heads/master
Commit: 5a84d650c3a35b6d48d662c47e10fd2fc733f89b
Parents: 57ae9b6
Author: Matthew Burgess <mattyb...@apache.org>
Authored: Thu Jul 26 17:24:14 2018 -0400
Committer: Mike Thomsen <mikerthom...@gmail.com>
Committed: Fri Sep 14 14:04:25 2018 -0400

----------------------------------------------------------------------
 .../record/path/functions/Base64Decode.java     | 54 +++++++++++++++
 .../record/path/functions/Base64Encode.java     | 58 ++++++++++++++++
 .../record/path/paths/RecordPathCompiler.java   | 10 +++
 .../apache/nifi/record/path/TestRecordPath.java | 70 ++++++++++++++++++++
 .../src/main/asciidoc/record-path-guide.adoc    | 59 +++++++++++++++++
 5 files changed, 251 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/5a84d650/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Base64Decode.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Base64Decode.java
 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Base64Decode.java
new file mode 100644
index 0000000..f5a1e5c
--- /dev/null
+++ 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Base64Decode.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.functions;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.StandardFieldValue;
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.stream.Stream;
+
+public class Base64Decode extends RecordPathSegment {
+    private final RecordPathSegment recordPath;
+
+    public Base64Decode(final RecordPathSegment recordPath, final boolean 
absolute) {
+        super("base64Decode", null, absolute);
+        this.recordPath = recordPath;
+    }
+
+    @Override
+    public Stream<FieldValue> evaluate(final RecordPathEvaluationContext 
context) {
+        final Stream<FieldValue> fieldValues = recordPath.evaluate(context);
+        return fieldValues.filter(fv -> fv.getValue() != null)
+                .map(fv -> {
+
+                    Object value = fv.getValue();
+                    if (value instanceof String) {
+                        return new StandardFieldValue(new 
String(Base64.getDecoder().decode(fv.getValue().toString()), 
StandardCharsets.UTF_8), fv.getField(), fv.getParent().orElse(null));
+                    } else if (value instanceof byte[]) {
+                        return new 
StandardFieldValue(Base64.getDecoder().decode((byte[]) value), fv.getField(), 
fv.getParent().orElse(null));
+                    } else {
+                        throw new IllegalArgumentException("Argument supplied 
to base64Decode must be a String or byte[]");
+                    }
+                });
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/5a84d650/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Base64Encode.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Base64Encode.java
 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Base64Encode.java
new file mode 100644
index 0000000..69a878b
--- /dev/null
+++ 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Base64Encode.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.functions;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.StandardFieldValue;
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Base64;
+import java.util.stream.Stream;
+
+public class Base64Encode extends RecordPathSegment {
+    private final RecordPathSegment recordPath;
+
+    public Base64Encode(final RecordPathSegment recordPath, final boolean 
absolute) {
+        super("base64Encode", null, absolute);
+        this.recordPath = recordPath;
+    }
+
+    @Override
+    public Stream<FieldValue> evaluate(final RecordPathEvaluationContext 
context) {
+        final Stream<FieldValue> fieldValues = recordPath.evaluate(context);
+        return fieldValues.filter(fv -> fv.getValue() != null)
+                .map(fv -> {
+
+                    Object value = fv.getValue();
+                    if (value instanceof String) {
+                        try {
+                            return new 
StandardFieldValue(Base64.getEncoder().encodeToString(value.toString().getBytes("UTF-8")),
 fv.getField(), fv.getParent().orElse(null));
+                        } catch (final UnsupportedEncodingException e) {
+                            return null;    // won't happen.
+                        }
+                    } else if (value instanceof byte[]) {
+                        return new 
StandardFieldValue(Base64.getEncoder().encode((byte[]) value), fv.getField(), 
fv.getParent().orElse(null));
+                    } else {
+                        throw new IllegalArgumentException("Argument supplied 
to base64Encode must be a String or byte[]");
+                    }
+                });
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/5a84d650/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java
 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java
index 9a2821a..a30419d 100644
--- 
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java
+++ 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java
@@ -64,6 +64,8 @@ import org.apache.nifi.record.path.filter.NotEqualsFilter;
 import org.apache.nifi.record.path.filter.NotFilter;
 import org.apache.nifi.record.path.filter.RecordPathFilter;
 import org.apache.nifi.record.path.filter.StartsWith;
+import org.apache.nifi.record.path.functions.Base64Decode;
+import org.apache.nifi.record.path.functions.Base64Encode;
 import org.apache.nifi.record.path.functions.Concat;
 import org.apache.nifi.record.path.functions.Format;
 import org.apache.nifi.record.path.functions.FieldName;
@@ -264,6 +266,14 @@ public class RecordPathCompiler {
                         final RecordPathSegment[] args = 
getArgPaths(argumentListTree, 2, functionName, absolute);
                         return new Format(args[0], args[1], absolute);
                     }
+                    case "base64Encode": {
+                        final RecordPathSegment[] args = 
getArgPaths(argumentListTree, 1, functionName, absolute);
+                        return new Base64Encode(args[0], absolute);
+                    }
+                    case "base64Decode": {
+                        final RecordPathSegment[] args = 
getArgPaths(argumentListTree, 1, functionName, absolute);
+                        return new Base64Decode(args[0], absolute);
+                    }
                     default: {
                         throw new RecordPathException("Invalid function call: 
The '" + functionName + "' function does not exist or can only "
                             + "be used within a predicate, not as a standalone 
function");

http://git-wip-us.apache.org/repos/asf/nifi/blob/5a84d650/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java
 
b/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java
index 89b0893..67c14e6 100644
--- 
a/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java
+++ 
b/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java
@@ -27,11 +27,14 @@ import java.sql.Date;
 import java.text.DateFormat;
 import java.text.ParseException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import org.apache.nifi.record.path.exception.RecordPathException;
 import org.apache.nifi.serialization.SimpleRecordSchema;
@@ -1279,6 +1282,73 @@ public class TestRecordPath {
         RecordPath.compile("toBytes(/s, \"NOT A REAL 
CHARSET\")").evaluate(record).getSelectedFields().findFirst().get().getValue();
     }
 
+    @Test
+    public void testBase64Encode() {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("firstName", 
RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("lastName", 
RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("b", 
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType())));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final List<Object> expectedValues = Arrays.asList(
+                
Base64.getEncoder().encodeToString("John".getBytes(StandardCharsets.UTF_8)),
+                
Base64.getEncoder().encodeToString("Doe".getBytes(StandardCharsets.UTF_8)),
+                
Base64.getEncoder().encode("xyz".getBytes(StandardCharsets.UTF_8))
+        );
+        final Map<String, Object> values = new HashMap<>();
+        values.put("firstName", "John");
+        values.put("lastName", "Doe");
+        values.put("b", "xyz".getBytes(StandardCharsets.UTF_8));
+        final Record record = new MapRecord(schema, values);
+
+        
assertEquals(Base64.getEncoder().encodeToString("John".getBytes(StandardCharsets.UTF_8)),
+                
RecordPath.compile("base64Encode(/firstName)").evaluate(record).getSelectedFields().findFirst().get().getValue());
+        
assertEquals(Base64.getEncoder().encodeToString("Doe".getBytes(StandardCharsets.UTF_8)),
+                
RecordPath.compile("base64Encode(/lastName)").evaluate(record).getSelectedFields().findFirst().get().getValue());
+        
assertTrue(Arrays.equals(Base64.getEncoder().encode("xyz".getBytes(StandardCharsets.UTF_8)),
+                (byte[]) 
RecordPath.compile("base64Encode(/b)").evaluate(record).getSelectedFields().findFirst().get().getValue()));
+        List<Object> actualValues = 
RecordPath.compile("base64Encode(/*)").evaluate(record).getSelectedFields().map(FieldValue::getValue).collect(Collectors.toList());
+        IntStream.range(0, 3).forEach(i -> {
+            Object expectedObject = expectedValues.get(i);
+            Object actualObject = actualValues.get(i);
+            if (actualObject instanceof String) {
+                assertEquals(expectedObject, actualObject);
+            } else if (actualObject instanceof byte[]) {
+                assertTrue(Arrays.equals((byte[]) expectedObject, (byte[]) 
actualObject));
+            }
+        });
+    }
+
+    @Test
+    public void testBase64Decode() {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("firstName", 
RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("lastName", 
RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("b", 
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType())));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final List<Object> expectedValues = Arrays.asList("John", "Doe", 
"xyz".getBytes(StandardCharsets.UTF_8));
+        final Map<String, Object> values = new HashMap<>();
+        values.put("firstName", 
Base64.getEncoder().encodeToString("John".getBytes(StandardCharsets.UTF_8)));
+        values.put("lastName", 
Base64.getEncoder().encodeToString("Doe".getBytes(StandardCharsets.UTF_8)));
+        values.put("b", 
Base64.getEncoder().encode("xyz".getBytes(StandardCharsets.UTF_8)));
+        final Record record = new MapRecord(schema, values);
+
+        assertEquals("John", 
RecordPath.compile("base64Decode(/firstName)").evaluate(record).getSelectedFields().findFirst().get().getValue());
+        assertEquals("Doe", 
RecordPath.compile("base64Decode(/lastName)").evaluate(record).getSelectedFields().findFirst().get().getValue());
+        assertTrue(Arrays.equals("xyz".getBytes(StandardCharsets.UTF_8), 
(byte[]) 
RecordPath.compile("base64Decode(/b)").evaluate(record).getSelectedFields().findFirst().get().getValue()));
+        List<Object> actualValues = 
RecordPath.compile("base64Decode(/*)").evaluate(record).getSelectedFields().map(FieldValue::getValue).collect(Collectors.toList());
+        IntStream.range(0, 3).forEach(i -> {
+            Object expectedObject = expectedValues.get(i);
+            Object actualObject = actualValues.get(i);
+            if (actualObject instanceof String) {
+                assertEquals(expectedObject, actualObject);
+            } else if (actualObject instanceof byte[]) {
+                assertTrue(Arrays.equals((byte[]) expectedObject, (byte[]) 
actualObject));
+            }
+        });
+    }
+
     private List<RecordField> getDefaultFields() {
         final List<RecordField> fields = new ArrayList<>();
         fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));

http://git-wip-us.apache.org/repos/asf/nifi/blob/5a84d650/nifi-docs/src/main/asciidoc/record-path-guide.adoc
----------------------------------------------------------------------
diff --git a/nifi-docs/src/main/asciidoc/record-path-guide.adoc 
b/nifi-docs/src/main/asciidoc/record-path-guide.adoc
index 8de98ee..14a2299 100644
--- a/nifi-docs/src/main/asciidoc/record-path-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/record-path-guide.adoc
@@ -627,6 +627,65 @@ The following record path expression would re-format the 
date String:
 | `format( toDate(/eventDate, "yyyy-MM-dd'T'HH:mm:ss'Z'"), 'yyyy-MM-dd')` | 
2017-10-20
 |==========================================================
 
+=== base64Encode
+
+Converts a String or byte[] using Base64 encoding, using the UTF-8 character 
set.  For example, given a schema such as:
+
+----
+{
+  "type": "record",
+  "name": "events",
+  "fields": [
+    { "name": "name", "type": "string" }
+  ]
+}
+----
+
+and a record such as:
+
+----
+{
+  "name" : "John"
+}
+----
+
+The following record path expression would encode the String using Base64:
+
+|==========================================================
+| RecordPath | Return value
+| `base64Encode(/name)` | Sm9obg==
+|==========================================================
+
+=== base64Decode
+
+Decodes a Base64-encoded String or byte[].  For example, given a schema such 
as:
+
+----
+{
+  "type": "record",
+  "name": "events",
+  "fields": [
+    { "name": "name", "type": "string" }
+  ]
+}
+----
+
+and a record such as:
+
+----
+{
+  "name" : "Sm9obg=="
+}
+----
+
+The following record path expression would decode the String using Base64:
+
+|==========================================================
+| RecordPath | Return value
+| `base64Decode(/name)` | John
+|==========================================================
+
+
 [[filter_functions]]
 == Filter Functions
 

Reply via email to