Repository: nifi Updated Branches: refs/heads/NIFI-730 5867193bc -> a2ae99f89
NIFI-730: Make cancel request actually cancel Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a2ae99f8 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a2ae99f8 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a2ae99f8 Branch: refs/heads/NIFI-730 Commit: a2ae99f89965a3fe1bd6591204bdb187a377ae1c Parents: 5867193 Author: Mark Payne <marka...@hotmail.com> Authored: Wed Oct 14 11:11:48 2015 -0400 Committer: Mark Payne <marka...@hotmail.com> Committed: Wed Oct 14 11:11:48 2015 -0400 ---------------------------------------------------------------------- .../apache/nifi/controller/DropFlowFileRequest.java | 2 +- .../nifi/controller/StandardFlowFileQueue.java | 15 +++++++++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/a2ae99f8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java index 58695c2..7eea86a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java @@ -77,7 +77,7 @@ public class DropFlowFileRequest implements DropFlowFileStatus { } @Override - public DropFlowFileState getState() { + public synchronized DropFlowFileState getState() { return state; } http://git-wip-us.apache.org/repos/asf/nifi/blob/a2ae99f8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java index 8085760..062c424 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java @@ -952,6 +952,11 @@ public final class StandardFlowFileQueue implements FlowFileQueue { QueueSize droppedSize; try { + if (dropRequest.getState() == DropFlowFileState.CANCELED) { + logger.info("Cancel requested for DropFlowFileRequest {}", requestIdentifier); + return; + } + droppedSize = drop(activeQueueRecords, requestor); logger.debug("For DropFlowFileRequest {}, Dropped {} from active queue", requestIdentifier, droppedSize); } catch (final IOException ioe) { @@ -972,6 +977,11 @@ public final class StandardFlowFileQueue implements FlowFileQueue { logger.debug("For DropFlowFileRequest {}, Swap Queue has {} elements, Swapped Record Count = {}, Swapped Content Size = {}", requestIdentifier, swapQueue.size(), swapSize.getObjectCount(), swapSize.getByteCount()); + if (dropRequest.getState() == DropFlowFileState.CANCELED) { + logger.info("Cancel requested for DropFlowFileRequest {}", requestIdentifier); + return; + } + droppedSize = drop(swapQueue, requestor); } catch (final IOException ioe) { logger.error("Failed to drop the FlowFiles from queue {} due to {}", StandardFlowFileQueue.this.getIdentifier(), ioe.toString()); @@ -995,6 +1005,11 @@ public final class StandardFlowFileQueue implements FlowFileQueue { List<FlowFileRecord> swappedIn = null; try { + if (dropRequest.getState() == DropFlowFileState.CANCELED) { + logger.info("Cancel requested for DropFlowFileRequest {}", requestIdentifier); + return; + } + swappedIn = swapManager.swapIn(swapLocation, StandardFlowFileQueue.this); droppedSize = drop(swappedIn, requestor); } catch (final IOException ioe) {