briansolo1985 commented on code in PR #6733:
URL: https://github.com/apache/nifi/pull/6733#discussion_r1065809117
##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java:
##########
@@ -152,15 +185,94 @@ private C2ClientConfig generateClientConfig(NiFiProperties properties) {
     }
 
     public void start() {
-        scheduledExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS);
+        handleOngoingOperations(requestedOperationDAO.get());
+        heartbeatExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS);
+    }
+
+    private synchronized void handleOngoingOperations(Optional<OperationQueue> operationQueue) {
+        LOGGER.info("Handling ongoing operations: {}", operationQueue);
+        if (operationQueue.isPresent()) {
+            try {
+                waitForAcknowledgeFromBootstrap();
+                c2ClientService.handleRequestedOperations(operationQueue.get().getRemainingOperations());
+            } catch (Exception e) {
+                LOGGER.error("Failed to process c2 operations queue", e);
+                c2ClientService.enableHeartbeat();
+            }
+        } else {
+            c2ClientService.enableHeartbeat();
+        }
+    }
+
+    private void waitForAcknowledgeFromBootstrap() {
+        LOGGER.info("Waiting for ACK signal from Bootstrap");
+        int currentWaitTime = 0;
+        while(!ackReceived) {
+            int sleep = 1000;
+            try {
+                Thread.sleep(sleep);
+            } catch (InterruptedException e) {
+                LOGGER.warn("Thread interrupted while waiting for Acknowledge");
+            }
+            currentWaitTime += sleep;
+            if (MAX_WAIT_FOR_BOOTSTRAP_ACK_MS <= currentWaitTime) {
+                LOGGER.warn("Max wait time ({}) exceeded for waiting ack from bootstrap, skipping", MAX_WAIT_FOR_BOOTSTRAP_ACK_MS);
+                break;
+            }
+        }
+    }
+
+    private void registerOperation(C2Operation c2Operation) {
+        try {
+            ackReceived = false;
+            registerAcknowledgeTimeoutTask();
+            String command = c2Operation.getOperation().name() + (c2Operation.getOperand() != null ? "_" + c2Operation.getOperand().name() : "");
+            bootstrapCommunicator.sendCommand(command, objectMapper.writeValueAsString(c2Operation));
+        } catch (IOException e) {
+            LOGGER.error("Failed to send operation to bootstrap", e);
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    private void registerAcknowledgeTimeoutTask() {
+        bootstrapAcknowledgeExecutorService.schedule(() -> {

Review Comment:
   This approach solves one direction: it prevents us from getting stuck waiting forever. The other direction is still open: the ack wait loop times out and MiNiFi continues processing the remaining operations. Meanwhile, bootstrap acks back and starts processing the same list of operations. Even the previously omitted `synchronized` keyword wouldn't prevent this; it would only delay the failure.

   This is highly unlikely to happen, but if it does, it will result in very cryptic behavior. Maybe we should add another state variable, `isAckTimedOut`, alongside `ackReceived`, and set the two accordingly. Then in the `acknowledgeHandler` method we would check its value, and if the operation has timed out we would simply log that the acknowledgement arrived but was dropped because the wait had already timed out. WDYT?