exceptionfactory commented on code in PR #11001:
URL: https://github.com/apache/nifi/pull/11001#discussion_r2931346425
##########
nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformJSON.java:
##########
@@ -143,104 +149,155 @@ protected List<PropertyDescriptor>
getSupportedPropertyDescriptors() {
return PROPERTY_DESCRIPTORS;
}
+ @OnScheduled
+ @Override
+ public void setup(final ProcessContext context) {
+ super.setup(context);
+ final int maxStringLength =
context.getProperty(MAX_STRING_LENGTH).asDataSize(DataUnit.B).intValue();
+ final StreamReadConstraints streamReadConstraints =
StreamReadConstraints.builder().maxStringLength(maxStringLength).build();
+
+ final ObjectMapper objectMapper = new ObjectMapper();
+
objectMapper.getFactory().setStreamReadConstraints(streamReadConstraints);
+ jsonUtil = JsonUtils.customJsonUtil(objectMapper);
+
+ configuredClassLoader = getClass().getClassLoader();
+ try {
+ final JoltTransformStrategy strategy =
context.getProperty(JOLT_TRANSFORM).asAllowableValue(JoltTransformStrategy.class);
+
+ if (strategy == JoltTransformStrategy.CUSTOMR &&
context.getProperty(MODULES).isSet()) {
+ configuredClassLoader = ClassLoaderUtils.getCustomClassLoader(
+
context.getProperty(MODULES).evaluateAttributeExpressions().getValue(),
+ getClass().getClassLoader(),
+ getJarFilenameFilter()
+ );
+ }
+ } catch (final Exception e) {
+ getLogger().error("ClassLoader configuration failed", e);
+ }
+ }
+
@Override
public void onTrigger(final ProcessContext context, ProcessSession
session) throws ProcessException {
final FlowFile original = session.get();
if (original == null) {
return;
}
- final ComponentLog logger = getLogger();
final StopWatch stopWatch = new StopWatch(true);
- final Object inputJson;
- final boolean sourceStrategyFlowFile = JsonSourceStrategy.FLOW_FILE ==
context.getProperty(JSON_SOURCE).asAllowableValue(JsonSourceStrategy.class);
- String jsonSourceAttributeName = null;
-
- if (sourceStrategyFlowFile) {
- try (final InputStream in = session.read(original)) {
- inputJson = jsonUtil.jsonToObject(in);
- } catch (final Exception e) {
- logger.error("JSON parsing failed on FlowFile content for {}",
original, e);
- session.transfer(original, REL_FAILURE);
- return;
- }
- } else {
- jsonSourceAttributeName =
context.getProperty(JSON_SOURCE_ATTRIBUTE).getValue();
- final String jsonSourceAttributeValue =
original.getAttribute(jsonSourceAttributeName);
- if (StringUtils.isBlank(jsonSourceAttributeValue)) {
- logger.error("FlowFile attribute '{}' value is blank",
jsonSourceAttributeName);
- session.transfer(original, REL_FAILURE);
- return;
- } else {
- try {
- inputJson =
jsonUtil.jsonToObject(jsonSourceAttributeValue);
- } catch (final Exception e) {
- logger.error("JSON parsing failed on attribute '{}' of
FlowFile {}", jsonSourceAttributeName, original, e);
- session.transfer(original, REL_FAILURE);
- return;
- }
- }
- }
-
- final String jsonString;
- final ClassLoader originalContextClassLoader =
Thread.currentThread().getContextClassLoader();
+ final JsonSourceStrategy sourceStrategy =
context.getProperty(JSON_SOURCE).asAllowableValue(JsonSourceStrategy.class);
+ final ClassLoader contextClassLoader =
Thread.currentThread().getContextClassLoader();
try {
+
Thread.currentThread().setContextClassLoader(configuredClassLoader);
+
final JoltTransform transform = getTransform(context, original);
- if (customClassLoader != null) {
-
Thread.currentThread().setContextClassLoader(customClassLoader);
- }
+ final FlowFile transformedFlowFile = switch (sourceStrategy) {
+ case FLOW_FILE -> transformFlowFile(context, session,
original, transform);
+ case ATTRIBUTE -> transformAttribute(context, session,
original, transform);
+ case JSON_LINES -> transformNewlineDelimited(session,
original, transform);
+ };
- final Object transformedJson = TransformUtils.transform(transform,
inputJson);
- jsonString = context.getProperty(PRETTY_PRINT).asBoolean() ?
jsonUtil.toPrettyJsonString(transformedJson) :
jsonUtil.toJsonString(transformedJson);
+ onSuccess(context, session, transformedFlowFile, sourceStrategy,
stopWatch);
} catch (final Exception e) {
- logger.error("Transform failed for {}", original, e);
+ getLogger().error("Failed to Transform {}", original, e);
session.transfer(original, REL_FAILURE);
- return;
} finally {
- if (customClassLoader != null && originalContextClassLoader !=
null) {
-
Thread.currentThread().setContextClassLoader(originalContextClassLoader);
- }
+ Thread.currentThread().setContextClassLoader(contextClassLoader);
}
+ }
- if (sourceStrategyFlowFile) {
- FlowFile transformed = session.write(original, out ->
out.write(jsonString.getBytes(StandardCharsets.UTF_8)));
- final String transformType =
context.getProperty(JOLT_TRANSFORM).getValue();
- transformed = session.putAttribute(transformed,
CoreAttributes.MIME_TYPE.key(), "application/json");
- session.transfer(transformed, REL_SUCCESS);
- session.getProvenanceReporter().modifyContent(transformed,
"Modified With " + transformType, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
- logger.info("Transform completed on FlowFile content for {}",
original);
- } else {
- session.putAttribute(original, jsonSourceAttributeName,
jsonString);
- session.transfer(original, REL_SUCCESS);
- logger.info("Transform completed on attribute '{}' of FlowFile
{}", jsonSourceAttributeName, original);
+ private FlowFile transformFlowFile(
+ final ProcessContext context,
+ final ProcessSession session,
+ final FlowFile flowFile,
+ final JoltTransform transform
+ ) {
+ final Object inputJson;
+ try (final InputStream in = session.read(flowFile)) {
+ inputJson = jsonUtil.jsonToObject(in);
+ } catch (final Exception e) {
+ throw new ProcessException("JSON parsing failed for FlowFile", e);
}
+
+ final String transformedJson = getTransformedJson(context, transform,
inputJson);
+ return session.write(flowFile, out ->
out.write(transformedJson.getBytes(StandardCharsets.UTF_8)));
}
- @OnScheduled
- @Override
- public void setup(final ProcessContext context) {
- super.setup(context);
- final int maxStringLength =
context.getProperty(MAX_STRING_LENGTH).asDataSize(DataUnit.B).intValue();
- final StreamReadConstraints streamReadConstraints =
StreamReadConstraints.builder().maxStringLength(maxStringLength).build();
+ private FlowFile transformAttribute(
+ final ProcessContext context,
+ final ProcessSession session,
+ final FlowFile flowFile,
+ final JoltTransform transform
+ ) {
+ final String jsonSourceAttributeName =
context.getProperty(JSON_SOURCE_ATTRIBUTE).getValue();
+ final String jsonSourceAttributeValue =
flowFile.getAttribute(jsonSourceAttributeName);
- final ObjectMapper objectMapper = new ObjectMapper();
-
objectMapper.getFactory().setStreamReadConstraints(streamReadConstraints);
- jsonUtil = JsonUtils.customJsonUtil(objectMapper);
+ if (jsonSourceAttributeValue == null ||
jsonSourceAttributeValue.isBlank()) {
+ throw new ProcessException("Content not found in FlowFile
Attribute [%s]".formatted(jsonSourceAttributeName));
+ }
+ final Object inputJson;
try {
- final JoltTransformStrategy strategy =
context.getProperty(JOLT_TRANSFORM).asAllowableValue(JoltTransformStrategy.class);
+ inputJson = jsonUtil.jsonToObject(jsonSourceAttributeValue);
+ } catch (final Exception e) {
+ throw new ProcessException("JSON parsing failed for FlowFile
Attribute [%s]".formatted(jsonSourceAttributeName), e);
+ }
- if (strategy == JoltTransformStrategy.CUSTOMR &&
context.getProperty(MODULES).isSet()) {
- customClassLoader = ClassLoaderUtils.getCustomClassLoader(
-
context.getProperty(MODULES).evaluateAttributeExpressions().getValue(),
- this.getClass().getClassLoader(),
- getJarFilenameFilter()
- );
- } else {
- customClassLoader = this.getClass().getClassLoader();
+ final String transformedJson = getTransformedJson(context, transform,
inputJson);
+ return session.putAttribute(flowFile, jsonSourceAttributeName,
transformedJson);
+ }
+
+ private FlowFile transformNewlineDelimited(
+ final ProcessSession session,
+ final FlowFile flowFile,
+ final JoltTransform transform
+ ) {
+ return session.write(flowFile, (in, out) -> {
+ try (
+ final BufferedReader reader = new BufferedReader(new
InputStreamReader(in, StandardCharsets.UTF_8));
+ final BufferedWriter writer = new BufferedWriter(new
OutputStreamWriter(out, StandardCharsets.UTF_8))
+ ) {
+ String line;
+ while ((line = reader.readLine()) != null) {
+ if (line.isBlank()) {
+ continue;
+ }
+ final Object inputJson = jsonUtil.jsonToObject(line);
+ final Object transformedJson =
TransformUtils.transform(transform, inputJson);
+ writer.write(jsonUtil.toJsonString(transformedJson));
+ writer.newLine();
Review Comment:
Good catch. Although both should work in general, the single character is
correct; I will push a correction.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]