briansolo1985 commented on code in PR #6733:
URL: https://github.com/apache/nifi/pull/6733#discussion_r1065809117


##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java:
##########
@@ -152,15 +185,94 @@ private C2ClientConfig 
generateClientConfig(NiFiProperties properties) {
     }
 
     public void start() {
-        scheduledExecutorService.scheduleAtFixedRate(() -> 
c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, 
heartbeatPeriod, TimeUnit.MILLISECONDS);
+        handleOngoingOperations(requestedOperationDAO.get());
+        heartbeatExecutorService.scheduleAtFixedRate(() -> 
c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, 
heartbeatPeriod, TimeUnit.MILLISECONDS);
+    }
+
+    private synchronized void handleOngoingOperations(Optional<OperationQueue> 
operationQueue) {
+        LOGGER.info("Handling ongoing operations: {}", operationQueue);
+        if (operationQueue.isPresent()) {
+            try {
+                waitForAcknowledgeFromBootstrap();
+                
c2ClientService.handleRequestedOperations(operationQueue.get().getRemainingOperations());
+            } catch (Exception e) {
+                LOGGER.error("Failed to process c2 operations queue", e);
+                c2ClientService.enableHeartbeat();
+            }
+        } else {
+            c2ClientService.enableHeartbeat();
+        }
+    }
+
+    private void waitForAcknowledgeFromBootstrap() {
+        LOGGER.info("Waiting for ACK signal from Bootstrap");
+        int currentWaitTime = 0;
+        while(!ackReceived) {
+            int sleep = 1000;
+            try {
+                Thread.sleep(sleep);
+            } catch (InterruptedException e) {
+                LOGGER.warn("Thread interrupted while waiting for 
Acknowledge");
+            }
+            currentWaitTime += sleep;
+            if (MAX_WAIT_FOR_BOOTSTRAP_ACK_MS <= currentWaitTime) {
+                LOGGER.warn("Max wait time ({}) exceeded for waiting ack from 
bootstrap, skipping", MAX_WAIT_FOR_BOOTSTRAP_ACK_MS);
+                break;
+            }
+        }
+    }
+
+    private void registerOperation(C2Operation c2Operation) {
+        try {
+            ackReceived = false;
+            registerAcknowledgeTimeoutTask();
+            String command = c2Operation.getOperation().name() + 
(c2Operation.getOperand() != null ? "_" + c2Operation.getOperand().name() : "");
+            bootstrapCommunicator.sendCommand(command, 
objectMapper.writeValueAsString(c2Operation));
+        } catch (IOException e) {
+            LOGGER.error("Failed to send operation to bootstrap", e);
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    private void registerAcknowledgeTimeoutTask() {
+        bootstrapAcknowledgeExecutorService.schedule(() -> {

Review Comment:
   This approach solves the other direction, and prevents to stuck in a forever 
waiting loop.
   The other direction is: ack wait loop times out and MiNiFi continues the 
process remainder operations. Meanwhile bootstrap acks back and starts 
processing the same list of operations. Even the previously omitted 
synchronized keyword wouldn't prevent this, just would delay the failure.
   This is highly unlikely to happen, but if this happens it will result in a 
very cryptic behavior. Maybe we should create another state variable 
`isAckTimedOut` besides `ackReceived` and fill in respectively. So in the 
`acknowledgeHandler` method we would check it's value, and if the operation is 
timed out we would simply log that the acknowledge has arrived, but we dropped 
it as it had already been timed out.
   Wdyt?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to