This is an automated email from the ASF dual-hosted git repository. mthomsen pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new 1ac833654b NIFI-12127 Allow Jackson's max string length to be configured on SplitJson and EvaluateJsonPath 1ac833654b is described below commit 1ac833654b826e1366353c964ee1e4023d512f13 Author: Bryan Bende <bbe...@apache.org> AuthorDate: Tue Sep 26 09:46:22 2023 -0400 NIFI-12127 Allow Jackson's max string length to be configured on SplitJson and EvaluateJsonPath This closes #7794 Signed-off-by: Mike Thomsen <mthom...@apache.org> --- .../standard/AbstractJsonPathProcessor.java | 51 +++++++++++++++------- .../nifi/processors/standard/EvaluateJsonPath.java | 11 ++++- .../apache/nifi/processors/standard/SplitJson.java | 44 +++++++++++-------- 3 files changed, 69 insertions(+), 37 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractJsonPathProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractJsonPathProcessor.java index 81c7aa8ec6..aa72654b99 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractJsonPathProcessor.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractJsonPathProcessor.java @@ -16,20 +16,14 @@ */ package org.apache.nifi.processors.standard; +import com.fasterxml.jackson.core.StreamReadConstraints; +import com.fasterxml.jackson.databind.ObjectMapper; import com.jayway.jsonpath.Configuration; import com.jayway.jsonpath.DocumentContext; import com.jayway.jsonpath.InvalidJsonException; import com.jayway.jsonpath.JsonPath; import com.jayway.jsonpath.spi.json.JacksonJsonProvider; import com.jayway.jsonpath.spi.json.JsonProvider; -import java.io.BufferedInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.atomic.AtomicReference; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; @@ -38,8 +32,18 @@ import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.util.StringUtils; +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; + /** * Provides common functionality used for processors interacting and manipulating JSON data via JsonPath. * @@ -49,10 +53,6 @@ import org.apache.nifi.util.StringUtils; */ public abstract class AbstractJsonPathProcessor extends AbstractProcessor { - private static final Configuration STRICT_PROVIDER_CONFIGURATION = Configuration.builder().jsonProvider(new JacksonJsonProvider()).build(); - - private static final JsonProvider JSON_PROVIDER = STRICT_PROVIDER_CONFIGURATION.jsonProvider(); - static final Map<String, String> NULL_REPRESENTATION_MAP = new HashMap<>(); static final String EMPTY_STRING_OPTION = "empty string"; @@ -71,14 +71,33 @@ public abstract class AbstractJsonPathProcessor extends AbstractProcessor { .defaultValue(EMPTY_STRING_OPTION) .build(); - static DocumentContext validateAndEstablishJsonContext(ProcessSession processSession, FlowFile flowFile) { + public static final PropertyDescriptor MAX_STRING_LENGTH = new PropertyDescriptor.Builder() + .name("max-string-length") + .displayName("Max String Length") + .description("The maximum allowed length of a string value when parsing the JSON document") + .required(true) + .defaultValue("20 MB") + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .build(); + + static Configuration createConfiguration(final int maxStringLength) { + final StreamReadConstraints streamReadConstraints = StreamReadConstraints.builder().maxStringLength(maxStringLength).build(); + + final ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.getFactory().setStreamReadConstraints(streamReadConstraints); + + final JsonProvider jsonProvider = new JacksonJsonProvider(objectMapper); + return Configuration.builder().jsonProvider(jsonProvider).build(); + } + + static DocumentContext validateAndEstablishJsonContext(ProcessSession processSession, FlowFile flowFile, Configuration jsonPathConfiguration) { // Parse the document once into an associated context to support multiple path evaluations if specified final AtomicReference<DocumentContext> contextHolder = new AtomicReference<>(null); processSession.read(flowFile, new InputStreamCallback() { @Override public void process(InputStream in) throws IOException { try (BufferedInputStream bufferedInputStream = new BufferedInputStream(in)) { - DocumentContext ctx = JsonPath.using(STRICT_PROVIDER_CONFIGURATION).parse(bufferedInputStream); + DocumentContext ctx = JsonPath.using(jsonPathConfiguration).parse(bufferedInputStream); contextHolder.set(ctx); } catch (IllegalArgumentException iae) { // The JsonPath.parse() above first parses the json, then creates a context object from the parsed @@ -109,11 +128,11 @@ public abstract class AbstractJsonPathProcessor extends AbstractProcessor { return !(obj instanceof Map || obj instanceof List); } - static String getResultRepresentation(Object jsonPathResult, String defaultValue) { + static String getResultRepresentation(JsonProvider jsonProvider, Object jsonPathResult, String defaultValue) { if (isJsonScalar(jsonPathResult)) { return Objects.toString(jsonPathResult, defaultValue); } - return JSON_PROVIDER.toJson(jsonPathResult); + return jsonProvider.toJson(jsonPathResult); } abstract static class JsonPathValidator implements Validator { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java index e72352c306..bc5871a72d 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.processors.standard; +import com.jayway.jsonpath.Configuration; import com.jayway.jsonpath.DocumentContext; import com.jayway.jsonpath.InvalidJsonException; import com.jayway.jsonpath.JsonPath; @@ -37,6 +38,7 @@ import org.apache.nifi.components.ValidationResult; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessorInitializationContext; @@ -147,6 +149,7 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor { private volatile String returnType; private volatile String pathNotFound; private volatile String nullDefaultValue; + private volatile Configuration jsonPathConfiguration; @Override protected void init(final ProcessorInitializationContext context) { @@ -161,6 +164,7 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor { props.add(RETURN_TYPE); props.add(PATH_NOT_FOUND); props.add(NULL_VALUE_DEFAULT_REPRESENTATION); + props.add(MAX_STRING_LENGTH); this.properties = Collections.unmodifiableList(props); } @@ -248,6 +252,9 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor { } pathNotFound = processContext.getProperty(PATH_NOT_FOUND).getValue(); nullDefaultValue = NULL_REPRESENTATION_MAP.get(processContext.getProperty(NULL_VALUE_DEFAULT_REPRESENTATION).getValue()); + + final int maxStringLength = processContext.getProperty(MAX_STRING_LENGTH).asDataSize(DataUnit.B).intValue(); + jsonPathConfiguration = createConfiguration(maxStringLength); } @OnUnscheduled @@ -266,7 +273,7 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor { DocumentContext documentContext; try { - documentContext = validateAndEstablishJsonContext(processSession, flowFile); + documentContext = validateAndEstablishJsonContext(processSession, flowFile, jsonPathConfiguration); } catch (InvalidJsonException e) { logger.error("FlowFile {} did not have valid JSON content.", flowFile); processSession.transfer(flowFile, REL_FAILURE); @@ -316,7 +323,7 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor { } } - final String resultRepresentation = getResultRepresentation(result, nullDefaultValue); + final String resultRepresentation = getResultRepresentation(jsonPathConfiguration.jsonProvider(), result, nullDefaultValue); if (destinationIsAttribute) { jsonPathResults.put(jsonPathAttrKey, resultRepresentation); } else { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java index cb3d349638..fbe799980d 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java @@ -16,25 +16,18 @@ */ package org.apache.nifi.processors.standard; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicReference; - +import com.jayway.jsonpath.Configuration; +import com.jayway.jsonpath.DocumentContext; +import com.jayway.jsonpath.InvalidJsonException; +import com.jayway.jsonpath.JsonPath; +import com.jayway.jsonpath.PathNotFoundException; import org.apache.commons.lang3.StringUtils; -import org.apache.nifi.annotation.behavior.SystemResourceConsideration; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SideEffectFree; import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.behavior.SystemResource; +import org.apache.nifi.annotation.behavior.SystemResourceConsideration; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; @@ -46,16 +39,24 @@ import org.apache.nifi.components.ValidationResult; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.util.StandardValidators; -import com.jayway.jsonpath.DocumentContext; -import com.jayway.jsonpath.InvalidJsonException; -import com.jayway.jsonpath.JsonPath; -import com.jayway.jsonpath.PathNotFoundException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT; import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID; @@ -112,12 +113,14 @@ public class SplitJson extends AbstractJsonPathProcessor { private final AtomicReference<JsonPath> JSON_PATH_REF = new AtomicReference<>(); private volatile String nullDefaultValue; + private volatile Configuration jsonPathConfiguration; @Override protected void init(final ProcessorInitializationContext context) { final List<PropertyDescriptor> properties = new ArrayList<>(); properties.add(ARRAY_JSON_PATH_EXPRESSION); properties.add(NULL_VALUE_DEFAULT_REPRESENTATION); + properties.add(MAX_STRING_LENGTH); this.properties = Collections.unmodifiableList(properties); final Set<Relationship> relationships = new HashSet<>(); @@ -168,6 +171,9 @@ public class SplitJson extends AbstractJsonPathProcessor { @OnScheduled public void onScheduled(ProcessContext processContext) { nullDefaultValue = NULL_REPRESENTATION_MAP.get(processContext.getProperty(NULL_VALUE_DEFAULT_REPRESENTATION).getValue()); + + final int maxStringLength = processContext.getProperty(MAX_STRING_LENGTH).asDataSize(DataUnit.B).intValue(); + jsonPathConfiguration = createConfiguration(maxStringLength); } @Override @@ -181,7 +187,7 @@ public class SplitJson extends AbstractJsonPathProcessor { DocumentContext documentContext; try { - documentContext = validateAndEstablishJsonContext(processSession, original); + documentContext = validateAndEstablishJsonContext(processSession, original, jsonPathConfiguration); } catch (InvalidJsonException e) { logger.error("FlowFile {} did not have valid JSON content.", new Object[]{original}); processSession.transfer(original, REL_FAILURE); @@ -216,7 +222,7 @@ public class SplitJson extends AbstractJsonPathProcessor { Object resultSegment = resultList.get(i); FlowFile split = processSession.create(original); split = processSession.write(split, (out) -> { - String resultSegmentContent = getResultRepresentation(resultSegment, nullDefaultValue); + String resultSegmentContent = getResultRepresentation(jsonPathConfiguration.jsonProvider(), resultSegment, nullDefaultValue); out.write(resultSegmentContent.getBytes(StandardCharsets.UTF_8)); } );