Provide a SplitJson processor that breaks JSON arrays into their individual elements. Refactor the supporting JsonUtils code and EvaluateJsonPath to reuse common functionality.
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/2e05dcbb Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/2e05dcbb Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/2e05dcbb Branch: refs/heads/NIFI-360 Commit: 2e05dcbbfdb37843456dff121d9878d8679d7067 Parents: 59ad194 Author: Aldrin Piri <aldrinp...@gmail.com> Authored: Tue Feb 17 15:15:53 2015 -0500 Committer: Aldrin Piri <aldrinp...@gmail.com> Committed: Tue Feb 17 15:17:12 2015 -0500 ---------------------------------------------------------------------- .../processors/standard/EvaluateJsonPath.java | 8 +- .../nifi/processors/standard/SplitJson.java | 140 +++++++++++++++++++ .../processors/standard/util/JsonUtils.java | 8 ++ .../org.apache.nifi.processor.Processor | 1 + .../standard/TestEvaluateJsonPath.java | 3 +- .../nifi/processors/standard/TestSplitJson.java | 115 +++++++++++++++ 6 files changed, 268 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2e05dcbb/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java index 0bdd20c..8cdf1a2 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java @@ -192,7 
+192,7 @@ public class EvaluateJsonPath extends AbstractProcessor { final ObjectHolder<Object> resultHolder = new ObjectHolder<>(null); try { Object result = documentContext.read(jsonPathExp); - if (returnType.equals(RETURN_TYPE_SCALAR) && !isScalar(result)) { + if (returnType.equals(RETURN_TYPE_SCALAR) && !JsonUtils.isJsonScalar(result)) { logger.error("Unable to return a scalar value for the expression {} for FlowFile {}. Evaluated value was {}. Transferring to {}.", new Object[]{jsonPathExp.getPath(), flowFile.getId(), result.toString(), REL_FAILURE.getName()}); processSession.transfer(flowFile, REL_FAILURE); @@ -233,15 +233,11 @@ public class EvaluateJsonPath extends AbstractProcessor { } private static String getResultRepresentation(Object jsonPathResult) { - if (isScalar(jsonPathResult)) { + if (JsonUtils.isJsonScalar(jsonPathResult)) { return jsonPathResult.toString(); } return JsonUtils.JSON_PROVIDER.toJson(jsonPathResult); } - private static boolean isScalar(Object obj) { - return (obj instanceof String); - } - } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2e05dcbb/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java new file mode 100644 index 0000000..e589b48 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import com.jayway.jsonpath.DocumentContext;
+import com.jayway.jsonpath.JsonPath;
+import com.jayway.jsonpath.PathNotFoundException;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.*;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processors.standard.util.JsonUtils;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+
+/**
+ * Splits the JSON array selected by a JsonPath expression into one FlowFile per array element.
+ */
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"json", "split", "jsonpath"})
+@CapabilityDescription("Splits a JSON File into multiple, separate FlowFiles for an array element specified by a JsonPath expression. "
+        + "Each generated FlowFile is comprised of an element of the specified array and transferred to relationship 'split', "
+        + "with the original file transferred to the 'original' relationship. If the specified JsonPath is not found or "
+        + "does not evaluate to an array element, the original file is routed to 'failure' and no files are generated.")
+public class SplitJson extends AbstractProcessor {
+
+    /** JsonPath that must select the array whose elements become the individual splits. */
+    public static final PropertyDescriptor ARRAY_JSON_PATH_EXPRESSION = new PropertyDescriptor.Builder()
+            .name("JsonPath Expression")
+            .description("A JsonPath expression that indicates the array element to split into JSON/scalar fragments.")
+            .required(true)
+            .addValidator(JsonUtils.JSON_PATH_VALIDATOR)
+            .build();
+
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder()
+            .name("original")
+            .description("The original FlowFile that was split into segments. If the FlowFile fails processing, nothing will be sent to this relationship")
+            .build();
+    public static final Relationship REL_SPLIT = new Relationship.Builder()
+            .name("split")
+            .description("All segments of the original FlowFile will be routed to this relationship")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("If a FlowFile fails processing for any reason (for example, the FlowFile is not valid JSON or the specified path does not exist), it will be routed to this relationship")
+            .build();
+
+    private List<PropertyDescriptor> properties;
+    private Set<Relationship> relationships;
+
+    /**
+     * Builds the immutable property and relationship sets exposed by this processor.
+     */
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> supportedProperties = new ArrayList<>();
+        supportedProperties.add(ARRAY_JSON_PATH_EXPRESSION);
+        this.properties = Collections.unmodifiableList(supportedProperties);
+
+        final Set<Relationship> supportedRelationships = new HashSet<>();
+        supportedRelationships.add(REL_ORIGINAL);
+        supportedRelationships.add(REL_SPLIT);
+        supportedRelationships.add(REL_FAILURE);
+        this.relationships = Collections.unmodifiableSet(supportedRelationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    /**
+     * Evaluates the configured JsonPath against the incoming FlowFile. Each element of the resulting array is
+     * emitted as its own FlowFile to 'split', and the original is routed to 'original'. If the content is not
+     * valid JSON, the path is not found, or the result is not a List, the original is routed to 'failure'
+     * and no splits are created.
+     */
+    @Override
+    public void onTrigger(final ProcessContext processContext, final ProcessSession processSession) {
+        final FlowFile original = processSession.get();
+        if (original == null) {
+            return;
+        }
+
+        final ProcessorLog logger = getLogger();
+
+        // A null context means the content could not be parsed as JSON
+        final DocumentContext documentContext = JsonUtils.validateAndEstablishJsonContext(processSession, original);
+
+        if (documentContext == null) {
+            logger.error("FlowFile {} did not have valid JSON content.", new Object[]{original});
+            processSession.transfer(original, REL_FAILURE);
+            return;
+        }
+
+        final String jsonPathExpression = processContext.getProperty(ARRAY_JSON_PATH_EXPRESSION).getValue();
+        final JsonPath jsonPath = JsonPath.compile(jsonPathExpression);
+
+        final Object jsonPathResult;
+        try {
+            jsonPathResult = documentContext.read(jsonPath);
+        } catch (final PathNotFoundException e) {
+            // A missing path must route to 'failure' (as documented) rather than escape onTrigger
+            logger.error("JsonPath {} could not be found for FlowFile {}. Transferring to {}.",
+                    new Object[]{jsonPath.getPath(), original, REL_FAILURE.getName()});
+            processSession.transfer(original, REL_FAILURE);
+            return;
+        }
+
+        if (!(jsonPathResult instanceof List)) {
+            logger.error("The evaluated value {} of {} was not an array compatible type and cannot be split.",
+                    new Object[]{jsonPathResult, jsonPath.getPath()});
+            processSession.transfer(original, REL_FAILURE);
+            return;
+        }
+
+        final List<?> resultList = (List<?>) jsonPathResult;
+        final List<FlowFile> segments = new ArrayList<>(resultList.size());
+
+        for (final Object resultSegment : resultList) {
+            FlowFile split = processSession.create(original);
+            split = processSession.write(split, new OutputStreamCallback() {
+                @Override
+                public void process(final OutputStream out) throws IOException {
+                    out.write(toSegmentContent(resultSegment).getBytes(StandardCharsets.UTF_8));
+                }
+            });
+            segments.add(split);
+        }
+
+        processSession.transfer(segments, REL_SPLIT);
+        processSession.transfer(original, REL_ORIGINAL);
+        logger.info("Split {} into {} FlowFiles", new Object[]{original, segments.size()});
+    }
+
+    /**
+     * Renders one array element as text. Scalars use toString(), objects and arrays are serialized as JSON,
+     * and a JSON null becomes the literal "null". Null is checked first because JsonUtils.isJsonScalar
+     * reports null as a scalar, and calling toString() on it would throw.
+     */
+    private static String toSegmentContent(final Object resultSegment) {
+        if (resultSegment == null) {
+            return "null";
+        }
+        if (JsonUtils.isJsonScalar(resultSegment)) {
+            return resultSegment.toString();
+        }
+        return JsonUtils.JSON_PROVIDER.toJson(resultSegment);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2e05dcbb/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JsonUtils.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JsonUtils.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JsonUtils.java
index 6eb567e..0bf33dd 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JsonUtils.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JsonUtils.java
@@ -35,6 +35,8 @@ import org.apache.nifi.util.ObjectHolder;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.util.List;
+import java.util.Map;
 
 /**
  * Provides utilities for interacting with JSON elements and JsonPath expressions and results
@@ -101,4 +103,10 @@ public class JsonUtils {
         return isValid;
     }
 
+    /** Returns false for a Map or List (the default provider's JSON entities) and true for anything else, including null; callers must null-check before calling toString() on a "scalar". */
+    public static boolean isJsonScalar(Object obj) {
+        // For the default provider, a Map or List is able to be handled as a JSON entity
+        return !(obj instanceof Map || obj instanceof List);
+    }
+
 }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2e05dcbb/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 8a1fd74..17b5364 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -55,6 +55,7 @@ org.apache.nifi.processors.standard.ScanAttribute org.apache.nifi.processors.standard.ScanContent org.apache.nifi.processors.standard.SegmentContent org.apache.nifi.processors.standard.SplitContent +org.apache.nifi.processors.standard.SplitJson org.apache.nifi.processors.standard.SplitText org.apache.nifi.processors.standard.SplitXml org.apache.nifi.processors.standard.TransformXml http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2e05dcbb/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEvaluateJsonPath.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEvaluateJsonPath.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEvaluateJsonPath.java index c873969..b7b5103 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEvaluateJsonPath.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEvaluateJsonPath.java @@ -18,6 +18,7 @@ package org.apache.nifi.processors.standard; import org.apache.nifi.processor.Relationship; import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.StringUtils; import 
org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.Assert; @@ -158,7 +159,7 @@ public class TestEvaluateJsonPath { testRunner.assertAllFlowFilesTransferred(expectedRel, 1); final MockFlowFile out = testRunner.getFlowFilesForRelationship(expectedRel).get(0); Assert.assertEquals("Transferred flow file did not have the correct result for id attribute", "54df94072d5dbf7dc6340cc5", out.getAttribute(jsonPathIdAttrKey)); - Assert.assertEquals("Transferred flow file did not have the correct result for name attribute", "", out.getAttribute(jsonPathNameAttrKey)); + Assert.assertEquals("Transferred flow file did not have the correct result for name attribute", StringUtils.EMPTY, out.getAttribute(jsonPathNameAttrKey)); } @Test http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2e05dcbb/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitJson.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitJson.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitJson.java new file mode 100644 index 0000000..dd6fc6d --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitJson.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+/**
+ * Unit tests for {@link SplitJson}.
+ */
+public class TestSplitJson {
+
+    private static final Path JSON_SNIPPET = Paths.get("src/test/resources/TestJson/json-sample.json");
+    private static final Path XML_SNIPPET = Paths.get("src/test/resources/TestXml/xml-snippet.xml");
+
+    /**
+     * Verifies that the property validator rejects a malformed JsonPath expression. The check is made on
+     * the ValidationResult itself, so the test cannot pass unless the validator actually flags the value.
+     */
+    @Test
+    public void testInvalidJsonPath() {
+        final TestRunner testRunner = TestRunners.newTestRunner(new SplitJson());
+        final ValidationResult result = testRunner.setProperty(SplitJson.ARRAY_JSON_PATH_EXPRESSION, "$..");
+
+        Assert.assertFalse("An improper JsonPath expression was not detected as being invalid.", result.isValid());
+    }
+
+    @Test
+    public void testInvalidJsonDocument() throws Exception {
+        final TestRunner testRunner = TestRunners.newTestRunner(new SplitJson());
+        testRunner.setProperty(SplitJson.ARRAY_JSON_PATH_EXPRESSION, "$");
+
+        testRunner.enqueue(XML_SNIPPET);
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(SplitJson.REL_FAILURE, 1);
+        final MockFlowFile out = testRunner.getFlowFilesForRelationship(SplitJson.REL_FAILURE).get(0);
+        // Verify that the content was unchanged
+        out.assertContentEquals(XML_SNIPPET);
+    }
+
+    @Test
+    public void testSplit_nonArrayResult() throws Exception {
+        final TestRunner testRunner = TestRunners.newTestRunner(new SplitJson());
+        testRunner.setProperty(SplitJson.ARRAY_JSON_PATH_EXPRESSION, "$[0]._id");
+
+        testRunner.enqueue(JSON_SNIPPET);
+        testRunner.run();
+
+        Relationship expectedRel = SplitJson.REL_FAILURE;
+
+        testRunner.assertAllFlowFilesTransferred(expectedRel, 1);
+        final MockFlowFile out = testRunner.getFlowFilesForRelationship(expectedRel).get(0);
+        out.assertContentEquals(JSON_SNIPPET);
+    }
+
+    @Test
+    public void testSplit_arrayResult_oneValue() throws Exception {
+        final TestRunner testRunner = TestRunners.newTestRunner(new SplitJson());
+        testRunner.setProperty(SplitJson.ARRAY_JSON_PATH_EXPRESSION, "$[0].range[?(@ == 0)]");
+
+        testRunner.enqueue(JSON_SNIPPET);
+        testRunner.run();
+
+        testRunner.assertTransferCount(SplitJson.REL_ORIGINAL, 1);
+        testRunner.assertTransferCount(SplitJson.REL_SPLIT, 1);
+        testRunner.getFlowFilesForRelationship(SplitJson.REL_ORIGINAL).get(0).assertContentEquals(JSON_SNIPPET);
+        testRunner.getFlowFilesForRelationship(SplitJson.REL_SPLIT).get(0).assertContentEquals("0");
+    }
+
+    @Test
+    public void testSplit_arrayResult_multipleValues() throws Exception {
+        final TestRunner testRunner = TestRunners.newTestRunner(new SplitJson());
+        testRunner.setProperty(SplitJson.ARRAY_JSON_PATH_EXPRESSION, "$[0].range");
+
+        testRunner.enqueue(JSON_SNIPPET);
+        testRunner.run();
+
+        int numSplitsExpected = 10;
+
+        testRunner.assertTransferCount(SplitJson.REL_ORIGINAL, 1);
+        testRunner.assertTransferCount(SplitJson.REL_SPLIT, numSplitsExpected);
+        final MockFlowFile originalOut = testRunner.getFlowFilesForRelationship(SplitJson.REL_ORIGINAL).get(0);
+        originalOut.assertContentEquals(JSON_SNIPPET);
+    }
+
+    @Test
+    public void testSplit_arrayResult_nonScalarValues() throws Exception {
+        final TestRunner testRunner = TestRunners.newTestRunner(new SplitJson());
+        testRunner.setProperty(SplitJson.ARRAY_JSON_PATH_EXPRESSION, "$[*].name");
+
+        testRunner.enqueue(JSON_SNIPPET);
+        testRunner.run();
+
+        testRunner.assertTransferCount(SplitJson.REL_ORIGINAL, 1);
+        testRunner.assertTransferCount(SplitJson.REL_SPLIT, 7);
+        testRunner.getFlowFilesForRelationship(SplitJson.REL_ORIGINAL).get(0).assertContentEquals(JSON_SNIPPET);
+        testRunner.getFlowFilesForRelationship(SplitJson.REL_SPLIT).get(0).assertContentEquals("{\"first\":\"Shaffer\",\"last\":\"Pearson\"}");
+    }
+
+}