briansolo1985 commented on code in PR #6733: URL: https://github.com/apache/nifi/pull/6733#discussion_r1065809530
########## minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java: ########## @@ -152,15 +185,94 @@ private C2ClientConfig generateClientConfig(NiFiProperties properties) { } public void start() { - scheduledExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS); + handleOngoingOperations(requestedOperationDAO.get()); + heartbeatExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS); + } + + private synchronized void handleOngoingOperations(Optional<OperationQueue> operationQueue) { + LOGGER.info("Handling ongoing operations: {}", operationQueue); + if (operationQueue.isPresent()) { + try { + waitForAcknowledgeFromBootstrap(); + c2ClientService.handleRequestedOperations(operationQueue.get().getRemainingOperations()); + } catch (Exception e) { + LOGGER.error("Failed to process c2 operations queue", e); + c2ClientService.enableHeartbeat(); + } + } else { + c2ClientService.enableHeartbeat(); + } + } + + private void waitForAcknowledgeFromBootstrap() { + LOGGER.info("Waiting for ACK signal from Bootstrap"); + int currentWaitTime = 0; + while(!ackReceived) { + int sleep = 1000; + try { + Thread.sleep(sleep); + } catch (InterruptedException e) { + LOGGER.warn("Thread interrupted while waiting for Acknowledge"); + } + currentWaitTime += sleep; + if (MAX_WAIT_FOR_BOOTSTRAP_ACK_MS <= currentWaitTime) { + LOGGER.warn("Max wait time ({}) exceeded for waiting ack from bootstrap, skipping", MAX_WAIT_FOR_BOOTSTRAP_ACK_MS); + break; + } + } + } + + private void registerOperation(C2Operation c2Operation) { + try { + ackReceived = false; + registerAcknowledgeTimeoutTask(); + String command = c2Operation.getOperation().name() + (c2Operation.getOperand() != null ? "_" + c2Operation.getOperand().name() : ""); + bootstrapCommunicator.sendCommand(command, objectMapper.writeValueAsString(c2Operation)); + } catch (IOException e) { + LOGGER.error("Failed to send operation to bootstrap", e); + throw new UncheckedIOException(e); + } + } + + private void registerAcknowledgeTimeoutTask() { + bootstrapAcknowledgeExecutorService.schedule(() -> { + if (!ackReceived) { + LOGGER.info("Does not received acknowledge from bootstrap after {} seconds. Handling remaining operations.", MINIFI_RESTART_TIMEOUT_SECONDS); + handleOngoingOperations(requestedOperationDAO.get()); + } + }, MINIFI_RESTART_TIMEOUT_SECONDS, TimeUnit.SECONDS); + } + + private void acknowledgeHandler(String[] params) { + LOGGER.info("Received acknowledge message from bootstrap process"); + if (params.length < 1) { + LOGGER.error("Invalid arguments coming from bootstrap, skipping acknowledging latest operation"); + return; + } + + Optional<OperationQueue> optionalOperationQueue = requestedOperationDAO.get(); + ackReceived = true; + optionalOperationQueue.ifPresent(operationQueue -> { + C2Operation c2Operation = operationQueue.getCurrentOperation(); + C2OperationAck c2OperationAck = new C2OperationAck(); + c2OperationAck.setOperationId(c2Operation.getIdentifier()); + C2OperationState c2OperationState = new C2OperationState(); + OperationState state = OperationState.valueOf(params[0]); + c2OperationState.setState(state); + c2OperationAck.setOperationState(c2OperationState); + c2ClientService.sendAcknowledge(c2OperationAck); + if (state != OperationState.FULLY_APPLIED) { + handleOngoingOperations(optionalOperationQueue); Review Comment: Thanks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org