JPercivall commented on a change in pull request #3299: NIFI-5172 Adding the
ability to specify a record writer for PutElasti…
URL: https://github.com/apache/nifi/pull/3299#discussion_r259962772
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
##########
@@ -450,47 +490,117 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
                         JsonNode itemNode = itemNodeArray.get(i);
                         int status = itemNode.findPath("status").asInt();
                         if (!isSuccess(status)) {
-                            if (errorReason == null) {
+                            if (errorReason == null || logAllErrors) {
                                 // Use "result" if it is present; this happens for status codes like 404 Not Found, which may not have an error/reason
                                 String reason = itemNode.findPath("result").asText();
                                 if (StringUtils.isEmpty(reason)) {
                                     // If there was no result, we expect an error with a string description in the "reason" field
                                     reason = itemNode.findPath("reason").asText();
                                 }
                                 errorReason = reason;
-                                logger.error("Failed to process {} due to {}, transferring to failure",
-                                        new Object[]{flowFile, errorReason});
+
+                                logger.error("Failed to process record {} in FlowFile {} due to {}, transferring to failure",
+                                        new Object[]{i, flowFile, errorReason});
                             }
-                            failureCount++;
+                            failures.add(i);
                         }
                     }
                 }
             }
-            flowFile = session.putAttribute(flowFile, "failure.count", Integer.toString(failureCount));
-            session.transfer(flowFile, REL_FAILURE);
         } else {
+            // Everything succeeded, route FF and end
             flowFile = session.putAttribute(flowFile, "record.count", Integer.toString(recordCount));
             session.transfer(flowFile, REL_SUCCESS);
             session.getProvenanceReporter().send(flowFile, url.toString());
+            return;
         }
     } catch (IOException ioe) {
         // Something went wrong when parsing the response, log the error and route to failure
         logger.error("Error parsing Bulk API response: {}", new Object[]{ioe.getMessage()}, ioe);
         session.transfer(flowFile, REL_FAILURE);
         context.yield();
+        return;
+    } finally {
+        getResponse.close();
     }
 } else if (statusCode / 100 == 5) {
     // 5xx -> RETRY, but a server error might last a while, so yield
     logger.warn("Elasticsearch returned code {} with message {}, transferring flow file to retry. This is likely a server problem, yielding...",
             new Object[]{statusCode, getResponse.message()});
     session.transfer(flowFile, REL_RETRY);
     context.yield();
+    return;
 } else { // 1xx, 3xx, 4xx, etc. -> NO RETRY
     logger.warn("Elasticsearch returned code {} with message {}, transferring flow file to failure", new Object[]{statusCode, getResponse.message()});
     session.transfer(flowFile, REL_FAILURE);
+    return;
+}
+
+// If everything failed or we don't have a writer factory, route the entire original FF to failure.
+if ((!failures.isEmpty() && failures.size() == recordCount ) || !writerFactoryOptional.isPresent()) {
Review comment:
I waffled back and forth here, because it is not only failures from ES that
route to "failure" but also failures from earlier in the processor (for
example, when the reader fails to parse the FF). Those earlier failures aren't
(or can't be) written through the writer, so the FlowFile has to be
transferred as is. Ultimately I figured the intent was to not rely on the
writer to transform data in this processor, since a FlowFile can reach
"failure" for reasons outside this final block of logic. To address this and
stay backwards compatible, I ended up simply transferring the original
FlowFile and adding a note to the property description [1].
[1]
https://github.com/apache/nifi/pull/3299/files#diff-2be929537c0e0a6de16384818f0124d7R134
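
As a rough illustration of the routing decision discussed above, here is a
minimal, standalone Java sketch. The class and method names
(FailureRoutingSketch, routeOriginalToFailure) are hypothetical and do not
appear in the PR; only the boolean condition is taken from the diff, and an
Optional<?> stands in for the processor's writer factory.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class FailureRoutingSketch {

    // Mirrors the condition in the diff: route the whole original FlowFile to
    // failure when every record failed or when no record writer is configured.
    // Hypothetical helper; the processor inlines this check in onTrigger.
    static boolean routeOriginalToFailure(List<Integer> failures, int recordCount,
                                          Optional<?> writerFactory) {
        boolean everythingFailed = !failures.isEmpty() && failures.size() == recordCount;
        return everythingFailed || !writerFactory.isPresent();
    }

    public static void main(String[] args) {
        // Placeholder value; a real flow would hold a record writer factory here.
        Optional<?> writer = Optional.of("writer");

        // All 3 of 3 records failed: the original FlowFile goes to failure.
        System.out.println(routeOriginalToFailure(Arrays.asList(0, 1, 2), 3, writer)); // true

        // 1 of 3 failed and a writer is present: the original is not routed whole.
        System.out.println(routeOriginalToFailure(Arrays.asList(1), 3, writer)); // false

        // 1 of 3 failed but no writer is configured: fall back to the original.
        System.out.println(routeOriginalToFailure(Arrays.asList(1), 3, Optional.empty())); // true
    }
}
```

The success path is not modelled because, in the diff, it returns before this
check is reached. What happens in the partial-failure case with a writer
present is outside the quoted hunk, so the sketch only reports that the
original FlowFile is not sent to failure as a whole.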