Repository: nifi Updated Branches: refs/heads/0.x 8a18d2666 -> ceec0ecfd
NIFI-3517 If HandleHttpResponse cannot write response remove entry from HttpContextMap. * Add test for failure not clear context map. * Add handler to remove context map entry if ProcessException occurs during while exporting response. Signed-off-by: Mike Moser <mose...@apache.org> This closes #1567. Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ceec0ecf Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ceec0ecf Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ceec0ecf Branch: refs/heads/0.x Commit: ceec0ecfd75d1c5387f98278b11278e497ca8428 Parents: 8a18d26 Author: Joe Skora <jsk...@gmail.com> Authored: Mon Mar 6 20:25:06 2017 -0500 Committer: Mike Moser <mose...@apache.org> Committed: Mon May 1 20:43:29 2017 +0000 ---------------------------------------------------------------------- .../processors/standard/HandleHttpResponse.java | 5 +++ .../standard/TestHandleHttpResponse.java | 41 +++++++++++++++++--- 2 files changed, 40 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/ceec0ecf/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpResponse.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpResponse.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpResponse.java index e15abcc..2244ca6 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpResponse.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpResponse.java @@ -165,6 +165,11 @@ public class HandleHttpResponse extends AbstractProcessor { try { session.exportTo(flowFile, response.getOutputStream()); response.flushBuffer(); + } catch (final ProcessException e) { + session.transfer(flowFile, REL_FAILURE); + getLogger().error("Failed to respond to HTTP request for {} due to {}", new Object[]{flowFile, e}); + contextMap.complete(contextIdentifier); + return; } catch (final Exception e) { session.transfer(flowFile, REL_FAILURE); getLogger().error("Failed to respond to HTTP request for {} due to {}", new Object[]{flowFile, e}); http://git-wip-us.apache.org/repos/asf/nifi/blob/ceec0ecf/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpResponse.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpResponse.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpResponse.java index bdf1cdc..ba402d4 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpResponse.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpResponse.java @@ -39,6 +39,7 @@ import javax.servlet.http.HttpServletResponse; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.http.HttpContextMap; import org.apache.nifi.processor.exception.FlowFileAccessException; +import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processors.standard.util.HTTPUtils; import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.reporting.InitializationException; @@ -56,7 +57,7 @@ public class TestHandleHttpResponse { public void testEnsureCompleted() throws InitializationException { final TestRunner runner = TestRunners.newTestRunner(HandleHttpResponse.class); - final MockHttpContextMap contextMap = new MockHttpContextMap("my-id", false); + final MockHttpContextMap contextMap = new MockHttpContextMap("my-id", ""); runner.addControllerService("http-context-map", contextMap); runner.enableControllerService(contextMap); runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, "http-context-map"); @@ -95,7 +96,7 @@ public class TestHandleHttpResponse { public void testWithExceptionThrown() throws InitializationException { final TestRunner runner = TestRunners.newTestRunner(HandleHttpResponse.class); - final MockHttpContextMap contextMap = new MockHttpContextMap("my-id", true); + final MockHttpContextMap contextMap = new MockHttpContextMap("my-id", "FlowFileAccessException"); runner.addControllerService("http-context-map", contextMap); runner.enableControllerService(contextMap); runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, "http-context-map"); @@ -113,6 +114,32 @@ public class TestHandleHttpResponse { runner.run(); runner.assertAllFlowFilesTransferred(HandleHttpResponse.REL_FAILURE, 1); + assertEquals(0, contextMap.getCompletionCount()); + } + + @Test + public void testCannotWriteResponse() throws InitializationException { + final TestRunner runner = TestRunners.newTestRunner(HandleHttpResponse.class); + + final MockHttpContextMap contextMap = new MockHttpContextMap("my-id", "ProcessException"); + runner.addControllerService("http-context-map", contextMap); + runner.enableControllerService(contextMap); + runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, "http-context-map"); + runner.setProperty(HandleHttpResponse.STATUS_CODE, "${status.code}"); + runner.setProperty("my-attr", "${my-attr}"); + runner.setProperty("no-valid-attr", "${no-valid-attr}"); + + final Map<String, String> attributes = new HashMap<>(); + attributes.put(HTTPUtils.HTTP_CONTEXT_ID, "my-id"); + attributes.put("my-attr", "hello"); + attributes.put("status.code", "201"); + + runner.enqueue("hello".getBytes(), attributes); + + runner.run(); + + runner.assertAllFlowFilesTransferred(HandleHttpResponse.REL_FAILURE, 1); + assertEquals(1, contextMap.getCompletionCount()); } private static class MockHttpContextMap extends AbstractControllerService implements HttpContextMap { @@ -121,14 +148,14 @@ public class TestHandleHttpResponse { private final AtomicInteger completedCount = new AtomicInteger(0); private final ByteArrayOutputStream baos = new ByteArrayOutputStream(); private final ConcurrentMap<String, String> headersSent = new ConcurrentHashMap<>(); - private final boolean shouldThrowException; + private final String shouldThrowExceptionClass; private volatile int statusCode = -1; private final List<String> headersWithNoValue = new CopyOnWriteArrayList<>(); - public MockHttpContextMap(final String expectedIdentifier, final boolean shouldThrowException) { + public MockHttpContextMap(final String expectedIdentifier, final String shouldThrowExceptionClass) { this.id = expectedIdentifier; - this.shouldThrowException = shouldThrowException; + this.shouldThrowExceptionClass = shouldThrowExceptionClass; } @Override @@ -144,8 +171,10 @@ public class TestHandleHttpResponse { try { final HttpServletResponse response = Mockito.mock(HttpServletResponse.class); - if(shouldThrowException) { + if(shouldThrowExceptionClass != null && shouldThrowExceptionClass.equals("FlowFileAccessException")) { Mockito.when(response.getOutputStream()).thenThrow(new FlowFileAccessException("exception")); + } else if(shouldThrowExceptionClass != null && shouldThrowExceptionClass.equals("ProcessException")) { + Mockito.when(response.getOutputStream()).thenThrow(new ProcessException("exception")); } else { Mockito.when(response.getOutputStream()).thenReturn(new ServletOutputStream() { @Override