Repository: nifi Updated Branches: refs/heads/master 57ae9b65a -> 5a84d650c
NIFI-5449: Added Base64 Encode/Decode functions to RecordPath NIFI-5449: Incorporated review comments This closes #2920 Signed-off-by: Mike Thomsen <mikerthom...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/5a84d650 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/5a84d650 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/5a84d650 Branch: refs/heads/master Commit: 5a84d650c3a35b6d48d662c47e10fd2fc733f89b Parents: 57ae9b6 Author: Matthew Burgess <mattyb...@apache.org> Authored: Thu Jul 26 17:24:14 2018 -0400 Committer: Mike Thomsen <mikerthom...@gmail.com> Committed: Fri Sep 14 14:04:25 2018 -0400 ---------------------------------------------------------------------- .../record/path/functions/Base64Decode.java | 54 +++++++++++++++ .../record/path/functions/Base64Encode.java | 58 ++++++++++++++++ .../record/path/paths/RecordPathCompiler.java | 10 +++ .../apache/nifi/record/path/TestRecordPath.java | 70 ++++++++++++++++++++ .../src/main/asciidoc/record-path-guide.adoc | 59 +++++++++++++++++ 5 files changed, 251 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/5a84d650/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Base64Decode.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Base64Decode.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Base64Decode.java new file mode 100644 index 0000000..f5a1e5c --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Base64Decode.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path.functions; + +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.RecordPathEvaluationContext; +import org.apache.nifi.record.path.StandardFieldValue; +import org.apache.nifi.record.path.paths.RecordPathSegment; + +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.stream.Stream; + +public class Base64Decode extends RecordPathSegment { + private final RecordPathSegment recordPath; + + public Base64Decode(final RecordPathSegment recordPath, final boolean absolute) { + super("base64Decode", null, absolute); + this.recordPath = recordPath; + } + + @Override + public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) { + final Stream<FieldValue> fieldValues = recordPath.evaluate(context); + return fieldValues.filter(fv -> fv.getValue() != null) + .map(fv -> { + + Object value = fv.getValue(); + if (value instanceof String) { + return new StandardFieldValue(new String(Base64.getDecoder().decode(fv.getValue().toString()), StandardCharsets.UTF_8), fv.getField(), fv.getParent().orElse(null)); + } else if (value instanceof byte[]) { + return new StandardFieldValue(Base64.getDecoder().decode((byte[]) value), fv.getField(), fv.getParent().orElse(null)); + } else { + 
throw new IllegalArgumentException("Argument supplied to base64Decode must be a String or byte[]"); + } + }); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/5a84d650/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Base64Encode.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Base64Encode.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Base64Encode.java new file mode 100644 index 0000000..69a878b --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Base64Encode.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.record.path.functions; + +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.RecordPathEvaluationContext; +import org.apache.nifi.record.path.StandardFieldValue; +import org.apache.nifi.record.path.paths.RecordPathSegment; + +import java.io.UnsupportedEncodingException; +import java.util.Base64; +import java.util.stream.Stream; + +public class Base64Encode extends RecordPathSegment { + private final RecordPathSegment recordPath; + + public Base64Encode(final RecordPathSegment recordPath, final boolean absolute) { + super("base64Encode", null, absolute); + this.recordPath = recordPath; + } + + @Override + public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) { + final Stream<FieldValue> fieldValues = recordPath.evaluate(context); + return fieldValues.filter(fv -> fv.getValue() != null) + .map(fv -> { + + Object value = fv.getValue(); + if (value instanceof String) { + try { + return new StandardFieldValue(Base64.getEncoder().encodeToString(value.toString().getBytes("UTF-8")), fv.getField(), fv.getParent().orElse(null)); + } catch (final UnsupportedEncodingException e) { + return null; // won't happen. 
+ } + } else if (value instanceof byte[]) { + return new StandardFieldValue(Base64.getEncoder().encode((byte[]) value), fv.getField(), fv.getParent().orElse(null)); + } else { + throw new IllegalArgumentException("Argument supplied to base64Encode must be a String or byte[]"); + } + }); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/5a84d650/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java index 9a2821a..a30419d 100644 --- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java @@ -64,6 +64,8 @@ import org.apache.nifi.record.path.filter.NotEqualsFilter; import org.apache.nifi.record.path.filter.NotFilter; import org.apache.nifi.record.path.filter.RecordPathFilter; import org.apache.nifi.record.path.filter.StartsWith; +import org.apache.nifi.record.path.functions.Base64Decode; +import org.apache.nifi.record.path.functions.Base64Encode; import org.apache.nifi.record.path.functions.Concat; import org.apache.nifi.record.path.functions.Format; import org.apache.nifi.record.path.functions.FieldName; @@ -264,6 +266,14 @@ public class RecordPathCompiler { final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute); return new Format(args[0], args[1], absolute); } + case "base64Encode": { + final RecordPathSegment[] args = getArgPaths(argumentListTree, 1, functionName, absolute); + return new Base64Encode(args[0], absolute); + } + case "base64Decode": { + final RecordPathSegment[] args = getArgPaths(argumentListTree, 1, functionName, absolute); 
+ return new Base64Decode(args[0], absolute); + } default: { throw new RecordPathException("Invalid function call: The '" + functionName + "' function does not exist or can only " + "be used within a predicate, not as a standalone function"); http://git-wip-us.apache.org/repos/asf/nifi/blob/5a84d650/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java b/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java index 89b0893..67c14e6 100644 --- a/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java +++ b/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java @@ -27,11 +27,14 @@ import java.sql.Date; import java.text.DateFormat; import java.text.ParseException; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Base64; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.apache.nifi.record.path.exception.RecordPathException; import org.apache.nifi.serialization.SimpleRecordSchema; @@ -1279,6 +1282,73 @@ public class TestRecordPath { RecordPath.compile("toBytes(/s, \"NOT A REAL CHARSET\")").evaluate(record).getSelectedFields().findFirst().get().getValue(); } + @Test + public void testBase64Encode() { + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("firstName", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("lastName", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("b", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()))); + final RecordSchema schema = new SimpleRecordSchema(fields); + + final 
List<Object> expectedValues = Arrays.asList( + Base64.getEncoder().encodeToString("John".getBytes(StandardCharsets.UTF_8)), + Base64.getEncoder().encodeToString("Doe".getBytes(StandardCharsets.UTF_8)), + Base64.getEncoder().encode("xyz".getBytes(StandardCharsets.UTF_8)) + ); + final Map<String, Object> values = new HashMap<>(); + values.put("firstName", "John"); + values.put("lastName", "Doe"); + values.put("b", "xyz".getBytes(StandardCharsets.UTF_8)); + final Record record = new MapRecord(schema, values); + + assertEquals(Base64.getEncoder().encodeToString("John".getBytes(StandardCharsets.UTF_8)), + RecordPath.compile("base64Encode(/firstName)").evaluate(record).getSelectedFields().findFirst().get().getValue()); + assertEquals(Base64.getEncoder().encodeToString("Doe".getBytes(StandardCharsets.UTF_8)), + RecordPath.compile("base64Encode(/lastName)").evaluate(record).getSelectedFields().findFirst().get().getValue()); + assertTrue(Arrays.equals(Base64.getEncoder().encode("xyz".getBytes(StandardCharsets.UTF_8)), + (byte[]) RecordPath.compile("base64Encode(/b)").evaluate(record).getSelectedFields().findFirst().get().getValue())); + List<Object> actualValues = RecordPath.compile("base64Encode(/*)").evaluate(record).getSelectedFields().map(FieldValue::getValue).collect(Collectors.toList()); + IntStream.range(0, 3).forEach(i -> { + Object expectedObject = expectedValues.get(i); + Object actualObject = actualValues.get(i); + if (actualObject instanceof String) { + assertEquals(expectedObject, actualObject); + } else if (actualObject instanceof byte[]) { + assertTrue(Arrays.equals((byte[]) expectedObject, (byte[]) actualObject)); + } + }); + } + + @Test + public void testBase64Decode() { + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("firstName", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("lastName", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("b", 
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()))); + final RecordSchema schema = new SimpleRecordSchema(fields); + + final List<Object> expectedValues = Arrays.asList("John", "Doe", "xyz".getBytes(StandardCharsets.UTF_8)); + final Map<String, Object> values = new HashMap<>(); + values.put("firstName", Base64.getEncoder().encodeToString("John".getBytes(StandardCharsets.UTF_8))); + values.put("lastName", Base64.getEncoder().encodeToString("Doe".getBytes(StandardCharsets.UTF_8))); + values.put("b", Base64.getEncoder().encode("xyz".getBytes(StandardCharsets.UTF_8))); + final Record record = new MapRecord(schema, values); + + assertEquals("John", RecordPath.compile("base64Decode(/firstName)").evaluate(record).getSelectedFields().findFirst().get().getValue()); + assertEquals("Doe", RecordPath.compile("base64Decode(/lastName)").evaluate(record).getSelectedFields().findFirst().get().getValue()); + assertTrue(Arrays.equals("xyz".getBytes(StandardCharsets.UTF_8), (byte[]) RecordPath.compile("base64Decode(/b)").evaluate(record).getSelectedFields().findFirst().get().getValue())); + List<Object> actualValues = RecordPath.compile("base64Decode(/*)").evaluate(record).getSelectedFields().map(FieldValue::getValue).collect(Collectors.toList()); + IntStream.range(0, 3).forEach(i -> { + Object expectedObject = expectedValues.get(i); + Object actualObject = actualValues.get(i); + if (actualObject instanceof String) { + assertEquals(expectedObject, actualObject); + } else if (actualObject instanceof byte[]) { + assertTrue(Arrays.equals((byte[]) expectedObject, (byte[]) actualObject)); + } + }); + } + private List<RecordField> getDefaultFields() { final List<RecordField> fields = new ArrayList<>(); fields.add(new RecordField("id", RecordFieldType.INT.getDataType())); http://git-wip-us.apache.org/repos/asf/nifi/blob/5a84d650/nifi-docs/src/main/asciidoc/record-path-guide.adoc ---------------------------------------------------------------------- diff --git 
a/nifi-docs/src/main/asciidoc/record-path-guide.adoc b/nifi-docs/src/main/asciidoc/record-path-guide.adoc index 8de98ee..14a2299 100644 --- a/nifi-docs/src/main/asciidoc/record-path-guide.adoc +++ b/nifi-docs/src/main/asciidoc/record-path-guide.adoc @@ -627,6 +627,65 @@ The following record path expression would re-format the date String: | `format( toDate(/eventDate, "yyyy-MM-dd'T'HH:mm:ss'Z'"), 'yyyy-MM-dd')` | 2017-10-20 |========================================================== +=== base64Encode + +Encodes a String or byte[] using Base64. A String is first converted to bytes using the UTF-8 character set. For example, given a schema such as: + +---- +{ + "type": "record", + "name": "events", + "fields": [ + { "name": "name", "type": "string" } + ] +} +---- + +and a record such as: + +---- +{ + "name" : "John" +} +---- + +The following record path expression would encode the String using Base64: + +|========================================================== +| RecordPath | Return value +| `base64Encode(/name)` | Sm9obg== +|========================================================== + +=== base64Decode + +Decodes a Base64-encoded String or byte[]. For example, given a schema such as: + +---- +{ + "type": "record", + "name": "events", + "fields": [ + { "name": "name", "type": "string" } + ] +} +---- + +and a record such as: + +---- +{ + "name" : "Sm9obg==" +} +---- + +The following record path expression would decode the String using Base64: + +|========================================================== +| RecordPath | Return value +| `base64Decode(/name)` | John +|========================================================== + + [[filter_functions]] == Filter Functions