Lehel44 commented on code in PR #7007: URL: https://github.com/apache/nifi/pull/7007#discussion_r1131793273
########## nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java: ########## @@ -458,23 +461,58 @@ void apply(final ProcessContext context, final ProcessSession session, final Fun private ExceptionHandler.OnError<FunctionContext, FlowFile> onFlowFileError(final ProcessContext context, final ProcessSession session, final RoutingResult result) { ExceptionHandler.OnError<FunctionContext, FlowFile> onFlowFileError = createOnError(context, session, result, REL_FAILURE, REL_RETRY); - onFlowFileError = onFlowFileError.andThen((c, i, r, e) -> { - switch (r.destination()) { + onFlowFileError = onFlowFileError.andThen((ctx, flowFile, errorTypesResult, exception) -> { + flowFile = addErrorAttributesToFlowFile(session, flowFile, exception); + + switch (errorTypesResult.destination()) { case Failure: - getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {i, e}, e); + getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {flowFile, exception}, exception); break; case Retry: getLogger().error("Failed to update database for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry", - new Object[] {i, e}, e); + new Object[] {flowFile, exception}, exception); break; case Self: - getLogger().error("Failed to update database for {} due to {};", new Object[] {i, e}, e); + getLogger().error("Failed to update database for {} due to {};", new Object[] {flowFile, exception}, exception); break; } }); return RollbackOnFailure.createOnError(onFlowFileError); } + private ExceptionHandler.OnError<RollbackOnFailure, FlowFileGroup> onGroupError(final ProcessContext context, final ProcessSession session, final RoutingResult result) { + ExceptionHandler.OnError<RollbackOnFailure, FlowFileGroup> onGroupError + = ExceptionHandler.createOnGroupError(context, session, result, REL_FAILURE, REL_RETRY); + 
onGroupError = onGroupError.andThen((ctx, flowFileGroup, errorTypesResult, exception) -> { + + switch (errorTypesResult.destination()) { + case Failure: + List<FlowFile> flowFilesToFailure = getFlowFilesOnRelationShip(result, REL_FAILURE); + Optional.ofNullable(flowFilesToFailure).map(flowFiles -> + result.getRoutedFlowFiles().put(REL_FAILURE, addErrorAttributesToFlowFilesInGroup(session, flowFiles, flowFileGroup.getFlowFiles(), exception))); + break; + case Retry: + List<FlowFile> flowFilesToRetry = getFlowFilesOnRelationShip(result, REL_RETRY); + Optional.ofNullable(flowFilesToRetry).map(flowFiles -> + result.getRoutedFlowFiles().put(REL_RETRY, addErrorAttributesToFlowFilesInGroup(session, flowFiles, flowFileGroup.getFlowFiles(), exception))); + break; + } + }); + return onGroupError; + } Review Comment: RoutingResult::getRoutedFlowFiles already returns an empty map, so getFlowFilesOnRelationShip does not need to use Optional; it should return an empty list when there are no flowfiles for the specified relationship. This means that the `flowFilesToFailure` variable in onGroupError cannot be null and does not need to be wrapped in an Optional either. I'd also recommend simplifying the two-branch switch to an if statement for readability, and then extracting the selection of the Relationship from the ErrorTypes.Destination, since both branches depend only on that value; once that is done, the if statement is not needed at all. ```suggestion private ExceptionHandler.OnError<RollbackOnFailure, FlowFileGroup> onGroupError(final ProcessContext context, final ProcessSession session, final RoutingResult result) { ExceptionHandler.OnError<RollbackOnFailure, FlowFileGroup> onGroupError = ExceptionHandler.createOnGroupError(context, session, result, REL_FAILURE, REL_RETRY); onGroupError = onGroupError.andThen((ctx, flowFileGroup, errorTypesResult, exception) -> { Relationship relationship = errorTypesResult.destination() == ErrorTypes.Destination.Failure ? 
REL_FAILURE : REL_RETRY; List<FlowFile> flowFilesToRelationship = getFlowFilesOnRelationship(result, relationship); result.getRoutedFlowFiles().put(relationship, addErrorAttributesToFlowFilesInGroup(session, flowFilesToRelationship, flowFileGroup.getFlowFiles(), exception)); }); return onGroupError; } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org