markap14 commented on code in PR #10730:
URL: https://github.com/apache/nifi/pull/10730#discussion_r2673123786


##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java:
##########
@@ -440,6 +442,42 @@ public Future<Void> stop(final FlowEngine scheduler) {
         return stopCompleteFuture;
     }
 
+    @Override
+    public Future<Void> drainFlowFiles() {
+        requireStopped();
+        stateTransition.setCurrentState(ConnectorState.DRAINING);
+
+        try (final NarCloseable ignored = NarCloseable.withComponentNarLoader(extensionManager, connectorDetails.getConnector().getClass(), getIdentifier())) {
+            final CompletableFuture<Void> future = connectorDetails.getConnector().drainFlowFiles(activeFlowContext);
+            final CompletableFuture<Void> stateUpdateFuture = future.whenComplete((result, failureCause) -> stateTransition.setCurrentState(ConnectorState.STOPPED));
+            return stateUpdateFuture;
+        }
+    }
+
+    @Override
+    public Future<Void> purgeFlowFiles(final String requestor) {
+        requireStopped();
+        stateTransition.setCurrentState(ConnectorState.PURGING);
+
+        final String dropRequestId = UUID.randomUUID().toString();
+        final DropFlowFileStatus status = activeFlowContext.getManagedProcessGroup().dropAllFlowFiles(dropRequestId, requestor);
+        final CompletableFuture<Void> future = status.getCompletionFuture();
+        final CompletableFuture<Void> stateUpdateFuture = future.whenComplete((result, failureCause) -> stateTransition.setCurrentState(ConnectorState.STOPPED));
+        return stateUpdateFuture;
+    }
+
+    private void requireStopped() {
+        final ConnectorState desiredState = getDesiredState();
+        if (desiredState != ConnectorState.STOPPED) {
+            throw new IllegalStateException("Cannot drain flow files for " + this + " because its desired state is currently " + desiredState + "; it must be STOPPED.");
+        }
+
+        final ConnectorState currentState = getCurrentState();
+        if (currentState != ConnectorState.STOPPED) {
+            throw new IllegalStateException("Cannot drain flow files for " + this + " because its current state is currently " + currentState + "; it must be STOPPED.");
+        }

Review Comment:
   Good catch, @mcgilman. I pushed a fix.


