briansolo1985 commented on code in PR #6733:
URL: https://github.com/apache/nifi/pull/6733#discussion_r1065809530


##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java:
##########
@@ -152,15 +185,94 @@ private C2ClientConfig 
generateClientConfig(NiFiProperties properties) {
     }
 
     public void start() {
-        scheduledExecutorService.scheduleAtFixedRate(() -> 
c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, 
heartbeatPeriod, TimeUnit.MILLISECONDS);
+        handleOngoingOperations(requestedOperationDAO.get());
+        heartbeatExecutorService.scheduleAtFixedRate(() -> 
c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, 
heartbeatPeriod, TimeUnit.MILLISECONDS);
+    }
+
+    private synchronized void handleOngoingOperations(Optional<OperationQueue> 
operationQueue) {
+        LOGGER.info("Handling ongoing operations: {}", operationQueue);
+        if (operationQueue.isPresent()) {
+            try {
+                waitForAcknowledgeFromBootstrap();
+                
c2ClientService.handleRequestedOperations(operationQueue.get().getRemainingOperations());
+            } catch (Exception e) {
+                LOGGER.error("Failed to process c2 operations queue", e);
+                c2ClientService.enableHeartbeat();
+            }
+        } else {
+            c2ClientService.enableHeartbeat();
+        }
+    }
+
+    private void waitForAcknowledgeFromBootstrap() {
+        LOGGER.info("Waiting for ACK signal from Bootstrap");
+        int currentWaitTime = 0;
+        while(!ackReceived) {
+            int sleep = 1000;
+            try {
+                Thread.sleep(sleep);
+            } catch (InterruptedException e) {
+                LOGGER.warn("Thread interrupted while waiting for 
Acknowledge");
+            }
+            currentWaitTime += sleep;
+            if (MAX_WAIT_FOR_BOOTSTRAP_ACK_MS <= currentWaitTime) {
+                LOGGER.warn("Max wait time ({}) exceeded for waiting ack from 
bootstrap, skipping", MAX_WAIT_FOR_BOOTSTRAP_ACK_MS);
+                break;
+            }
+        }
+    }
+
+    private void registerOperation(C2Operation c2Operation) {
+        try {
+            ackReceived = false;
+            registerAcknowledgeTimeoutTask();
+            String command = c2Operation.getOperation().name() + 
(c2Operation.getOperand() != null ? "_" + c2Operation.getOperand().name() : "");
+            bootstrapCommunicator.sendCommand(command, 
objectMapper.writeValueAsString(c2Operation));
+        } catch (IOException e) {
+            LOGGER.error("Failed to send operation to bootstrap", e);
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    private void registerAcknowledgeTimeoutTask() {
+        bootstrapAcknowledgeExecutorService.schedule(() -> {
+            if (!ackReceived) {
+                LOGGER.info("Does not received acknowledge from bootstrap 
after {} seconds. Handling remaining operations.", 
MINIFI_RESTART_TIMEOUT_SECONDS);
+                handleOngoingOperations(requestedOperationDAO.get());
+            }
+        }, MINIFI_RESTART_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+    }
+
+    private void acknowledgeHandler(String[] params) {
+        LOGGER.info("Received acknowledge message from bootstrap process");
+        if (params.length < 1) {
+            LOGGER.error("Invalid arguments coming from bootstrap, skipping 
acknowledging latest operation");
+            return;
+        }
+
+        Optional<OperationQueue> optionalOperationQueue = 
requestedOperationDAO.get();
+        ackReceived = true;
+        optionalOperationQueue.ifPresent(operationQueue -> {
+            C2Operation c2Operation = operationQueue.getCurrentOperation();
+            C2OperationAck c2OperationAck = new C2OperationAck();
+            c2OperationAck.setOperationId(c2Operation.getIdentifier());
+            C2OperationState c2OperationState = new C2OperationState();
+            OperationState state = OperationState.valueOf(params[0]);
+            c2OperationState.setState(state);
+            c2OperationAck.setOperationState(c2OperationState);
+            c2ClientService.sendAcknowledge(c2OperationAck);
+            if (state != OperationState.FULLY_APPLIED) {
+                handleOngoingOperations(optionalOperationQueue);

Review Comment:
   Thanks



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to