Repository: nifi Updated Branches: refs/heads/master 9e884f612 -> fa5fed9bb
NIFI-3082: Fixed status code handling in PutElasticsearchHttp This closes #1258. Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/fa5fed9b Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/fa5fed9b Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/fa5fed9b Branch: refs/heads/master Commit: fa5fed9bb59eb485e48dd7350bf693a3039307ef Parents: 9e884f6 Author: Matt Burgess <[email protected]> Authored: Tue Nov 22 12:53:45 2016 -0500 Committer: Pierre Villard <[email protected]> Committed: Tue Nov 22 21:32:03 2016 +0100 ---------------------------------------------------------------------- .../processors/elasticsearch/PutElasticsearchHttp.java | 12 +++++++++--- .../elasticsearch/TestPutElasticsearchHttp.java | 11 ++++++++++- 2 files changed, 19 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/fa5fed9b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java index 92b1452..7117100 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java @@ -358,9 +358,15 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor { session.transfer(flowFilesToTransfer, REL_FAILURE); context.yield(); } - } else { - // Something went wrong during the bulk update, throw a ProcessException to indicate rollback - throw new ProcessException("Received error code " + statusCode + " from Elasticsearch API"); + } else if (statusCode / 100 == 5) { + // 5xx -> RETRY, but a server error might last a while, so yield + logger.warn("Elasticsearch returned code {} with message {}, transferring flow file to retry. This is likely a server problem, yielding...", + new Object[]{statusCode, getResponse.message()}); + session.transfer(flowFilesToTransfer, REL_RETRY); + context.yield(); + } else { // 1xx, 3xx, 4xx, etc. -> NO RETRY + logger.warn("Elasticsearch returned code {} with message {}, transferring flow file to failure", new Object[]{statusCode, getResponse.message()}); + session.transfer(flowFilesToTransfer, REL_FAILURE); } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/fa5fed9b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java index c3d5a34..1172004 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java @@ -167,7 +167,7 @@ public class TestPutElasticsearchHttp { runner.assertNotValid(); } - @Test(expected = AssertionError.class) + @Test public void testPutElasticSearchOnTriggerWithFailures() throws IOException { PutElasticsearchTestProcessor processor = new PutElasticsearchTestProcessor(true); processor.setStatus(100, "Should fail"); @@ -183,6 +183,15 @@ public class TestPutElasticsearchHttp { put("doc_id", "28039652140"); }}); runner.run(1, true, true); + runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_FAILURE, 1); + runner.clearTransferState(); + + processor.setStatus(500, "Should retry"); + runner.enqueue(docExample, new HashMap<String, String>() {{ + put("doc_id", "28039652140"); + }}); + runner.run(1, true, true); + runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_RETRY, 1); } @Test
