bejancsaba commented on code in PR #6733:
URL: https://github.com/apache/nifi/pull/6733#discussion_r1067429017


##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java:
##########
@@ -32,23 +42,65 @@ public class C2ClientService {
 
     private final C2Client client;
     private final C2HeartbeatFactory c2HeartbeatFactory;
-    private final C2OperationService operationService;
+    private final C2OperationHandlerProvider operationService;
+    private final RequestedOperationDAO requestedOperationDAO;
+    private final Consumer<C2Operation> c2OperationRegister;
+    private volatile boolean heartbeatLocked = false;
 
-    public C2ClientService(C2Client client, C2HeartbeatFactory 
c2HeartbeatFactory, C2OperationService operationService) {
+    public C2ClientService(C2Client client, C2HeartbeatFactory 
c2HeartbeatFactory, C2OperationHandlerProvider operationService,
+        RequestedOperationDAO requestedOperationDAO, Consumer<C2Operation> 
c2OperationRegister) {
         this.client = client;
         this.c2HeartbeatFactory = c2HeartbeatFactory;
         this.operationService = operationService;
+        this.requestedOperationDAO = requestedOperationDAO;
+        this.c2OperationRegister = c2OperationRegister;
     }
 
     public void sendHeartbeat(RuntimeInfoWrapper runtimeInfoWrapper) {
         try {
-            C2Heartbeat c2Heartbeat = 
c2HeartbeatFactory.create(runtimeInfoWrapper);
-            
client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse);
+            if (heartbeatLocked) {
+                logger.debug("Restart is in progress, skipping heartbeat");
+            } else {
+                C2Heartbeat c2Heartbeat = 
c2HeartbeatFactory.create(runtimeInfoWrapper);
+                
client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse);
+            }
         } catch (Exception e) {
             logger.error("Failed to send/process heartbeat:", e);
         }
     }
 
+    public void sendAcknowledge(C2OperationAck operationAck) {
+        try {
+            client.acknowledgeOperation(operationAck);
+        } catch (Exception e) {
+            logger.error("Failed to send acknowledge:", e);
+        }
+    }
+
+    public void enableHeartbeat() {
+        heartbeatLocked = false;
+    }
+
+    public void handleRequestedOperations(List<C2Operation> 
requestedOperations) {
+        LinkedList<C2Operation> lRequestedOperations = new 
LinkedList<>(requestedOperations);
+        C2Operation requestedOperation;
+        while ((requestedOperation = lRequestedOperations.poll()) != null) {

Review Comment:
   ok, works for me I just thought if it is a queue and we treat it as a queue 
and we use it as a queue we should use queue :) but you are right list works as 
well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to