This is an automated email from the ASF dual-hosted git repository.

bejancsaba pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 52e257e16c NIFI-13123 MiNiFi async C2 operation processing
52e257e16c is described below

commit 52e257e16c6d86cde377ba0063ac268fd9ac6129
Author: Ferenc Kis <briansolo1...@gmail.com>
AuthorDate: Thu May 2 16:03:24 2024 +0200

    NIFI-13123 MiNiFi async C2 operation processing
    
    Signed-off-by: Csaba Bejan <bejan.cs...@gmail.com>
    
    This closes #8738.
---
 .../org/apache/nifi/c2/client/C2ClientConfig.java  |  12 +
 .../nifi/c2/client/service/C2ClientService.java    | 149 -----------
 .../nifi/c2/client/service/C2HeartbeatFactory.java |  98 ++++----
 .../nifi/c2/client/service/C2HeartbeatManager.java |  79 ++++++
 .../nifi/c2/client/service/C2OperationManager.java | 204 ++++++++++++++++
 ...tionDAO.java => C2OperationRestartHandler.java} |  26 +-
 .../client/service/operation/OperationQueue.java   |  17 +-
 ...tedOperationDAO.java => OperationQueueDAO.java} |   2 +-
 .../c2/client/service/C2ClientServiceTest.java     | 271 ---------------------
 .../c2/client/service/C2HeartbeatManagerTest.java  | 141 +++++++++++
 .../c2/client/service/C2OperationManagerTest.java  | 248 +++++++++++++++++++
 .../nifi/minifi/commons/api/MiNiFiProperties.java  |   1 +
 .../c2/BootstrapC2OperationRestartHandler.java     | 120 +++++++++
 .../apache/nifi/minifi/c2/C2NifiClientService.java | 204 +++++-----------
 ...ionDAO.java => FileBasedOperationQueueDAO.java} |  13 +-
 .../c2/BootstrapC2OperationRestartHandlerTest.java | 170 +++++++++++++
 ...st.java => FileBasedOperationQueueDAOTest.java} |  11 +-
 .../nifi/minifi/bootstrap/BootstrapListener.java   |  17 +-
 .../nifi/bootstrap/BootstrapCommunicator.java      |   6 +-
 .../org/apache/nifi/bootstrap/CommandResult.java   |  30 +--
 20 files changed, 1139 insertions(+), 680 deletions(-)

diff --git 
a/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java
 
b/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java
index 3171280f99..d9cd3aa90f 100644
--- 
a/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java
+++ 
b/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java
@@ -57,6 +57,7 @@ public class C2ClientConfig {
     private final long keepAliveDuration;
     private final String c2RequestCompression;
     private final String c2AssetDirectory;
+    private final long bootstrapAcknowledgeTimeout;
 
     private C2ClientConfig(final Builder builder) {
         this.c2Url = builder.c2Url;
@@ -86,6 +87,7 @@ public class C2ClientConfig {
         this.keepAliveDuration = builder.keepAliveDuration;
         this.c2RequestCompression = builder.c2RequestCompression;
         this.c2AssetDirectory = builder.c2AssetDirectory;
+        this.bootstrapAcknowledgeTimeout = builder.bootstrapAcknowledgeTimeout;
     }
 
     public String getC2Url() {
@@ -196,6 +198,10 @@ public class C2ClientConfig {
         return keepAliveDuration;
     }
 
+    public long getBootstrapAcknowledgeTimeout() {
+        return bootstrapAcknowledgeTimeout;
+    }
+
     /**
      * Builder for client configuration.
      */
@@ -231,6 +237,7 @@ public class C2ClientConfig {
         private long keepAliveDuration;
         private String c2RequestCompression;
         private String c2AssetDirectory;
+        private long bootstrapAcknowledgeTimeout;
 
         public Builder c2Url(String c2Url) {
             this.c2Url = c2Url;
@@ -377,6 +384,11 @@ public class C2ClientConfig {
             return this;
         }
 
+        public Builder bootstrapAcknowledgeTimeout(long 
bootstrapAcknowledgeTimeout) {
+            this.bootstrapAcknowledgeTimeout = bootstrapAcknowledgeTimeout;
+            return this;
+        }
+
         public C2ClientConfig build() {
             return new C2ClientConfig(this);
         }
diff --git 
a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java
 
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java
deleted file mode 100644
index e8f0718cbd..0000000000
--- 
a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.c2.client.service;
-
-import static 
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Optional;
-import java.util.function.Consumer;
-import org.apache.nifi.c2.client.api.C2Client;
-import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper;
-import org.apache.nifi.c2.client.service.operation.C2OperationHandler;
-import org.apache.nifi.c2.client.service.operation.C2OperationHandlerProvider;
-import org.apache.nifi.c2.client.service.operation.OperationQueue;
-import org.apache.nifi.c2.client.service.operation.RequestedOperationDAO;
-import org.apache.nifi.c2.protocol.api.C2Heartbeat;
-import org.apache.nifi.c2.protocol.api.C2HeartbeatResponse;
-import org.apache.nifi.c2.protocol.api.C2Operation;
-import org.apache.nifi.c2.protocol.api.C2OperationAck;
-import org.apache.nifi.c2.protocol.api.C2OperationState;
-import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class C2ClientService {
-
-    private static final Logger logger = 
LoggerFactory.getLogger(C2ClientService.class);
-
-    private final C2Client client;
-    private final C2HeartbeatFactory c2HeartbeatFactory;
-    private final C2OperationHandlerProvider c2OperationHandlerProvider;
-    private final RequestedOperationDAO requestedOperationDAO;
-    private final Consumer<C2Operation> c2OperationRegister;
-    private volatile boolean heartbeatLocked = false;
-
-    public C2ClientService(C2Client client, C2HeartbeatFactory 
c2HeartbeatFactory, C2OperationHandlerProvider c2OperationHandlerProvider,
-        RequestedOperationDAO requestedOperationDAO, Consumer<C2Operation> 
c2OperationRegister) {
-        this.client = client;
-        this.c2HeartbeatFactory = c2HeartbeatFactory;
-        this.c2OperationHandlerProvider = c2OperationHandlerProvider;
-        this.requestedOperationDAO = requestedOperationDAO;
-        this.c2OperationRegister = c2OperationRegister;
-    }
-
-    public void sendHeartbeat(RuntimeInfoWrapper runtimeInfoWrapper) {
-            if (heartbeatLocked) {
-                logger.debug("Heartbeats are locked, skipping sending for 
now");
-            } else {
-                try {
-                    C2Heartbeat c2Heartbeat = 
c2HeartbeatFactory.create(runtimeInfoWrapper);
-                    
client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse);
-                } catch (Exception e) {
-                    logger.error("Failed to send/process heartbeat:", e);
-                }
-            }
-    }
-
-    public void sendAcknowledge(C2OperationAck operationAck) {
-        try {
-            client.acknowledgeOperation(operationAck);
-        } catch (Exception e) {
-            logger.error("Failed to send acknowledge:", e);
-        }
-    }
-
-    public void enableHeartbeat() {
-        heartbeatLocked = false;
-    }
-
-    private void disableHeartbeat() {
-        heartbeatLocked = true;
-    }
-
-    public void handleRequestedOperations(List<C2Operation> 
requestedOperations) {
-        LinkedList<C2Operation> c2Operations = new 
LinkedList<>(requestedOperations);
-        C2Operation requestedOperation;
-        while ((requestedOperation = c2Operations.poll()) != null) {
-            Optional<C2OperationHandler> c2OperationHandler = 
c2OperationHandlerProvider.getHandlerForOperation(requestedOperation);
-            if (!c2OperationHandler.isPresent()) {
-                continue;
-            }
-            C2OperationHandler operationHandler = c2OperationHandler.get();
-            C2OperationAck c2OperationAck = 
operationHandler.handle(requestedOperation);
-            if (requiresRestart(operationHandler, c2OperationAck)) {
-                if (initiateRestart(c2Operations, requestedOperation)) {
-                    return;
-                }
-                C2OperationState c2OperationState = new C2OperationState();
-                c2OperationState.setState(OperationState.NOT_APPLIED);
-                c2OperationAck.setOperationState(c2OperationState);
-            }
-            sendAcknowledge(c2OperationAck);
-        }
-        enableHeartbeat();
-        requestedOperationDAO.cleanup();
-    }
-
-    private void processResponse(C2HeartbeatResponse response) {
-        List<C2Operation> requestedOperations = 
response.getRequestedOperations();
-        if (requestedOperations != null && !requestedOperations.isEmpty()) {
-            logger.info("Received {} operations from the C2 server", 
requestedOperations.size());
-            handleRequestedOperations(requestedOperations);
-        } else {
-            logger.trace("No operations received from the C2 server in the 
server. Nothing to do.");
-        }
-    }
-
-    private boolean requiresRestart(C2OperationHandler c2OperationHandler, 
C2OperationAck c2OperationAck) {
-        return c2OperationHandler.requiresRestart() && 
isOperationFullyApplied(c2OperationAck);
-    }
-
-    private boolean isOperationFullyApplied(C2OperationAck c2OperationAck) {
-        return Optional.ofNullable(c2OperationAck)
-            .map(C2OperationAck::getOperationState)
-            .map(C2OperationState::getState)
-            .filter(FULLY_APPLIED::equals)
-            .isPresent();
-    }
-
-    private boolean initiateRestart(LinkedList<C2Operation> 
requestedOperations, C2Operation requestedOperation) {
-        try {
-            disableHeartbeat();
-            requestedOperationDAO.save(new OperationQueue(requestedOperation, 
requestedOperations));
-            c2OperationRegister.accept(requestedOperation);
-            return true;
-        } catch (Exception e) {
-            logger.error("Failed to initiate restart. Dropping operation and 
continue with remaining operations", e);
-            requestedOperationDAO.cleanup();
-        }
-        return false;
-    }
-
-}
-
diff --git 
a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatFactory.java
 
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatFactory.java
index c939ab78ab..b5d9e9824e 100644
--- 
a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatFactory.java
+++ 
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatFactory.java
@@ -17,6 +17,13 @@
 
 package org.apache.nifi.c2.client.service;
 
+import static java.net.NetworkInterface.getNetworkInterfaces;
+import static java.util.Collections.list;
+import static java.util.Comparator.comparing;
+import static java.util.Comparator.comparingInt;
+import static java.util.Map.entry;
+import static java.util.Objects.nonNull;
+import static java.util.stream.Collectors.toSet;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 
 import java.io.File;
@@ -25,13 +32,12 @@ import java.lang.management.OperatingSystemMXBean;
 import java.net.Inet4Address;
 import java.net.InetAddress;
 import java.net.NetworkInterface;
+import java.net.SocketException;
 import java.util.Collections;
-import java.util.Enumeration;
-import java.util.HashSet;
+import java.util.Comparator;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.stream.Collectors;
 import org.apache.nifi.c2.client.C2ClientConfig;
 import org.apache.nifi.c2.client.PersistentUuidGenerator;
 import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper;
@@ -137,8 +143,7 @@ public class C2HeartbeatFactory {
     }
 
     private DeviceInfo generateDeviceInfo() {
-        // Populate DeviceInfo
-        final DeviceInfo deviceInfo = new DeviceInfo();
+        DeviceInfo deviceInfo = new DeviceInfo();
         deviceInfo.setNetworkInfo(generateNetworkInfo());
         
deviceInfo.setIdentifier(getDeviceIdentifier(deviceInfo.getNetworkInfo()));
         deviceInfo.setSystemInfo(generateSystemInfo());
@@ -146,67 +151,63 @@ public class C2HeartbeatFactory {
     }
 
     private NetworkInfo generateNetworkInfo() {
-        NetworkInfo networkInfo = new NetworkInfo();
         try {
-            // Determine all interfaces
-            final Enumeration<NetworkInterface> networkInterfaces = 
NetworkInterface.getNetworkInterfaces();
-
-            final Set<NetworkInterface> operationIfaces = new HashSet<>();
-
-            // Determine eligible interfaces
-            while (networkInterfaces.hasMoreElements()) {
-                final NetworkInterface networkInterface = 
networkInterfaces.nextElement();
-                if (!networkInterface.isLoopback() && networkInterface.isUp()) 
{
-                    operationIfaces.add(networkInterface);
-                }
+            Set<NetworkInterface> eligibleInterfaces = 
list(getNetworkInterfaces())
+                .stream()
+                .filter(this::isEligibleInterface)
+                .collect(toSet());
+
+            if (logger.isTraceEnabled()) {
+                logger.trace("Found {} eligible interfaces with names {}", 
eligibleInterfaces.size(),
+                    eligibleInterfaces.stream()
+                        .map(NetworkInterface::getName)
+                        .collect(toSet())
+                );
             }
-            logger.trace("Have {} interfaces with names {}", 
operationIfaces.size(),
-                operationIfaces.stream()
-                    .map(NetworkInterface::getName)
-                    .collect(Collectors.toSet())
-            );
-
-            if (!operationIfaces.isEmpty()) {
-                if (operationIfaces.size() > 1) {
-                    logger.debug("Instance has multiple interfaces.  Generated 
information may be non-deterministic.");
-                }
 
-                boolean networkInfoUnset = true;
-                for (NetworkInterface networkInterface : operationIfaces) {
-                    Enumeration<InetAddress> inetAddresses = 
networkInterface.getInetAddresses();
-                    while (inetAddresses.hasMoreElements()) {
-                        InetAddress inetAddress = inetAddresses.nextElement();
-                        // IPv4 address is preferred over IPv6 as it provides 
more readable information for the user
-                        if (inetAddress instanceof Inet4Address) {
-                            updateNetworkInfo(networkInfo, networkInterface, 
inetAddress);
-                            return networkInfo;
-                        }
-                        if (networkInfoUnset) {
-                            updateNetworkInfo(networkInfo, networkInterface, 
inetAddress);
-                            networkInfoUnset = false;
-                        }
-                    }
-                }
-            }
+            Comparator<Map.Entry<NetworkInterface, InetAddress>> 
orderByIp4AddressesFirst = comparingInt(item -> item.getValue() instanceof 
Inet4Address ? 0 : 1);
+            Comparator<Map.Entry<NetworkInterface, InetAddress>> 
orderByNetworkInterfaceName = comparing(entry -> entry.getKey().getName());
+            return eligibleInterfaces.stream()
+                .flatMap(networkInterface -> 
list(networkInterface.getInetAddresses())
+                    .stream()
+                    .map(inetAddress -> entry(networkInterface, inetAddress)))
+                
.sorted(orderByIp4AddressesFirst.thenComparing(orderByNetworkInterfaceName))
+                .findFirst()
+                .map(entry -> createNetworkInfo(entry.getKey(), 
entry.getValue()))
+                .orElseGet(NetworkInfo::new);
         } catch (Exception e) {
             logger.error("Network Interface processing failed", e);
+            return new NetworkInfo();
         }
-        return networkInfo;
     }
 
-    private void updateNetworkInfo(NetworkInfo networkInfo, NetworkInterface 
networkInterface, InetAddress inetAddress) {
+    private boolean isEligibleInterface(NetworkInterface networkInterface) {
+        try {
+            return !networkInterface.isLoopback()
+                && !networkInterface.isVirtual()
+                && networkInterface.isUp()
+                && nonNull(networkInterface.getHardwareAddress());
+        } catch (SocketException e) {
+            logger.warn("Error processing network interface", e);
+            return false;
+        }
+    }
+
+    private NetworkInfo createNetworkInfo(NetworkInterface networkInterface, 
InetAddress inetAddress) {
+        NetworkInfo networkInfo = new NetworkInfo();
         networkInfo.setDeviceId(networkInterface.getName());
         networkInfo.setHostname(inetAddress.getHostName());
         networkInfo.setIpAddress(inetAddress.getHostAddress());
+        return networkInfo;
     }
 
     private String getDeviceIdentifier(NetworkInfo networkInfo) {
         if (deviceId == null) {
             if (networkInfo.getDeviceId() != null) {
                 try {
-                    final NetworkInterface netInterface = 
NetworkInterface.getByName(networkInfo.getDeviceId());
+                    NetworkInterface netInterface = 
NetworkInterface.getByName(networkInfo.getDeviceId());
                     byte[] hardwareAddress = netInterface.getHardwareAddress();
-                    final StringBuilder macBuilder = new StringBuilder();
+                    StringBuilder macBuilder = new StringBuilder();
                     if (hardwareAddress != null) {
                         for (byte address : hardwareAddress) {
                             macBuilder.append(String.format("%02X", address));
@@ -221,7 +222,6 @@ public class C2HeartbeatFactory {
                 deviceId = getConfiguredDeviceId();
             }
         }
-
         return deviceId;
     }
 
diff --git 
a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatManager.java
 
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatManager.java
new file mode 100644
index 0000000000..a3729255ce
--- /dev/null
+++ 
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatManager.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service;
+
+import static java.util.Optional.ofNullable;
+import static java.util.function.Predicate.not;
+
+import java.util.List;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.nifi.c2.client.api.C2Client;
+import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper;
+import org.apache.nifi.c2.protocol.api.C2Heartbeat;
+import org.apache.nifi.c2.protocol.api.C2HeartbeatResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class C2HeartbeatManager implements Runnable {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(C2HeartbeatManager.class);
+
+    private final C2Client client;
+    private final C2HeartbeatFactory c2HeartbeatFactory;
+    private final ReentrantLock heartbeatLock;
+    private final RuntimeInfoWrapper runtimeInfoWrapper;
+    private final C2OperationManager c2OperationManager;
+
+    public C2HeartbeatManager(C2Client client, C2HeartbeatFactory 
c2HeartbeatFactory, ReentrantLock heartbeatLock, RuntimeInfoWrapper 
runtimeInfoWrapper,
+                              C2OperationManager c2OperationManager) {
+        this.client = client;
+        this.c2HeartbeatFactory = c2HeartbeatFactory;
+        this.heartbeatLock = heartbeatLock;
+        this.runtimeInfoWrapper = runtimeInfoWrapper;
+        this.c2OperationManager = c2OperationManager;
+    }
+
+    @Override
+    public void run() {
+        if (!heartbeatLock.tryLock()) {
+            LOGGER.debug("Heartbeat lock is hold by another thread, skipping 
heartbeat sending");
+            return;
+        }
+        try {
+            LOGGER.debug("Heartbeat lock is acquired, sending heartbeat");
+            C2Heartbeat c2Heartbeat = 
c2HeartbeatFactory.create(runtimeInfoWrapper);
+            
client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse);
+        } catch (Exception e) {
+            LOGGER.error("Failed to send/process heartbeat", e);
+        } finally {
+            heartbeatLock.unlock();
+            LOGGER.debug("Heartbeat unlocked lock and heartbeat is sent 
successfully");
+        }
+    }
+
+    private void processResponse(C2HeartbeatResponse response) {
+        ofNullable(response.getRequestedOperations())
+            .filter(not(List::isEmpty))
+            .ifPresentOrElse(operations -> {
+                    LOGGER.info("Received {} operations from the C2 server", 
operations.size());
+                    operations.forEach(c2OperationManager::add);
+                },
+                () -> LOGGER.debug("No operations received from the C2 server")
+            );
+    }
+}
diff --git 
a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2OperationManager.java
 
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2OperationManager.java
new file mode 100644
index 0000000000..85d9d2d9a0
--- /dev/null
+++ 
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2OperationManager.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service;
+
+import static java.util.Optional.of;
+import static java.util.Optional.ofNullable;
+import static java.util.function.Predicate.not;
+import static 
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
+import static 
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.nifi.c2.client.api.C2Client;
+import org.apache.nifi.c2.client.service.operation.C2OperationHandler;
+import org.apache.nifi.c2.client.service.operation.C2OperationHandlerProvider;
+import org.apache.nifi.c2.client.service.operation.C2OperationRestartHandler;
+import org.apache.nifi.c2.client.service.operation.OperationQueue;
+import org.apache.nifi.c2.client.service.operation.OperationQueueDAO;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.C2OperationState;
+import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class C2OperationManager implements Runnable {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(C2OperationManager.class);
+
+    private final C2Client client;
+    private final C2OperationHandlerProvider c2OperationHandlerProvider;
+    private final ReentrantLock heartbeatLock;
+    private final OperationQueueDAO operationQueueDAO;
+    private final C2OperationRestartHandler c2OperationRestartHandler;
+    private final BlockingQueue<C2Operation> c2Operations;
+
+    public C2OperationManager(C2Client client, C2OperationHandlerProvider 
c2OperationHandlerProvider, ReentrantLock heartbeatLock,
+                              OperationQueueDAO operationQueueDAO, 
C2OperationRestartHandler c2OperationRestartHandler) {
+        this.client = client;
+        this.c2OperationHandlerProvider = c2OperationHandlerProvider;
+        this.heartbeatLock = heartbeatLock;
+        this.operationQueueDAO = operationQueueDAO;
+        this.c2OperationRestartHandler = c2OperationRestartHandler;
+        this.c2Operations = new LinkedBlockingQueue<>();
+    }
+
+    public void add(C2Operation c2Operation) {
+        try {
+            c2Operations.put(c2Operation);
+        } catch (InterruptedException e) {
+            LOGGER.warn("Thread was interrupted", e);
+        }
+    }
+
+    @Override
+    public void run() {
+        processRestartState();
+
+        while (true) {
+            C2Operation operation;
+            try {
+                operation = c2Operations.take();
+            } catch (InterruptedException e) {
+                LOGGER.warn("Thread was interrupted", e);
+                return;
+            }
+
+            LOGGER.debug("Processing operation {}", operation);
+            C2OperationHandler operationHandler = 
c2OperationHandlerProvider.getHandlerForOperation(operation).orElse(null);
+            if (operationHandler == null) {
+                LOGGER.debug("No handler is present for C2 Operation {}, 
available handlers {}", operation, c2OperationHandlerProvider.getHandlers());
+                continue;
+            }
+
+            C2OperationAck c2OperationAck = operationHandler.handle(operation);
+            if (!requiresRestart(operationHandler, c2OperationAck)) {
+                LOGGER.debug("No restart is required. Sending ACK to C2 server 
{}", c2OperationAck);
+                sendAcknowledge(c2OperationAck);
+                continue;
+            }
+
+            heartbeatLock.lock();
+            LOGGER.debug("Restart is required. Heartbeats are stopped until 
restart is completed");
+            Optional<C2OperationState> restartState = initRestart(operation);
+            if (!restartState.isPresent()) {
+                LOGGER.debug("Restart in progress, stopping 
C2OperationManager");
+                break;
+            }
+
+            try {
+                C2OperationState failedState = restartState.get();
+                LOGGER.debug("Restart handler returned with a failed state 
{}", failedState);
+                c2OperationAck.setOperationState(failedState);
+                sendAcknowledge(c2OperationAck);
+            } finally {
+                operationQueueDAO.cleanup();
+                LOGGER.debug("Heartbeats are enabled again");
+                heartbeatLock.unlock();
+            }
+        }
+    }
+
+    private void processRestartState() {
+        Optional<OperationQueue> operationQueue = operationQueueDAO.load();
+
+        operationQueue.map(OperationQueue::getRemainingOperations)
+            .filter(not(List::isEmpty))
+            .ifPresent(this::processRemainingOperations);
+
+        operationQueue.map(OperationQueue::getCurrentOperation)
+            .ifPresentOrElse(this::processCurrentOperation,
+                () -> LOGGER.debug("No operation to acknowledge to C2 
server"));
+
+        operationQueue.ifPresent(__ -> operationQueueDAO.cleanup());
+    }
+
+    private void processRemainingOperations(List<C2Operation> 
remainingOperations) {
+        LOGGER.debug("Found remaining operations operations after restart. 
Heartbeats are stopped until processing is completed");
+        heartbeatLock.lock();
+        try {
+            List<C2Operation> mergedOperations = new LinkedList<>();
+            mergedOperations.addAll(remainingOperations);
+            mergedOperations.addAll(c2Operations);
+            c2Operations.clear();
+            mergedOperations.forEach(c2Operations::add);
+        } catch (Exception e) {
+            LOGGER.warn("Unable to recover operations from operation queue", 
e);
+        } finally {
+            heartbeatLock.unlock();
+            LOGGER.debug("Heartbeat lock released");
+        }
+    }
+
+    private void processCurrentOperation(C2Operation operation) {
+        LOGGER.debug("Found operation {} to acknowledge to C2 server", 
operation);
+
+        C2OperationState c2OperationState = 
c2OperationRestartHandler.waitForResponse()
+            .map(this::c2OperationState)
+            .orElse(c2OperationState(NOT_APPLIED));
+
+        C2OperationAck c2OperationAck = new C2OperationAck();
+        c2OperationAck.setOperationId(operation.getIdentifier());
+        c2OperationAck.setOperationState(c2OperationState);
+
+        sendAcknowledge(c2OperationAck);
+    }
+
+    private Optional<C2OperationState> initRestart(C2Operation operation) {
+        try {
+            LOGGER.debug("Restart initiated");
+            OperationQueue operationQueue = OperationQueue.create(operation, 
c2Operations);
+            operationQueueDAO.save(operationQueue);
+            return 
c2OperationRestartHandler.handleRestart(operation).map(this::c2OperationState);
+        } catch (Exception e) {
+            LOGGER.error("Failed to initiate restart. Dropping operation and 
continue with remaining operations", e);
+            return of(c2OperationState(NOT_APPLIED));
+        }
+    }
+
+    private C2OperationState c2OperationState(OperationState operationState) {
+        C2OperationState c2OperationState = new C2OperationState();
+        c2OperationState.setState(operationState);
+        return c2OperationState;
+    }
+
+    private void sendAcknowledge(C2OperationAck operationAck) {
+        try {
+            client.acknowledgeOperation(operationAck);
+        } catch (Exception e) {
+            LOGGER.error("Failed to send acknowledge", e);
+        }
+    }
+
+    private boolean requiresRestart(C2OperationHandler c2OperationHandler, 
C2OperationAck c2OperationAck) {
+        return c2OperationHandler.requiresRestart() && 
isOperationFullyApplied(c2OperationAck);
+    }
+
+    private boolean isOperationFullyApplied(C2OperationAck c2OperationAck) {
+        return ofNullable(c2OperationAck)
+            .map(C2OperationAck::getOperationState)
+            .map(C2OperationState::getState)
+            .filter(FULLY_APPLIED::equals)
+            .isPresent();
+    }
+}
diff --git 
a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/RequestedOperationDAO.java
 
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/C2OperationRestartHandler.java
similarity index 61%
copy from 
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/RequestedOperationDAO.java
copy to 
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/C2OperationRestartHandler.java
index 1216aa812d..c0b733946e 100644
--- 
a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/RequestedOperationDAO.java
+++ 
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/C2OperationRestartHandler.java
@@ -18,28 +18,12 @@
 package org.apache.nifi.c2.client.service.operation;
 
 import java.util.Optional;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationState;
 
-/**
- * The purpose of this interface is to be able to persist operations between 
restarts.
- */
-public interface RequestedOperationDAO {
-
-    /**
-     * Persist the given requested operation list
-     * @param operationQueue the queue containing the current and remaining 
operations
-     */
-    void save(OperationQueue operationQueue);
-
-    /**
-     * Returns the saved Operations
-     *
-     * @return the C2 Operations queue with the actual operation
-     */
-    Optional<OperationQueue> load();
+/**
+ * Contract used by the C2 operation processing to trigger a restart for an operation
+ * and to obtain the resulting operation state.
+ */
+public interface C2OperationRestartHandler {
 
-    /**
-     * Resets the saved operations
-     */
-    void cleanup();
+    /**
+     * Initiates the restart for the given operation.
+     * NOTE(review): the tests in this change treat an empty result as "restart is under way"
+     * and a present state (for example NOT_APPLIED) as a failed restart - confirm against the implementations.
+     *
+     * @param c2Operation the operation the restart is performed for
+     * @return an optional operation state describing the outcome
+     */
+    Optional<C2OperationState.OperationState> handleRestart(C2Operation 
c2Operation);
 
+    /**
+     * Obtains the operation state reported for a restart.
+     * NOTE(review): presumably blocks until a response arrives or a timeout elapses - confirm against the implementations.
+     *
+     * @return an optional operation state; emptiness semantics are implementation defined
+     */
+    Optional<C2OperationState.OperationState> waitForResponse();
 }
diff --git 
a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/OperationQueue.java
 
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/OperationQueue.java
index ae0c56c1a8..1f3b3ecb02 100644
--- 
a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/OperationQueue.java
+++ 
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/OperationQueue.java
@@ -17,10 +17,12 @@
 
 package org.apache.nifi.c2.client.service.operation;
 
+import static java.util.Optional.ofNullable;
+
 import java.io.Serializable;
-import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import java.util.Queue;
 import org.apache.nifi.c2.protocol.api.C2Operation;
 
 public class OperationQueue implements Serializable {
@@ -29,12 +31,23 @@ public class OperationQueue implements Serializable {
     private C2Operation currentOperation;
     private List<C2Operation> remainingOperations;
 
+    /**
+     * Builds an {@link OperationQueue} from the operation in progress and a queue of pending operations.
+     *
+     * @param currentOperation the operation currently being processed
+     * @param remainingOperations the pending operations; {@code null} results in an empty list
+     * @return a new queue holding a list snapshot of {@code remainingOperations} in iteration order
+     */
+    public static OperationQueue create(C2Operation currentOperation, Queue<C2Operation> remainingOperations) {
+        List<C2Operation> remaining = remainingOperations == null
+            ? List.of()
+            : remainingOperations.stream().toList();
+        return new OperationQueue(currentOperation, remaining);
+    }
+
     public OperationQueue() {
     }
 
     public OperationQueue(C2Operation currentOperation, List<C2Operation> 
remainingOperations) {
         this.currentOperation = currentOperation;
-        this.remainingOperations = remainingOperations == null ? 
Collections.emptyList() : remainingOperations;
+        // Keep the null guard of the replaced line so the field is never null, consistent with create().
+        // List.of() is used because this change removes the java.util.Collections import.
+        this.remainingOperations = remainingOperations == null ? List.of() : remainingOperations;
     }
 
     public C2Operation getCurrentOperation() {
diff --git 
a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/RequestedOperationDAO.java
 
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/OperationQueueDAO.java
similarity index 97%
copy from 
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/RequestedOperationDAO.java
copy to 
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/OperationQueueDAO.java
index 1216aa812d..ec3ee18230 100644
--- 
a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/RequestedOperationDAO.java
+++ 
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/OperationQueueDAO.java
@@ -22,7 +22,7 @@ import java.util.Optional;
 /**
  * The purpose of this interface is to be able to persist operations between 
restarts.
  */
-public interface RequestedOperationDAO {
+public interface OperationQueueDAO {
 
     /**
      * Persist the given requested operation list
diff --git 
a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2ClientServiceTest.java
 
b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2ClientServiceTest.java
deleted file mode 100644
index 6de1f86015..0000000000
--- 
a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2ClientServiceTest.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.c2.client.service;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoInteractions;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import org.apache.nifi.c2.client.api.C2Client;
-import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper;
-import org.apache.nifi.c2.client.service.operation.C2OperationHandler;
-import org.apache.nifi.c2.client.service.operation.C2OperationHandlerProvider;
-import org.apache.nifi.c2.client.service.operation.OperationQueue;
-import org.apache.nifi.c2.client.service.operation.RequestedOperationDAO;
-import org.apache.nifi.c2.protocol.api.C2Heartbeat;
-import org.apache.nifi.c2.protocol.api.C2HeartbeatResponse;
-import org.apache.nifi.c2.protocol.api.C2Operation;
-import org.apache.nifi.c2.protocol.api.C2OperationAck;
-import org.apache.nifi.c2.protocol.api.C2OperationState;
-import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.InjectMocks;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
-
-@ExtendWith(MockitoExtension.class)
-public class C2ClientServiceTest {
-
-    @Mock
-    private C2Client client;
-
-    @Mock
-    private C2HeartbeatFactory c2HeartbeatFactory;
-
-    @Mock
-    private C2OperationHandlerProvider operationService;
-
-    @Mock
-    private RuntimeInfoWrapper runtimeInfoWrapper;
-
-    @Mock
-    private RequestedOperationDAO requestedOperationDAO;
-
-    @Mock
-    private Consumer<C2Operation> c2OperationRegister;
-
-    @InjectMocks
-    private C2ClientService c2ClientService;
-
-    @Test
-    void testSendHeartbeatAndAckWhenOperationPresent() {
-        C2Heartbeat heartbeat = mock(C2Heartbeat.class);
-        when(c2HeartbeatFactory.create(any())).thenReturn(heartbeat);
-        C2HeartbeatResponse hbResponse = new C2HeartbeatResponse();
-        final List<C2Operation> c2Operations = generateOperation(1);
-        hbResponse.setRequestedOperations(c2Operations);
-        
when(client.publishHeartbeat(heartbeat)).thenReturn(Optional.of(hbResponse));
-        C2OperationHandler c2OperationHandler = mock(C2OperationHandler.class);
-        
when(operationService.getHandlerForOperation(any())).thenReturn(Optional.of(c2OperationHandler));
-        when(c2OperationHandler.handle(c2Operations.get(0))).thenReturn(new 
C2OperationAck());
-
-        c2ClientService.sendHeartbeat(runtimeInfoWrapper);
-
-        verify(c2HeartbeatFactory).create(any());
-        verify(client).publishHeartbeat(heartbeat);
-        verify(c2OperationHandler).handle(any());
-        verify(client).acknowledgeOperation(any());
-    }
-
-    @Test
-    void testSendHeartbeatAndAckForMultipleOperationPresent() {
-        int operationNum = 5;
-        C2Heartbeat heartbeat = mock(C2Heartbeat.class);
-        when(c2HeartbeatFactory.create(any())).thenReturn(heartbeat);
-        C2HeartbeatResponse hbResponse = new C2HeartbeatResponse();
-        hbResponse.setRequestedOperations(generateOperation(operationNum));
-        C2OperationHandler c2OperationHandler = mock(C2OperationHandler.class);
-        
when(client.publishHeartbeat(heartbeat)).thenReturn(Optional.of(hbResponse));
-        
when(operationService.getHandlerForOperation(any())).thenReturn(Optional.of(c2OperationHandler));
-        when(c2OperationHandler.handle(any())).thenReturn(new 
C2OperationAck());
-
-        c2ClientService.sendHeartbeat(runtimeInfoWrapper);
-
-        verify(c2HeartbeatFactory).create(any());
-        verify(client).publishHeartbeat(heartbeat);
-        verify(c2OperationHandler, times(operationNum)).handle(any());
-        verify(client, times(operationNum)).acknowledgeOperation(any());
-    }
-
-    @Test
-    void testSendHeartbeatHandlesNoHeartbeatResponse() {
-        C2Heartbeat heartbeat = mock(C2Heartbeat.class);
-        when(c2HeartbeatFactory.create(any())).thenReturn(heartbeat);
-        when(client.publishHeartbeat(heartbeat)).thenReturn(Optional.empty());
-
-        c2ClientService.sendHeartbeat(runtimeInfoWrapper);
-
-        verify(c2HeartbeatFactory).create(any());
-        verify(client).publishHeartbeat(heartbeat);
-        verify(client, times(0)).acknowledgeOperation(any());
-    }
-
-    @Test
-    void testSendHeartbeatNotHandledWhenThereAreNoOperationsSent() {
-        C2Heartbeat heartbeat = mock(C2Heartbeat.class);
-        when(c2HeartbeatFactory.create(any())).thenReturn(heartbeat);
-        C2HeartbeatResponse hbResponse = new C2HeartbeatResponse();
-        
when(client.publishHeartbeat(heartbeat)).thenReturn(Optional.of(hbResponse));
-
-        c2ClientService.sendHeartbeat(runtimeInfoWrapper);
-
-        verify(c2HeartbeatFactory).create(any());
-        verify(client).publishHeartbeat(heartbeat);
-        verify(client, times(0)).acknowledgeOperation(any());
-    }
-
-    @Test
-    void testSendHeartbeatNotAckWhenOperationAckMissing() {
-        C2Heartbeat heartbeat = mock(C2Heartbeat.class);
-        when(c2HeartbeatFactory.create(any())).thenReturn(heartbeat);
-        C2HeartbeatResponse hbResponse = new C2HeartbeatResponse();
-        hbResponse.setRequestedOperations(generateOperation(1));
-        
when(client.publishHeartbeat(heartbeat)).thenReturn(Optional.of(hbResponse));
-        
when(operationService.getHandlerForOperation(any())).thenReturn(Optional.empty());
-
-        c2ClientService.sendHeartbeat(runtimeInfoWrapper);
-
-        verify(c2HeartbeatFactory).create(any());
-        verify(client).publishHeartbeat(heartbeat);
-        verify(client, times(0)).acknowledgeOperation(any());
-    }
-
-    @Test
-    void shouldHeartbeatSendingNotPropagateExceptions() {
-        when(c2HeartbeatFactory.create(runtimeInfoWrapper)).thenThrow(new 
RuntimeException());
-
-        c2ClientService.sendHeartbeat(runtimeInfoWrapper);
-    }
-
-    @Test
-    void shouldAckSendingNotPropagateExceptions() {
-        C2OperationAck operationAck = mock(C2OperationAck.class);
-        doThrow(new 
RuntimeException()).when(client).acknowledgeOperation(operationAck);
-
-        c2ClientService.sendAcknowledge(operationAck);
-    }
-
-    @Test
-    void 
shouldSendAcknowledgeWithoutPersistingOperationsWhenOperationRequiresRestartButHandlerReturnsNonFullyAppliedState()
 {
-        C2OperationHandler c2OperationHandler = mock(C2OperationHandler.class);
-        C2OperationAck operationAck = new C2OperationAck();
-        C2OperationState c2OperationState = new C2OperationState();
-        c2OperationState.setState(OperationState.NOT_APPLIED);
-        operationAck.setOperationState(c2OperationState);
-        when(c2OperationHandler.requiresRestart()).thenReturn(true);
-        
when(operationService.getHandlerForOperation(any(C2Operation.class))).thenReturn(Optional.of(c2OperationHandler));
-        
when(c2OperationHandler.handle(any(C2Operation.class))).thenReturn(operationAck);
-
-        c2ClientService.handleRequestedOperations(generateOperation(1));
-
-        
verify(operationService).getHandlerForOperation(any(C2Operation.class));
-        verify(c2OperationHandler).handle(any(C2Operation.class));
-        verify(requestedOperationDAO).cleanup();
-        verify(client).acknowledgeOperation(operationAck);
-        verifyNoMoreInteractions(operationService, client, 
requestedOperationDAO);
-        verifyNoInteractions(c2HeartbeatFactory, c2OperationRegister);
-    }
-
-    @Test
-    void 
shouldSaveOperationQueueIfRestartIsNeededAndThereAreMultipleRequestedOperations()
 {
-        C2Operation c2Operation1 = new C2Operation();
-        c2Operation1.setIdentifier("1");
-        C2Operation c2Operation2 = new C2Operation();
-        c2Operation2.setIdentifier("2");
-        C2OperationHandler c2OperationHandler = mock(C2OperationHandler.class);
-        when(c2OperationHandler.requiresRestart()).thenReturn(true);
-        
when(operationService.getHandlerForOperation(any(C2Operation.class))).thenReturn(Optional.of(c2OperationHandler));
-        C2OperationAck c2OperationAck = new C2OperationAck();
-        C2OperationState c2OperationState = new C2OperationState();
-        c2OperationState.setState(OperationState.FULLY_APPLIED);
-        c2OperationAck.setOperationState(c2OperationState);
-        
when(c2OperationHandler.handle(any(C2Operation.class))).thenReturn(c2OperationAck);
-
-        c2ClientService.handleRequestedOperations(Arrays.asList(c2Operation1, 
c2Operation2));
-
-        verify(requestedOperationDAO).save(new OperationQueue(c2Operation1, 
Collections.singletonList(c2Operation2)));
-        verify(c2OperationRegister).accept(c2Operation1);
-        verifyNoInteractions(client, c2HeartbeatFactory);
-        verifyNoMoreInteractions(requestedOperationDAO, c2OperationRegister, 
operationService);
-    }
-
-    @Test
-    void 
shouldReEnableHeartbeatsIfExceptionHappensDuringRegisteringOperationAndThereIsNoMoreOperationInQueue()
 {
-        C2OperationHandler c2OperationHandler = mock(C2OperationHandler.class);
-        C2Operation operation = new C2Operation();
-        when(c2OperationHandler.requiresRestart()).thenReturn(true);
-        
when(operationService.getHandlerForOperation(any(C2Operation.class))).thenReturn(Optional.of(c2OperationHandler));
-        C2OperationAck c2OperationAck = new C2OperationAck();
-        C2OperationState c2OperationState = new C2OperationState();
-        c2OperationState.setState(OperationState.FULLY_APPLIED);
-        c2OperationAck.setOperationState(c2OperationState);
-        
when(c2OperationHandler.handle(any(C2Operation.class))).thenReturn(c2OperationAck);
-        doThrow(new 
RuntimeException()).when(c2OperationRegister).accept(any(C2Operation.class));
-        
c2ClientService.handleRequestedOperations(Collections.singletonList(operation));
-        when(c2HeartbeatFactory.create(runtimeInfoWrapper)).thenReturn(new 
C2Heartbeat());
-
-        c2ClientService.sendHeartbeat(runtimeInfoWrapper);
-
-        verify(c2HeartbeatFactory).create(runtimeInfoWrapper);
-        verify(client).publishHeartbeat(any(C2Heartbeat.class));
-    }
-
-    @Test
-    void 
shouldContinueWithRemainingOperationsIfExceptionHappensDuringRegisteringOperationAndThereAreMoreOperationsInQueue()
 {
-        C2OperationHandler c2OperationHandlerForRestart = 
mock(C2OperationHandler.class);
-        C2OperationHandler c2OperationHandler = mock(C2OperationHandler.class);
-        C2Operation operation1 = new C2Operation();
-        operation1.setIdentifier("1");
-        C2Operation operation2 = new C2Operation();
-        operation2.setIdentifier("2");
-        C2OperationAck c2OperationAck = new C2OperationAck();
-        C2OperationState c2OperationState = new C2OperationState();
-        c2OperationState.setState(OperationState.FULLY_APPLIED);
-        c2OperationAck.setOperationState(c2OperationState);
-        when(c2OperationHandler.requiresRestart()).thenReturn(false);
-        when(c2OperationHandlerForRestart.requiresRestart()).thenReturn(true);
-        
when(operationService.getHandlerForOperation(operation1)).thenReturn(Optional.of(c2OperationHandlerForRestart));
-        
when(operationService.getHandlerForOperation(operation2)).thenReturn(Optional.of(c2OperationHandler));
-        
when(c2OperationHandlerForRestart.handle(operation1)).thenReturn(c2OperationAck);
-        when(c2OperationHandler.handle(operation2)).thenReturn(c2OperationAck);
-
-        doThrow(new 
RuntimeException()).when(c2OperationRegister).accept(operation1);
-
-        c2ClientService.handleRequestedOperations(Arrays.asList(operation1, 
operation2));
-
-        verify(client, times(2)).acknowledgeOperation(c2OperationAck);
-    }
-
-    private List<C2Operation> generateOperation(int num) {
-        return IntStream.range(0, num)
-            .mapToObj(x -> new C2Operation())
-            .collect(Collectors.toList());
-    }
-}
diff --git 
a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2HeartbeatManagerTest.java
 
b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2HeartbeatManagerTest.java
new file mode 100644
index 0000000000..2de1483487
--- /dev/null
+++ 
b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2HeartbeatManagerTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service;
+
+import static java.util.Optional.empty;
+import static java.util.Optional.ofNullable;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.nifi.c2.client.api.C2Client;
+import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper;
+import org.apache.nifi.c2.protocol.api.C2Heartbeat;
+import org.apache.nifi.c2.protocol.api.C2HeartbeatResponse;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Unit tests for {@link C2HeartbeatManager#run()}. All collaborators, including the heartbeat lock,
+ * are Mockito mocks, so each test checks which collaborators are invoked and whether the lock is released.
+ */
+@ExtendWith(MockitoExtension.class)
+public class C2HeartbeatManagerTest {
+
+    @Mock
+    private C2Client mockC2Client;
+
+    @Mock
+    private C2HeartbeatFactory mockC2HeartbeatFactory;
+
+    @Mock
+    private ReentrantLock mockHeartbeatLock;
+
+    @Mock
+    private RuntimeInfoWrapper mockRuntimeInfoWrapper;
+
+    @Mock
+    private C2OperationManager mockC2OperationManager;
+
+    @InjectMocks
+    private C2HeartbeatManager testHeartbeatManager;
+
+    // tryLock() fails: no heartbeat is created or published, no operation is queued, and unlock() is not called
+    @Test
+    void shouldSkipSendingHeartbeatIfHeartbeatLockIsAcquired() {
+        when(mockHeartbeatLock.tryLock()).thenReturn(false);
+
+        testHeartbeatManager.run();
+
+        verify(mockC2HeartbeatFactory, never()).create(any());
+        verify(mockC2Client, never()).publishHeartbeat(any());
+        verify(mockC2OperationManager, never()).add(any());
+        verify(mockHeartbeatLock, never()).unlock();
+    }
+
+    // The client returns no heartbeat response: nothing is queued and the lock is released once
+    @Test
+    void shouldSendHeartbeatAndProcessEmptyResponse() {
+        when(mockHeartbeatLock.tryLock()).thenReturn(true);
+        C2Heartbeat mockC2Heartbeat = mock(C2Heartbeat.class);
+        
when(mockC2HeartbeatFactory.create(mockRuntimeInfoWrapper)).thenReturn(mockC2Heartbeat);
+        
when(mockC2Client.publishHeartbeat(mockC2Heartbeat)).thenReturn(empty());
+
+        testHeartbeatManager.run();
+
+        verify(mockC2HeartbeatFactory, 
times(1)).create(mockRuntimeInfoWrapper);
+        verify(mockC2Client, times(1)).publishHeartbeat(mockC2Heartbeat);
+        verify(mockC2OperationManager, never()).add(any());
+        verify(mockHeartbeatLock, times(1)).unlock();
+
+    }
+
+    // The response carries an empty operation list: nothing is queued and the lock is released once
+    @Test
+    void shouldSendHeartbeatAndProcessResponseWithNoOperation() {
+        when(mockHeartbeatLock.tryLock()).thenReturn(true);
+        C2Heartbeat mockC2Heartbeat = mock(C2Heartbeat.class);
+        
when(mockC2HeartbeatFactory.create(mockRuntimeInfoWrapper)).thenReturn(mockC2Heartbeat);
+        C2HeartbeatResponse mockC2HeartbeatResponse = 
mock(C2HeartbeatResponse.class);
+        
when(mockC2HeartbeatResponse.getRequestedOperations()).thenReturn(List.of());
+        
when(mockC2Client.publishHeartbeat(mockC2Heartbeat)).thenReturn(Optional.of(mockC2HeartbeatResponse));
+
+        testHeartbeatManager.run();
+
+        verify(mockC2HeartbeatFactory, 
times(1)).create(mockRuntimeInfoWrapper);
+        verify(mockC2Client, times(1)).publishHeartbeat(mockC2Heartbeat);
+        verify(mockC2OperationManager, never()).add(any());
+        verify(mockHeartbeatLock, times(1)).unlock();
+    }
+
+    // The response carries two operations: each one is handed to the operation manager exactly once
+    @Test
+    void shouldSendHeartbeatAndProcessResponseWithMultipleOperation() {
+        when(mockHeartbeatLock.tryLock()).thenReturn(true);
+        C2Heartbeat mockC2Heartbeat = mock(C2Heartbeat.class);
+        
when(mockC2HeartbeatFactory.create(mockRuntimeInfoWrapper)).thenReturn(mockC2Heartbeat);
+        C2HeartbeatResponse mockC2HeartbeatResponse = 
mock(C2HeartbeatResponse.class);
+        C2Operation mockOperation1 = mock(C2Operation.class);
+        C2Operation mockOperation2 = mock(C2Operation.class);
+        
when(mockC2HeartbeatResponse.getRequestedOperations()).thenReturn(List.of(mockOperation1,
 mockOperation2));
+        
when(mockC2Client.publishHeartbeat(mockC2Heartbeat)).thenReturn(ofNullable(mockC2HeartbeatResponse));
+
+        testHeartbeatManager.run();
+
+        verify(mockC2HeartbeatFactory, 
times(1)).create(mockRuntimeInfoWrapper);
+        verify(mockC2Client, times(1)).publishHeartbeat(mockC2Heartbeat);
+        verify(mockC2OperationManager, times(1)).add(mockOperation1);
+        verify(mockC2OperationManager, times(1)).add(mockOperation2);
+        verify(mockHeartbeatLock, times(1)).unlock();
+    }
+
+    // Heartbeat creation throws: run() must not propagate the exception and must still release the lock
+    @Test
+    void shouldReleaseHeartbeatLockWhenExceptionOccurs() {
+        when(mockHeartbeatLock.tryLock()).thenReturn(true);
+        
when(mockC2HeartbeatFactory.create(mockRuntimeInfoWrapper)).thenThrow(new 
RuntimeException());
+
+        testHeartbeatManager.run();
+
+        verify(mockC2HeartbeatFactory, 
times(1)).create(mockRuntimeInfoWrapper);
+        verify(mockC2Client, never()).publishHeartbeat(any());
+        verify(mockC2OperationManager, never()).add(any());
+        verify(mockHeartbeatLock, times(1)).unlock();
+    }
+}
diff --git 
a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2OperationManagerTest.java
 
b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2OperationManagerTest.java
new file mode 100644
index 0000000000..d531d73ddf
--- /dev/null
+++ 
b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2OperationManagerTest.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service;
+
+import static java.util.Optional.empty;
+import static java.util.Optional.ofNullable;
+import static java.util.concurrent.Executors.newVirtualThreadPerTaskExecutor;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static 
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
+import static 
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.nifi.c2.client.api.C2Client;
+import org.apache.nifi.c2.client.service.operation.C2OperationHandler;
+import org.apache.nifi.c2.client.service.operation.C2OperationHandlerProvider;
+import org.apache.nifi.c2.client.service.operation.C2OperationRestartHandler;
+import org.apache.nifi.c2.client.service.operation.OperationQueue;
+import org.apache.nifi.c2.client.service.operation.OperationQueueDAO;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.C2OperationState;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class C2OperationManagerTest {
+
+    private static final long MAX_WAIT_TIME_MS = 1000;
+
+    @Mock
+    private C2Client mockC2Client;
+
+    @Mock
+    private C2OperationHandlerProvider mockC2OperationHandlerProvider;
+
+    @Mock
+    private ReentrantLock mockHeartbeatLock;
+
+    @Mock
+    private OperationQueueDAO mockOperationQueueDAO;
+
+    @Mock
+    private C2OperationRestartHandler mockC2OperationRestartHandler;
+
+    @InjectMocks
+    private C2OperationManager testC2OperationManager;
+
+    @Captor
+    ArgumentCaptor<C2OperationAck> c2OperationAckArgumentCaptor;
+
+    private ExecutorService executorService;
+
+    // Creates a fresh virtual-thread-per-task executor for each test; the tests submit the manager to it
+    @BeforeEach
+    void setup() {
+        executorService = newVirtualThreadPerTaskExecutor();
+    }
+
+    // Shuts the executor down after each test so a manager task that is still running gets interrupted
+    @AfterEach
+    void teardown() {
+        executorService.shutdownNow();
+    }
+
+    // No operation is added: the submitted manager task is still running after MAX_WAIT_TIME_MS
+    // (future.get times out) and the handler provider is never consulted
+    @Test
+    void shouldWaitForIncomingOperationThenTimeout() {
+        Future<?> future = executorService.submit(testC2OperationManager);
+
+        assertThrows(TimeoutException.class, () -> 
future.get(MAX_WAIT_TIME_MS, MILLISECONDS));
+        verify(mockC2OperationHandlerProvider, 
never()).getHandlerForOperation(any());
+    }
+
+    // The provider has no handler for the added operation: nothing is acknowledged, the heartbeat lock
+    // is never taken, and the manager task keeps running (future.get times out)
+    @Test
+    void shouldContinueWithoutProcessingWhenNoHandlerIsDefined() {
+        C2Operation testOperation = mock(C2Operation.class);
+        
when(mockC2OperationHandlerProvider.getHandlerForOperation(testOperation)).thenReturn(empty());
+
+        Future<?> future = executorService.submit(testC2OperationManager);
+        testC2OperationManager.add(testOperation);
+
+        assertThrows(TimeoutException.class, () -> 
future.get(MAX_WAIT_TIME_MS, MILLISECONDS));
+        verify(mockC2Client, never()).acknowledgeOperation(any());
+        verify(mockHeartbeatLock, never()).lock();
+    }
+
+    // The handler does not require a restart: its acknowledgement is sent exactly once, the heartbeat
+    // lock is never taken, and the manager task keeps running (future.get times out)
+    @Test
+    void shouldProcessOperationWithoutRestartAndAcknowledge() {
+        C2Operation mockOperation = mock(C2Operation.class);
+        C2OperationHandler mockOperationHandler = 
mock(C2OperationHandler.class);
+        C2OperationAck mockC2OperationAck = mock(C2OperationAck.class);
+        
when(mockC2OperationHandlerProvider.getHandlerForOperation(mockOperation)).thenReturn(ofNullable(mockOperationHandler));
+        
when(mockOperationHandler.handle(mockOperation)).thenReturn(mockC2OperationAck);
+        when(mockOperationHandler.requiresRestart()).thenReturn(false);
+
+        Future<?> future = executorService.submit(testC2OperationManager);
+        testC2OperationManager.add(mockOperation);
+
+        assertThrows(TimeoutException.class, () -> 
future.get(MAX_WAIT_TIME_MS, MILLISECONDS));
+        verify(mockC2Client, 
times(1)).acknowledgeOperation(mockC2OperationAck);
+        verify(mockHeartbeatLock, never()).lock();
+    }
+
+    // The handler requires a restart, the ack is FULLY_APPLIED and the restart handler returns empty:
+    // the manager task finishes (future.get returns), no acknowledgement is sent, the heartbeat lock is
+    // taken and not released, and the operation queue is saved without being cleaned up
+    @Test
+    void shouldProcessOperationWithSuccessfulRestart() {
+        C2Operation mockOperation = mock(C2Operation.class);
+        C2OperationHandler mockOperationHandler = 
mock(C2OperationHandler.class);
+        C2OperationAck mockC2OperationAck = mock(C2OperationAck.class);
+        C2OperationState mockC2OperationState = mock(C2OperationState.class);
+        
when(mockC2OperationHandlerProvider.getHandlerForOperation(mockOperation)).thenReturn(ofNullable(mockOperationHandler));
+        
when(mockOperationHandler.handle(mockOperation)).thenReturn(mockC2OperationAck);
+        when(mockOperationHandler.requiresRestart()).thenReturn(true);
+        
when(mockC2OperationAck.getOperationState()).thenReturn(mockC2OperationState);
+        when(mockC2OperationState.getState()).thenReturn(FULLY_APPLIED);
+        
when(mockC2OperationRestartHandler.handleRestart(mockOperation)).thenReturn(empty());
+
+        Future<?> future = executorService.submit(testC2OperationManager);
+        testC2OperationManager.add(mockOperation);
+
+        assertDoesNotThrow(() -> future.get());
+        verify(mockC2Client, never()).acknowledgeOperation(mockC2OperationAck);
+        verify(mockHeartbeatLock, times(1)).lock();
+        verify(mockHeartbeatLock, never()).unlock();
+        verify(mockOperationQueueDAO, times(1)).save(any());
+        verify(mockOperationQueueDAO, never()).cleanup();
+    }
+
+    // The restart handler reports NOT_APPLIED: the heartbeat lock is taken and released once, the
+    // acknowledgement is sent once, the saved queue is cleaned up, and the manager task keeps running
+    // (future.get times out)
+    @Test
+    void shouldProcessOperationWithFailedRestartDueToFailedResponse() {
+        C2Operation mockOperation = mock(C2Operation.class);
+        C2OperationHandler mockOperationHandler = 
mock(C2OperationHandler.class);
+        C2OperationAck mockC2OperationAck = mock(C2OperationAck.class);
+        C2OperationState mockC2OperationState = mock(C2OperationState.class);
+        
when(mockC2OperationHandlerProvider.getHandlerForOperation(mockOperation)).thenReturn(ofNullable(mockOperationHandler));
+        
when(mockOperationHandler.handle(mockOperation)).thenReturn(mockC2OperationAck);
+        when(mockOperationHandler.requiresRestart()).thenReturn(true);
+        
when(mockC2OperationAck.getOperationState()).thenReturn(mockC2OperationState);
+        when(mockC2OperationState.getState()).thenReturn(FULLY_APPLIED);
+        
when(mockC2OperationRestartHandler.handleRestart(mockOperation)).thenReturn(ofNullable(NOT_APPLIED));
+
+        Future<?> future = executorService.submit(testC2OperationManager);
+        testC2OperationManager.add(mockOperation);
+
+        assertThrows(TimeoutException.class, () -> 
future.get(MAX_WAIT_TIME_MS, MILLISECONDS));
+        verify(mockHeartbeatLock, times(1)).lock();
+        verify(mockHeartbeatLock, times(1)).unlock();
+        verify(mockC2Client, 
times(1)).acknowledgeOperation(mockC2OperationAck);
+        verify(mockOperationQueueDAO, times(1)).save(any());
+        verify(mockOperationQueueDAO, times(1)).cleanup();
+    }
+
+    // Restart-requiring operation whose restart handler throws: the manager task is expected to keep running,
+    // release the heartbeat lock, acknowledge the operation and clean up the persisted queue.
+    @Test
+    void shouldProcessOperationWithFailedRestartDueToException() {
+        C2Operation operation = mock(C2Operation.class);
+        C2OperationHandler handler = mock(C2OperationHandler.class);
+        C2OperationAck ack = mock(C2OperationAck.class);
+        C2OperationState ackState = mock(C2OperationState.class);
+
+        // the handler applies the operation fully and requests a restart
+        when(mockC2OperationHandlerProvider.getHandlerForOperation(operation)).thenReturn(ofNullable(handler));
+        when(handler.requiresRestart()).thenReturn(true);
+        when(handler.handle(operation)).thenReturn(ack);
+        when(ack.getOperationState()).thenReturn(ackState);
+        when(ackState.getState()).thenReturn(FULLY_APPLIED);
+        // the restart attempt blows up
+        when(mockC2OperationRestartHandler.handleRestart(operation)).thenThrow(new RuntimeException());
+
+        Future<?> managerRun = executorService.submit(testC2OperationManager);
+        testC2OperationManager.add(operation);
+
+        // the task does not complete within the wait window
+        assertThrows(TimeoutException.class, () -> managerRun.get(MAX_WAIT_TIME_MS, MILLISECONDS));
+        verify(mockC2Client, times(1)).acknowledgeOperation(ack);
+        verify(mockHeartbeatLock, times(1)).lock();
+        verify(mockHeartbeatLock, times(1)).unlock();
+        verify(mockOperationQueueDAO, times(1)).save(any());
+        verify(mockOperationQueueDAO, times(1)).cleanup();
+    }
+
+    // A persisted queue holding only a current operation is loaded at start-up: once the restart handler reports
+    // FULLY_APPLIED, exactly one acknowledgement carrying that state is expected and the heartbeat lock is untouched.
+    @Test
+    void shouldProcessStateWithOneCurrentAndNoRemainingOperations() {
+        C2Operation currentOperation = mock(C2Operation.class);
+        OperationQueue persistedQueue = mock(OperationQueue.class);
+        when(persistedQueue.getCurrentOperation()).thenReturn(currentOperation);
+        when(persistedQueue.getRemainingOperations()).thenReturn(List.of());
+
+        when(mockOperationQueueDAO.load()).thenReturn(ofNullable(persistedQueue));
+        when(mockC2OperationRestartHandler.waitForResponse()).thenReturn(ofNullable(FULLY_APPLIED));
+
+        Future<?> managerRun = executorService.submit(testC2OperationManager);
+
+        // the task does not complete within the wait window
+        assertThrows(TimeoutException.class, () -> managerRun.get(MAX_WAIT_TIME_MS, MILLISECONDS));
+        verify(mockC2Client, times(1)).acknowledgeOperation(c2OperationAckArgumentCaptor.capture());
+        assertEquals(FULLY_APPLIED, c2OperationAckArgumentCaptor.getValue().getOperationState().getState());
+        verify(mockHeartbeatLock, never()).lock();
+        verify(mockHeartbeatLock, never()).unlock();
+    }
+
+    // A persisted queue with a current and one remaining operation is loaded at start-up: two acknowledgements are
+    // expected in total, with the heartbeat lock taken and released exactly once.
+    @Test
+    void shouldProcessStateWithOneCurrentAndOneRemainingOperation() {
+        C2Operation currentOperation = mock(C2Operation.class);
+        C2Operation remainingOperation = mock(C2Operation.class);
+        OperationQueue persistedQueue = mock(OperationQueue.class);
+        when(persistedQueue.getCurrentOperation()).thenReturn(currentOperation);
+        when(persistedQueue.getRemainingOperations()).thenReturn(List.of(remainingOperation));
+        when(mockOperationQueueDAO.load()).thenReturn(ofNullable(persistedQueue));
+        when(mockC2OperationRestartHandler.waitForResponse()).thenReturn(ofNullable(FULLY_APPLIED));
+
+        // the remaining operation is handled without needing a restart
+        C2OperationHandler remainingHandler = mock(C2OperationHandler.class);
+        C2OperationAck remainingAck = mock(C2OperationAck.class);
+        when(mockC2OperationHandlerProvider.getHandlerForOperation(remainingOperation)).thenReturn(ofNullable(remainingHandler));
+        when(remainingHandler.requiresRestart()).thenReturn(false);
+        when(remainingHandler.handle(remainingOperation)).thenReturn(remainingAck);
+
+        Future<?> managerRun = executorService.submit(testC2OperationManager);
+
+        // the task does not complete within the wait window
+        assertThrows(TimeoutException.class, () -> managerRun.get(MAX_WAIT_TIME_MS, MILLISECONDS));
+        verify(mockC2Client, times(2)).acknowledgeOperation(any());
+        verify(mockHeartbeatLock, times(1)).lock();
+        verify(mockHeartbeatLock, times(1)).unlock();
+    }
+}
diff --git 
a/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/api/MiNiFiProperties.java
 
b/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/api/MiNiFiProperties.java
index f685519f04..07e2c16a50 100644
--- 
a/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/api/MiNiFiProperties.java
+++ 
b/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/api/MiNiFiProperties.java
@@ -91,6 +91,7 @@ public enum MiNiFiProperties {
     C2_SECURITY_KEYSTORE_PASSWORD("c2.security.keystore.password", "", true, 
false, VALID),
     C2_SECURITY_KEYSTORE_TYPE("c2.security.keystore.type", "JKS", false, 
false, VALID),
     C2_REQUEST_COMPRESSION("c2.request.compression", "none", false, true, 
VALID),
+    C2_BOOTSTRAP_ACKNOWLEDGE_TIMEOUT("c2.bootstrap.acknowledge.timeout", "15 
sec", false, true, VALID),
     NIFI_MINIFI_NOTIFIER_INGESTORS("nifi.minifi.notifier.ingestors", null, 
false, true, VALID),
     
NIFI_MINIFI_NOTIFIER_INGESTORS_FILE_CONFIG_PATH("nifi.minifi.notifier.ingestors.file.config.path",
 null, false, true, VALID),
     
NIFI_MINIFI_NOTIFIER_INGESTORS_FILE_POLLING_PERIOD_SECONDS("nifi.minifi.notifier.ingestors.file.polling.period.seconds",
 null, false, true, NON_NEGATIVE_INTEGER_VALIDATOR),
diff --git 
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/BootstrapC2OperationRestartHandler.java
 
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/BootstrapC2OperationRestartHandler.java
new file mode 100644
index 0000000000..76632381dc
--- /dev/null
+++ 
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/BootstrapC2OperationRestartHandler.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2;
+
+import static java.util.Optional.empty;
+import static java.util.Optional.ofNullable;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.nifi.bootstrap.CommandResult.FAILURE;
+import static org.apache.nifi.bootstrap.CommandResult.SUCCESS;
+import static 
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import org.apache.nifi.bootstrap.BootstrapCommunicator;
+import org.apache.nifi.bootstrap.CommandResult;
+import org.apache.nifi.c2.client.service.operation.C2OperationRestartHandler;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
+import org.apache.nifi.minifi.commons.api.MiNiFiCommandState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link C2OperationRestartHandler} that forwards restart-requiring C2 operations to the bootstrap process
+ * through a {@link BootstrapCommunicator} and waits for the bootstrap's acknowledgement.
+ *
+ * <p>On construction the handler registers itself for {@code ACKNOWLEDGE_OPERATION} messages. The state carried
+ * by such a message is handed over to the waiting thread through a single-slot blocking queue.
+ */
+public class BootstrapC2OperationRestartHandler implements C2OperationRestartHandler {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(BootstrapC2OperationRestartHandler.class);
+
+    private static final String ACKNOWLEDGE_OPERATION = "ACKNOWLEDGE_OPERATION";
+    private static final String TIMEOUT = "timeout";
+    // Translation of the state names reported by bootstrap into C2 protocol operation states
+    private static final Map<MiNiFiCommandState, OperationState> OPERATION_STATE_MAP = Map.of(
+        MiNiFiCommandState.FULLY_APPLIED, OperationState.FULLY_APPLIED,
+        MiNiFiCommandState.NO_OPERATION, OperationState.NO_OPERATION,
+        MiNiFiCommandState.NOT_APPLIED_WITH_RESTART, NOT_APPLIED,
+        MiNiFiCommandState.NOT_APPLIED_WITHOUT_RESTART, NOT_APPLIED);
+
+    private final BootstrapCommunicator bootstrapCommunicator;
+    // Single-slot hand-over between the bootstrap callback (producer) and waitForResponse (consumer)
+    private final BlockingQueue<OperationState> operationStateHolder;
+    private final long bootstrapAcknowledgeTimeoutMs;
+
+    /**
+     * @param bootstrapCommunicator channel used to send commands to bootstrap and to receive its acknowledgements
+     * @param bootstrapAcknowledgeTimeoutMs maximum time in milliseconds {@link #waitForResponse()} waits for an acknowledgement
+     */
+    public BootstrapC2OperationRestartHandler(BootstrapCommunicator bootstrapCommunicator, long bootstrapAcknowledgeTimeoutMs) {
+        this.bootstrapCommunicator = bootstrapCommunicator;
+        this.operationStateHolder = new ArrayBlockingQueue<>(1);
+        this.bootstrapAcknowledgeTimeoutMs = bootstrapAcknowledgeTimeoutMs;
+        bootstrapCommunicator.registerMessageHandler(ACKNOWLEDGE_OPERATION, this::bootstrapCallback);
+    }
+
+    /**
+     * Sends the operation to bootstrap as a command and, when bootstrap accepts it, waits for the acknowledgement.
+     *
+     * @param c2Operation the operation that requires a restart
+     * @return {@code NOT_APPLIED} when the command could not be delivered, otherwise the result of {@link #waitForResponse()}
+     */
+    @Override
+    public Optional<OperationState> handleRestart(C2Operation c2Operation) {
+        CommandResult sendCommandResult = sendBootstrapCommand(c2Operation);
+        if (sendCommandResult == SUCCESS) {
+            LOGGER.debug("Bootstrap successfully received command. Waiting for response");
+            return waitForResponse();
+        } else {
+            LOGGER.debug("Bootstrap failed to receive command");
+            return Optional.of(NOT_APPLIED);
+        }
+    }
+
+    /**
+     * Waits up to the configured timeout for an acknowledged state from bootstrap.
+     *
+     * @return the acknowledged state, {@code NOT_APPLIED} when the wait times out, or an empty {@link Optional}
+     *     when the waiting thread is interrupted
+     */
+    @Override
+    public Optional<OperationState> waitForResponse() {
+        try {
+            OperationState operationState = operationStateHolder.poll(bootstrapAcknowledgeTimeoutMs, MILLISECONDS);
+            LOGGER.debug("Bootstrap returned response: {}", ofNullable(operationState).map(Objects::toString).orElse(TIMEOUT));
+            return Optional.of(ofNullable(operationState).orElse(NOT_APPLIED));
+        } catch (InterruptedException e) {
+            // NOTE(review): the interrupt flag is intentionally left as in the original; confirm whether the caller
+            // relies on the empty result alone or also expects the flag to be restored.
+            LOGGER.debug("Bootstrap response waiting interrupted, possible due to Bootstrap is restarting MiNiFi process");
+            return empty();
+        }
+    }
+
+    // Delivers the command to bootstrap; an I/O failure is reported as FAILURE instead of being propagated
+    private CommandResult sendBootstrapCommand(C2Operation c2Operation) {
+        String command = createBootstrapCommand(c2Operation);
+        try {
+            return bootstrapCommunicator.sendCommand(command);
+        } catch (IOException e) {
+            LOGGER.error("Failed to send operation to bootstrap", e);
+            return FAILURE;
+        }
+    }
+
+    // Command format is OPERATION_OPERAND, or just OPERATION when the operation has no operand
+    private String createBootstrapCommand(C2Operation c2Operation) {
+        return ofNullable(c2Operation.getOperand())
+            .map(operand -> c2Operation.getOperation().name() + "_" + operand.name())
+            .orElse(c2Operation.getOperation().name());
+    }
+
+    /**
+     * Handler for {@code ACKNOWLEDGE_OPERATION} messages coming from bootstrap. The first parameter is expected to
+     * be a {@link MiNiFiCommandState} name; messages without parameters or with an unrecognised state are logged
+     * and dropped instead of failing with an exception on the bootstrap listener thread.
+     */
+    private void bootstrapCallback(String[] params, OutputStream outputStream) {
+        LOGGER.info("Received acknowledge message from bootstrap process");
+        if (params == null || params.length < 1) {
+            LOGGER.error("Invalid arguments coming from bootstrap");
+            return;
+        }
+        OperationState operationState = toOperationState(params[0]);
+        if (operationState == null) {
+            // a null element would be rejected by the queue with a NullPointerException
+            LOGGER.error("Unknown command state received from bootstrap: {}", params[0]);
+            return;
+        }
+        try {
+            // NOTE(review): put() blocks while a previous, unconsumed state still occupies the single slot
+            operationStateHolder.put(operationState);
+        } catch (InterruptedException e) {
+            LOGGER.warn("Bootstrap hook thread was interrupted");
+            // keep the interruption visible to the code that owns this thread
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    // Maps a raw state name to an operation state; returns null for a null, unknown or unmapped name
+    private OperationState toOperationState(String rawState) {
+        if (rawState == null) {
+            return null;
+        }
+        try {
+            return OPERATION_STATE_MAP.get(MiNiFiCommandState.valueOf(rawState));
+        } catch (IllegalArgumentException e) {
+            return null;
+        }
+    }
+}
diff --git 
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java
 
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java
index 0915d87ce6..e0ddec368a 100644
--- 
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java
+++ 
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java
@@ -17,12 +17,19 @@
 
 package org.apache.nifi.minifi.c2;
 
+import static java.lang.Boolean.parseBoolean;
+import static java.lang.Integer.parseInt;
+import static java.lang.Long.parseLong;
 import static java.util.Optional.ofNullable;
+import static java.util.concurrent.Executors.newScheduledThreadPool;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.stream.Collectors.toMap;
 import static 
org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_AGENT_CLASS;
 import static 
org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_AGENT_HEARTBEAT_PERIOD;
 import static 
org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_AGENT_IDENTIFIER;
 import static 
org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_ASSET_DIRECTORY;
+import static 
org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_BOOTSTRAP_ACKNOWLEDGE_TIMEOUT;
 import static 
org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_CONFIG_DIRECTORY;
 import static 
org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_FULL_HEARTBEAT;
 import static 
org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_KEEP_ALIVE_DURATION;
@@ -45,24 +52,24 @@ import static 
org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_SECURITY_KE
 import static 
org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_SECURITY_TRUSTSTORE_LOCATION;
 import static 
org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_SECURITY_TRUSTSTORE_PASSWORD;
 import static 
org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_SECURITY_TRUSTSTORE_TYPE;
+import static org.apache.nifi.util.FormatUtils.getPreciseTimeDuration;
 import static org.apache.nifi.util.NiFiProperties.FLOW_CONFIGURATION_FILE;
 import static org.apache.nifi.util.NiFiProperties.SENSITIVE_PROPS_ALGORITHM;
 import static org.apache.nifi.util.NiFiProperties.SENSITIVE_PROPS_KEY;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import java.io.IOException;
-import java.io.UncheckedIOException;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.locks.ReentrantLock;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.nifi.bootstrap.BootstrapCommunicator;
 import org.apache.nifi.c2.client.C2ClientConfig;
 import org.apache.nifi.c2.client.http.C2HttpClient;
-import org.apache.nifi.c2.client.service.C2ClientService;
 import org.apache.nifi.c2.client.service.C2HeartbeatFactory;
+import org.apache.nifi.c2.client.service.C2HeartbeatManager;
+import org.apache.nifi.c2.client.service.C2OperationManager;
 import org.apache.nifi.c2.client.service.FlowIdHolder;
 import org.apache.nifi.c2.client.service.ManifestHashProvider;
 import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper;
@@ -70,8 +77,7 @@ import 
org.apache.nifi.c2.client.service.operation.C2OperationHandlerProvider;
 import 
org.apache.nifi.c2.client.service.operation.DescribeManifestOperationHandler;
 import 
org.apache.nifi.c2.client.service.operation.EmptyOperandPropertiesProvider;
 import org.apache.nifi.c2.client.service.operation.OperandPropertiesProvider;
-import org.apache.nifi.c2.client.service.operation.OperationQueue;
-import org.apache.nifi.c2.client.service.operation.RequestedOperationDAO;
+import org.apache.nifi.c2.client.service.operation.OperationQueueDAO;
 import org.apache.nifi.c2.client.service.operation.SupportedOperationsProvider;
 import 
org.apache.nifi.c2.client.service.operation.TransferDebugOperationHandler;
 import org.apache.nifi.c2.client.service.operation.UpdateAssetOperationHandler;
@@ -81,10 +87,6 @@ import 
org.apache.nifi.c2.client.service.operation.UpdatePropertiesOperationHand
 import org.apache.nifi.c2.protocol.api.AgentManifest;
 import org.apache.nifi.c2.protocol.api.AgentRepositories;
 import org.apache.nifi.c2.protocol.api.AgentRepositoryStatus;
-import org.apache.nifi.c2.protocol.api.C2Operation;
-import org.apache.nifi.c2.protocol.api.C2OperationAck;
-import org.apache.nifi.c2.protocol.api.C2OperationState;
-import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
 import org.apache.nifi.c2.protocol.api.FlowQueueStatus;
 import org.apache.nifi.c2.serializer.C2JacksonSerializer;
 import org.apache.nifi.controller.FlowController;
@@ -99,14 +101,13 @@ import 
org.apache.nifi.minifi.c2.command.PropertiesPersister;
 import org.apache.nifi.minifi.c2.command.TransferDebugCommandHelper;
 import org.apache.nifi.minifi.c2.command.UpdateAssetCommandHelper;
 import org.apache.nifi.minifi.c2.command.UpdatePropertiesPropertyProvider;
-import org.apache.nifi.minifi.commons.api.MiNiFiCommandState;
+import org.apache.nifi.minifi.commons.api.MiNiFiProperties;
 import org.apache.nifi.minifi.commons.service.FlowPropertyEncryptor;
 import org.apache.nifi.minifi.commons.service.StandardFlowEnrichService;
 import org.apache.nifi.minifi.commons.service.StandardFlowPropertyEncryptor;
 import org.apache.nifi.minifi.commons.service.StandardFlowSerDeService;
 import org.apache.nifi.nar.ExtensionManagerHolder;
 import org.apache.nifi.services.FlowService;
-import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.NiFiProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -115,35 +116,25 @@ public class C2NifiClientService {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(C2NifiClientService.class);
     private static final String ROOT_GROUP_ID = "root";
-    private static final Long INITIAL_DELAY = 10000L;
     private static final Integer TERMINATION_WAIT = 5000;
-    private static final int MINIFI_RESTART_TIMEOUT_SECONDS = 60;
-    private static final String ACKNOWLEDGE_OPERATION = 
"ACKNOWLEDGE_OPERATION";
-    private static final int IS_ACK_RECEIVED_POLL_INTERVAL = 1000;
-    private static final int MAX_WAIT_FOR_BOOTSTRAP_ACK_MS = 20000;
+    private static final Long INITIAL_HEARTBEAT_DELAY_MS = 10000L;
 
-    private static final Map<MiNiFiCommandState, OperationState> 
OPERATION_STATE_MAP = Map.of(
-        MiNiFiCommandState.FULLY_APPLIED, OperationState.FULLY_APPLIED,
-        MiNiFiCommandState.NO_OPERATION, OperationState.NO_OPERATION,
-        MiNiFiCommandState.NOT_APPLIED_WITH_RESTART, 
OperationState.NOT_APPLIED,
-        MiNiFiCommandState.NOT_APPLIED_WITHOUT_RESTART, 
OperationState.NOT_APPLIED);
+    private final ScheduledExecutorService heartbeatManagerExecutorService;
+    private final ExecutorService operationManagerExecutorService;
 
-    private final C2ClientService c2ClientService;
     private final FlowController flowController;
-    private final ScheduledThreadPoolExecutor heartbeatExecutorService;
-    private final ScheduledThreadPoolExecutor 
bootstrapAcknowledgeExecutorService;
     private final ExtensionManifestParser extensionManifestParser;
     private final RuntimeManifestService runtimeManifestService;
     private final SupportedOperationsProvider supportedOperationsProvider;
-    private final RequestedOperationDAO requestedOperationDAO;
-    private final BootstrapCommunicator bootstrapCommunicator;
-    private final long heartbeatPeriod;
+    private final C2HeartbeatManager c2HeartbeatManager;
+    private final C2OperationManager c2OperationManager;
 
-    private volatile boolean ackReceived = false;
+    private final long heartbeatPeriod;
 
     public C2NifiClientService(NiFiProperties niFiProperties, FlowController 
flowController, BootstrapCommunicator bootstrapCommunicator, FlowService 
flowService) {
-        this.heartbeatExecutorService = new ScheduledThreadPoolExecutor(1);
-        this.bootstrapAcknowledgeExecutorService = new 
ScheduledThreadPoolExecutor(1);
+        this.heartbeatManagerExecutorService = newScheduledThreadPool(1);
+        this.operationManagerExecutorService = newSingleThreadExecutor();
+
         this.extensionManifestParser = new JAXBExtensionManifestParser();
 
         C2ClientConfig clientConfig = generateClientConfig(niFiProperties);
@@ -160,36 +151,34 @@ public class C2NifiClientService {
         C2HttpClient client = C2HttpClient.create(clientConfig, new 
C2JacksonSerializer());
         FlowIdHolder flowIdHolder = new 
FlowIdHolder(clientConfig.getConfDirectory());
         C2HeartbeatFactory heartbeatFactory = new 
C2HeartbeatFactory(clientConfig, flowIdHolder, new ManifestHashProvider());
-
-        this.requestedOperationDAO = new 
FileBasedRequestedOperationDAO(niFiProperties.getProperty("org.apache.nifi.minifi.bootstrap.config.pid.dir",
 "bin"), new ObjectMapper());
         String bootstrapConfigFileLocation = 
niFiProperties.getProperty("nifi.minifi.bootstrap.file");
-
         C2OperationHandlerProvider c2OperationHandlerProvider = 
c2OperationHandlerProvider(niFiProperties, flowController, flowService, 
flowIdHolder,
             client, heartbeatFactory, bootstrapConfigFileLocation, 
clientConfig.getC2AssetDirectory());
 
-        this.c2ClientService = new C2ClientService(client, heartbeatFactory, 
c2OperationHandlerProvider, requestedOperationDAO, this::registerOperation);
         this.supportedOperationsProvider = new 
SupportedOperationsProvider(c2OperationHandlerProvider.getHandlers());
 
-        this.bootstrapCommunicator = bootstrapCommunicator;
-        
this.bootstrapCommunicator.registerMessageHandler(ACKNOWLEDGE_OPERATION, 
(params, output) -> acknowledgeHandler(params));
+        OperationQueueDAO operationQueueDAO =
+            new 
FileBasedOperationQueueDAO(niFiProperties.getProperty("org.apache.nifi.minifi.bootstrap.config.pid.dir",
 "bin"), new ObjectMapper());
+        ReentrantLock heartbeatLock = new ReentrantLock();
+        BootstrapC2OperationRestartHandler c2OperationRestartHandler = new 
BootstrapC2OperationRestartHandler(bootstrapCommunicator, 
clientConfig.getBootstrapAcknowledgeTimeout());
+
+        this.c2OperationManager = new C2OperationManager(
+            client, c2OperationHandlerProvider, heartbeatLock, 
operationQueueDAO, c2OperationRestartHandler);
+        this.c2HeartbeatManager = new C2HeartbeatManager(
+            client, heartbeatFactory, heartbeatLock, generateRuntimeInfo(), 
c2OperationManager);
     }
 
     private C2ClientConfig generateClientConfig(NiFiProperties properties) {
         return new C2ClientConfig.Builder()
             .agentClass(properties.getProperty(C2_AGENT_CLASS.getKey(), 
C2_AGENT_CLASS.getDefaultValue()))
             
.agentIdentifier(properties.getProperty(C2_AGENT_IDENTIFIER.getKey()))
-            
.fullHeartbeat(Boolean.parseBoolean(properties.getProperty(C2_FULL_HEARTBEAT.getKey(),
 C2_FULL_HEARTBEAT.getDefaultValue())))
-            
.heartbeatPeriod(Long.parseLong(properties.getProperty(C2_AGENT_HEARTBEAT_PERIOD.getKey(),
-                C2_AGENT_HEARTBEAT_PERIOD.getDefaultValue())))
-            .connectTimeout((long) 
FormatUtils.getPreciseTimeDuration(properties.getProperty(C2_REST_CONNECTION_TIMEOUT.getKey(),
-                C2_REST_CONNECTION_TIMEOUT.getDefaultValue()), 
TimeUnit.MILLISECONDS))
-            .readTimeout((long) 
FormatUtils.getPreciseTimeDuration(properties.getProperty(C2_REST_READ_TIMEOUT.getKey(),
-                C2_REST_READ_TIMEOUT.getDefaultValue()), 
TimeUnit.MILLISECONDS))
-            .callTimeout((long) 
FormatUtils.getPreciseTimeDuration(properties.getProperty(C2_REST_CALL_TIMEOUT.getKey(),
-                C2_REST_CALL_TIMEOUT.getDefaultValue()), 
TimeUnit.MILLISECONDS))
-            
.maxIdleConnections(Integer.parseInt(properties.getProperty(C2_MAX_IDLE_CONNECTIONS.getKey(),
 C2_MAX_IDLE_CONNECTIONS.getDefaultValue())))
-            .keepAliveDuration((long) 
FormatUtils.getPreciseTimeDuration(properties.getProperty(C2_KEEP_ALIVE_DURATION.getKey(),
-                C2_KEEP_ALIVE_DURATION.getDefaultValue()), 
TimeUnit.MILLISECONDS))
+            
.fullHeartbeat(parseBoolean(properties.getProperty(C2_FULL_HEARTBEAT.getKey(), 
C2_FULL_HEARTBEAT.getDefaultValue())))
+            
.heartbeatPeriod(parseLong(properties.getProperty(C2_AGENT_HEARTBEAT_PERIOD.getKey(),
 C2_AGENT_HEARTBEAT_PERIOD.getDefaultValue())))
+            .connectTimeout(durationPropertyInMilliSecs(properties, 
C2_REST_CONNECTION_TIMEOUT))
+            .readTimeout(durationPropertyInMilliSecs(properties, 
C2_REST_READ_TIMEOUT))
+            .callTimeout(durationPropertyInMilliSecs(properties, 
C2_REST_CALL_TIMEOUT))
+            
.maxIdleConnections(parseInt(properties.getProperty(C2_MAX_IDLE_CONNECTIONS.getKey(),
 C2_MAX_IDLE_CONNECTIONS.getDefaultValue())))
+            .keepAliveDuration(durationPropertyInMilliSecs(properties, 
C2_KEEP_ALIVE_DURATION))
             .httpHeaders(properties.getProperty(C2_REST_HTTP_HEADERS.getKey(), 
C2_REST_HTTP_HEADERS.getDefaultValue()))
             
.c2RequestCompression(properties.getProperty(C2_REQUEST_COMPRESSION.getKey(), 
C2_REQUEST_COMPRESSION.getDefaultValue()))
             
.c2AssetDirectory(properties.getProperty(C2_ASSET_DIRECTORY.getKey(), 
C2_ASSET_DIRECTORY.getDefaultValue()))
@@ -207,9 +196,14 @@ public class C2NifiClientService {
             .c2RestPathBase(properties.getProperty(C2_REST_PATH_BASE.getKey(), 
C2_REST_PATH_BASE.getDefaultValue()))
             
.c2RestPathHeartbeat(properties.getProperty(C2_REST_PATH_HEARTBEAT.getKey(), 
C2_REST_PATH_HEARTBEAT.getDefaultValue()))
             
.c2RestPathAcknowledge(properties.getProperty(C2_REST_PATH_ACKNOWLEDGE.getKey(),
 C2_REST_PATH_ACKNOWLEDGE.getDefaultValue()))
+            
.bootstrapAcknowledgeTimeout(durationPropertyInMilliSecs(properties, 
C2_BOOTSTRAP_ACKNOWLEDGE_TIMEOUT))
             .build();
     }
 
+    // Reads a duration-valued property (passing the property's declared default as the fallback argument)
+    // and converts it to whole milliseconds; the fractional part is dropped by the cast.
+    private long durationPropertyInMilliSecs(NiFiProperties properties, MiNiFiProperties property) {
+        String configuredDuration = properties.getProperty(property.getKey(), property.getDefaultValue());
+        return (long) getPreciseTimeDuration(configuredDuration, MILLISECONDS);
+    }
+
     private C2OperationHandlerProvider 
c2OperationHandlerProvider(NiFiProperties niFiProperties, FlowController 
flowController, FlowService flowService,
                                                                   FlowIdHolder 
flowIdHolder, C2HttpClient client, C2HeartbeatFactory heartbeatFactory,
                                                                   String 
bootstrapConfigFileLocation, String c2AssetDirectory) {
@@ -240,107 +234,29 @@ public class C2NifiClientService {
     }
 
     public void start() {
-        handleOngoingOperations();
-        heartbeatExecutorService.scheduleAtFixedRate(() -> 
c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, 
heartbeatPeriod, TimeUnit.MILLISECONDS);
-    }
-
-    // need to be synchronized to prevent parallel run coming from 
acknowledgeHandler/ackTimeoutTask
-    private synchronized void handleOngoingOperations() {
-        Optional<OperationQueue> operationQueue = requestedOperationDAO.load();
-        LOGGER.info("Handling ongoing operations: {}", operationQueue);
-        if (operationQueue.isPresent()) {
-            try {
-                waitForAcknowledgeFromBootstrap();
-                
c2ClientService.handleRequestedOperations(operationQueue.get().getRemainingOperations());
-            } catch (Exception e) {
-                LOGGER.error("Failed to process c2 operations queue", e);
-                c2ClientService.enableHeartbeat();
-            }
-        } else {
-            c2ClientService.enableHeartbeat();
-        }
-    }
-
-    private void waitForAcknowledgeFromBootstrap() {
-        LOGGER.info("Waiting for ACK signal from Bootstrap");
-        int currentWaitTime = 0;
-        while (!ackReceived) {
-            try {
-                Thread.sleep(IS_ACK_RECEIVED_POLL_INTERVAL);
-            } catch (InterruptedException e) {
-                LOGGER.warn("Thread interrupted while waiting for 
Acknowledge");
-            }
-            currentWaitTime += IS_ACK_RECEIVED_POLL_INTERVAL;
-            if (MAX_WAIT_FOR_BOOTSTRAP_ACK_MS <= currentWaitTime) {
-                LOGGER.warn("Max wait time ({}) exceeded for waiting ack from 
bootstrap, skipping", MAX_WAIT_FOR_BOOTSTRAP_ACK_MS);
-                break;
-            }
-        }
+        operationManagerExecutorService.execute(c2OperationManager);
+        LOGGER.debug("Scheduling heartbeats with {} ms periodicity", 
heartbeatPeriod);
+        
heartbeatManagerExecutorService.scheduleAtFixedRate(c2HeartbeatManager, 
INITIAL_HEARTBEAT_DELAY_MS, heartbeatPeriod, MILLISECONDS);
     }
 
-    private void registerOperation(C2Operation c2Operation) {
+    public void stop() {
+        heartbeatManagerExecutorService.shutdown();
         try {
-            ackReceived = false;
-            registerAcknowledgeTimeoutTask(c2Operation);
-            String command = ofNullable(c2Operation.getOperand())
-                .map(operand -> c2Operation.getOperation().name() + "_" + 
operand.name())
-                .orElse(c2Operation.getOperation().name());
-            bootstrapCommunicator.sendCommand(command);
-        } catch (IOException e) {
-            LOGGER.error("Failed to send operation to bootstrap", e);
-            throw new UncheckedIOException(e);
-        }
-    }
-
-    private void registerAcknowledgeTimeoutTask(C2Operation c2Operation) {
-        bootstrapAcknowledgeExecutorService.schedule(() -> {
-            if (!ackReceived) {
-                LOGGER.info("Operation requiring restart is failed, and no 
restart/acknowledge is happened after {} seconds for {}. Handling remaining 
operations.",
-                    MINIFI_RESTART_TIMEOUT_SECONDS, c2Operation);
-                handleOngoingOperations();
+            if 
(!heartbeatManagerExecutorService.awaitTermination(TERMINATION_WAIT, 
MILLISECONDS)) {
+                heartbeatManagerExecutorService.shutdownNow();
             }
-        }, MINIFI_RESTART_TIMEOUT_SECONDS, TimeUnit.SECONDS);
-    }
-
-    private void acknowledgeHandler(String[] params) {
-        LOGGER.info("Received acknowledge message from bootstrap process");
-        if (params.length < 1) {
-            LOGGER.error("Invalid arguments coming from bootstrap, skipping 
acknowledging latest operation");
-            return;
-        }
-
-        Optional<OperationQueue> optionalOperationQueue = 
requestedOperationDAO.load();
-        ackReceived = true;
-        if (optionalOperationQueue.isPresent()) {
-            OperationQueue operationQueue = optionalOperationQueue.get();
-            C2Operation c2Operation = operationQueue.getCurrentOperation();
-            C2OperationAck c2OperationAck = new C2OperationAck();
-            c2OperationAck.setOperationId(c2Operation.getIdentifier());
-            C2OperationState c2OperationState = new C2OperationState();
-            MiNiFiCommandState miNiFiCommandState = 
MiNiFiCommandState.valueOf(params[0]);
-            OperationState state = OPERATION_STATE_MAP.get(miNiFiCommandState);
-            c2OperationState.setState(state);
-            c2OperationAck.setOperationState(c2OperationState);
-            c2ClientService.sendAcknowledge(c2OperationAck);
-            if (MiNiFiCommandState.NO_OPERATION == miNiFiCommandState || 
MiNiFiCommandState.NOT_APPLIED_WITHOUT_RESTART == miNiFiCommandState) {
-                LOGGER.debug("No restart happened because of an error / the 
app was already in the desired state");
-                handleOngoingOperations();
-            }
-        } else {
-            LOGGER.error("Can not send acknowledge due to empty Operation 
Queue");
+        } catch (InterruptedException ignore) {
+            LOGGER.info("Stopping C2 heartbeat executor service was 
interrupted, forcing shutdown");
+            heartbeatManagerExecutorService.shutdownNow();
         }
-    }
-
-    public void stop() {
-        bootstrapAcknowledgeExecutorService.shutdownNow();
-        heartbeatExecutorService.shutdown();
+        operationManagerExecutorService.shutdown();
         try {
-            if (!heartbeatExecutorService.awaitTermination(TERMINATION_WAIT, 
TimeUnit.MILLISECONDS)) {
-                heartbeatExecutorService.shutdownNow();
+            if 
(!operationManagerExecutorService.awaitTermination(TERMINATION_WAIT, 
MILLISECONDS)) {
+                operationManagerExecutorService.shutdownNow();
             }
         } catch (InterruptedException ignore) {
-            LOGGER.info("Stopping C2 Client's thread was interrupted but 
shutting down anyway the C2NifiClientService");
-            heartbeatExecutorService.shutdownNow();
+            LOGGER.info("Stopping C2 operation executor service was 
interrupted, forcing shutdown");
+            operationManagerExecutorService.shutdownNow();
         }
     }
 
diff --git 
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/FileBasedRequestedOperationDAO.java
 
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/FileBasedOperationQueueDAO.java
similarity index 87%
rename from 
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/FileBasedRequestedOperationDAO.java
rename to 
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/FileBasedOperationQueueDAO.java
index 80b1418c95..72f4c55bbd 100644
--- 
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/FileBasedRequestedOperationDAO.java
+++ 
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/FileBasedOperationQueueDAO.java
@@ -17,22 +17,25 @@
 
 package org.apache.nifi.minifi.c2;
 
+import static org.slf4j.LoggerFactory.getLogger;
+
 import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.File;
 import java.util.Optional;
 import org.apache.nifi.c2.client.service.operation.OperationQueue;
-import org.apache.nifi.c2.client.service.operation.RequestedOperationDAO;
+import org.apache.nifi.c2.client.service.operation.OperationQueueDAO;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-public class FileBasedRequestedOperationDAO implements RequestedOperationDAO {
-    private static final Logger LOGGER = LoggerFactory.getLogger(FileBasedRequestedOperationDAO.class);
+public class FileBasedOperationQueueDAO implements OperationQueueDAO {
+
+    private static final Logger LOGGER = getLogger(FileBasedOperationQueueDAO.class);
+
     protected static final String REQUESTED_OPERATIONS_FILE_NAME = "requestedOperations.data";
 
     private final ObjectMapper objectMapper;
     private final File requestedOperationsFile;
 
-    public FileBasedRequestedOperationDAO(String runDir, ObjectMapper objectMapper) {
+    public FileBasedOperationQueueDAO(String runDir, ObjectMapper objectMapper) {
         this.requestedOperationsFile = new File(runDir, REQUESTED_OPERATIONS_FILE_NAME);
         this.objectMapper = objectMapper;
     }
diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/BootstrapC2OperationRestartHandlerTest.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/BootstrapC2OperationRestartHandlerTest.java
new file mode 100644
index 0000000000..c78155fa9a
--- /dev/null
+++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/BootstrapC2OperationRestartHandlerTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2;
+
+import static java.lang.Thread.sleep;
+import static java.util.concurrent.Executors.newVirtualThreadPerTaskExecutor;
+import static org.apache.nifi.bootstrap.CommandResult.FAILURE;
+import static org.apache.nifi.bootstrap.CommandResult.SUCCESS;
+import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
+import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
+import static org.apache.nifi.c2.protocol.api.OperationType.START;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.function.BiConsumer;
+import org.apache.nifi.bootstrap.BootstrapCommunicator;
+import org.apache.nifi.bootstrap.CommandResult;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class BootstrapC2OperationRestartHandlerTest {
+
+    @Test
+    void shouldReturnNotAppliedWhenBootstrapCommunicatorReturnsFalse() throws IOException {
+        C2Operation inputOperation = new C2Operation();
+        inputOperation.setOperation(START);
+        BootstrapCommunicator bootstrapCommunicator = mock(BootstrapCommunicator.class);
+        when(bootstrapCommunicator.sendCommand(START.name())).thenReturn(FAILURE);
+        long bootstrapAcknowledgeTimeoutMs = 0;
+
+        BootstrapC2OperationRestartHandler testHandler = new BootstrapC2OperationRestartHandler(bootstrapCommunicator, bootstrapAcknowledgeTimeoutMs);
+        Optional<OperationState> result = testHandler.handleRestart(inputOperation);
+
+        assertTrue(result.isPresent());
+        assertEquals(NOT_APPLIED, result.get());
+    }
+
+    @Test
+    void shouldReturnNotAppliedWhenBootstrapCommunicatorThrowsException() throws IOException {
+        C2Operation inputOperation = new C2Operation();
+        inputOperation.setOperation(START);
+        BootstrapCommunicator bootstrapCommunicator = mock(BootstrapCommunicator.class);
+        when(bootstrapCommunicator.sendCommand(START.name())).thenThrow(new IOException());
+        long bootstrapAcknowledgeTimeoutMs = 0;
+
+        BootstrapC2OperationRestartHandler testHandler = new BootstrapC2OperationRestartHandler(bootstrapCommunicator, bootstrapAcknowledgeTimeoutMs);
+        Optional<OperationState> result = testHandler.handleRestart(inputOperation);
+
+        assertTrue(result.isPresent());
+        assertEquals(NOT_APPLIED, result.get());
+    }
+
+    @Test
+    void shouldReturnStateAcknowledgedByBootstrapCommunicator() {
+        C2Operation inputOperation = new C2Operation();
+        inputOperation.setOperation(START);
+        long bootstrapAcknowledgeTimeoutMs = 1000;
+        long waitBeforeAcknowledgeMs = 100;
+        String[] callbackResult = new String[] {FULLY_APPLIED.name()};
+        BootstrapCommunicatorStub bootstrapCommunicator = new BootstrapCommunicatorStub(SUCCESS, callbackResult, waitBeforeAcknowledgeMs);
+
+        BootstrapC2OperationRestartHandler testHandler = new BootstrapC2OperationRestartHandler(bootstrapCommunicator, bootstrapAcknowledgeTimeoutMs);
+        try (ExecutorService executorService = newVirtualThreadPerTaskExecutor()) {
+            executorService.execute(bootstrapCommunicator);
+            Optional<OperationState> result = testHandler.handleRestart(inputOperation);
+
+            assertTrue(result.isPresent());
+            assertEquals(FULLY_APPLIED, result.get());
+        }
+    }
+
+    @Test
+    void shouldReturnNotAppliedWhenBootstrapAcknowledgeTimesOut() {
+        C2Operation inputOperation = new C2Operation();
+        inputOperation.setOperation(START);
+        String[] callbackResult = new String[] {FULLY_APPLIED.name()};
+        long bootstrapAcknowledgeTimeoutMs = 1000;
+        long waitBeforeAcknowledgeMs = 2000;
+        BootstrapCommunicatorStub bootstrapCommunicator = new BootstrapCommunicatorStub(SUCCESS, callbackResult, waitBeforeAcknowledgeMs);
+
+        BootstrapC2OperationRestartHandler testHandler = new BootstrapC2OperationRestartHandler(bootstrapCommunicator, bootstrapAcknowledgeTimeoutMs);
+        try (ExecutorService executorService = newVirtualThreadPerTaskExecutor()) {
+            executorService.execute(bootstrapCommunicator);
+            Optional<OperationState> result = testHandler.handleRestart(inputOperation);
+
+            assertTrue(result.isPresent());
+            assertEquals(NOT_APPLIED, result.get());
+        }
+    }
+
+    @Test
+    void shouldReturnNotAppliedWhenBootstrapSendInvalidResponse() {
+        C2Operation inputOperation = new C2Operation();
+        inputOperation.setOperation(START);
+        String[] callbackResult = new String[] {};
+        long bootstrapAcknowledgeTimeoutMs = 1000;
+        long waitBeforeAcknowledgeMs = 100;
+        BootstrapCommunicatorStub bootstrapCommunicator = new BootstrapCommunicatorStub(SUCCESS, callbackResult, waitBeforeAcknowledgeMs);
+
+        BootstrapC2OperationRestartHandler testHandler = new BootstrapC2OperationRestartHandler(bootstrapCommunicator, bootstrapAcknowledgeTimeoutMs);
+        try (ExecutorService executorService = newVirtualThreadPerTaskExecutor()) {
+            executorService.execute(bootstrapCommunicator);
+            Optional<OperationState> result = testHandler.handleRestart(inputOperation);
+
+            assertTrue(result.isPresent());
+            assertEquals(NOT_APPLIED, result.get());
+        }
+    }
+
+    static class BootstrapCommunicatorStub implements BootstrapCommunicator, Runnable {
+
+        private final CommandResult sendCommandResult;
+        private final String[] callbackResult;
+        private final long waitBeforeAcknowledgeMs;
+        private BiConsumer<String[], OutputStream> handler;
+
+        BootstrapCommunicatorStub(CommandResult sendCommandResult, String[] callbackResult, long waitBeforeAcknowledgeMs) {
+            this.sendCommandResult = sendCommandResult;
+            this.callbackResult = callbackResult;
+            this.waitBeforeAcknowledgeMs = waitBeforeAcknowledgeMs;
+        }
+
+        @Override
+        public void run() {
+            try {
+                sleep(waitBeforeAcknowledgeMs);
+            } catch (InterruptedException ignore) {
+            }
+            handler.accept(callbackResult, null);
+        }
+
+        @Override
+        public CommandResult sendCommand(String command, String... args) {
+            return sendCommandResult;
+        }
+
+        @Override
+        public void registerMessageHandler(String command, BiConsumer<String[], OutputStream> handler) {
+            this.handler = handler;
+        }
+    }
+
+}
+
+
diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/FileBasedRequestedOperationDAOTest.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/FileBasedOperationQueueDAOTest.java
similarity index 90%
rename from minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/FileBasedRequestedOperationDAOTest.java
rename to minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/FileBasedOperationQueueDAOTest.java
index 962e98b2e9..b2ae03cd03 100644
--- a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/FileBasedRequestedOperationDAOTest.java
+++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/FileBasedOperationQueueDAOTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.nifi.minifi.c2;
 
-import static org.apache.nifi.minifi.c2.FileBasedRequestedOperationDAO.REQUESTED_OPERATIONS_FILE_NAME;
+import static org.apache.nifi.minifi.c2.FileBasedOperationQueueDAO.REQUESTED_OPERATIONS_FILE_NAME;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.ArgumentMatchers.any;
@@ -32,6 +32,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.List;
 import java.util.Optional;
 import org.apache.nifi.c2.client.service.operation.OperationQueue;
 import org.apache.nifi.c2.protocol.api.C2Operation;
@@ -45,7 +46,7 @@ import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 
 @ExtendWith(MockitoExtension.class)
-class FileBasedRequestedOperationDAOTest {
+class FileBasedOperationQueueDAOTest {
 
     @Mock
     private ObjectMapper objectMapper;
@@ -53,11 +54,11 @@ class FileBasedRequestedOperationDAOTest {
     @TempDir
     File tmpDir;
 
-    private FileBasedRequestedOperationDAO fileBasedRequestedOperationDAO;
+    private FileBasedOperationQueueDAO fileBasedRequestedOperationDAO;
 
     @BeforeEach
     void setup() {
-        fileBasedRequestedOperationDAO = new FileBasedRequestedOperationDAO(tmpDir.getAbsolutePath(), objectMapper);
+        fileBasedRequestedOperationDAO = new FileBasedOperationQueueDAO(tmpDir.getAbsolutePath(), objectMapper);
     }
 
     @Test
@@ -109,6 +110,6 @@ class FileBasedRequestedOperationDAOTest {
         C2Operation currentOperation = new C2Operation();
         currentOperation.setIdentifier("id2");
 
-        return new OperationQueue(currentOperation, Collections.singletonList(c2Operation));
+        return new OperationQueue(currentOperation, List.of(c2Operation));
     }
 }
\ No newline at end of file
diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/bootstrap/BootstrapListener.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/bootstrap/BootstrapListener.java
index 911ddc44a7..fa6b5db211 100644
--- a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/bootstrap/BootstrapListener.java
+++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/bootstrap/BootstrapListener.java
@@ -14,8 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.nifi.minifi.bootstrap;
 
+import static org.apache.nifi.bootstrap.CommandResult.FAILURE;
+import static org.apache.nifi.bootstrap.CommandResult.SUCCESS;
+
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.BufferedReader;
@@ -38,6 +42,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.function.BiConsumer;
 import org.apache.nifi.bootstrap.BootstrapCommunicator;
+import org.apache.nifi.bootstrap.CommandResult;
 import org.apache.nifi.minifi.MiNiFiServer;
 import org.apache.nifi.minifi.commons.status.FlowStatusReport;
 import org.apache.nifi.minifi.status.StatusRequestException;
@@ -90,29 +95,29 @@ public class BootstrapListener implements BootstrapCommunicator {
         listenThread.start();
 
         logger.debug("Notifying Bootstrap that local port is {}", localPort);
-        sendCommand("PORT", new String[]{String.valueOf(localPort), secretKey});
+        sendCommand("PORT", new String[] {String.valueOf(localPort), secretKey});
     }
 
     public void reload() throws IOException {
         if (listener != null) {
             listener.stop();
         }
-        sendCommand(RELOAD, new String[]{});
+        sendCommand(RELOAD, new String[] {});
     }
 
     public void stop() throws IOException {
         if (listener != null) {
             listener.stop();
         }
-        sendCommand(SHUTDOWN, new String[]{});
+        sendCommand(SHUTDOWN, new String[] {});
     }
 
     public void sendStartedStatus(boolean status) throws IOException {
         logger.debug("Notifying Bootstrap that the status of starting MiNiFi is {}", status);
-        sendCommand(STARTED, new String[]{String.valueOf(status)});
+        sendCommand(STARTED, new String[] {String.valueOf(status)});
     }
 
-    public void sendCommand(String command, String[] args) throws IOException {
+    public CommandResult sendCommand(String command, String[] args) throws IOException {
         try (Socket socket = new Socket()) {
             socket.setSoTimeout(60000);
             socket.connect(new InetSocketAddress("localhost", bootstrapPort));
@@ -134,8 +139,10 @@ public class BootstrapListener implements BootstrapCommunicator {
             String response = reader.readLine();
             if ("OK".equals(response)) {
                 logger.info("Successfully initiated communication with Bootstrap");
+                return SUCCESS;
             } else {
                 logger.error("Failed to communicate with Bootstrap. Bootstrap may be unable to issue or receive commands from MiNiFi");
+                return FAILURE;
             }
         }
     }
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/bootstrap/BootstrapCommunicator.java b/nifi-framework-api/src/main/java/org/apache/nifi/bootstrap/BootstrapCommunicator.java
index 41784233c7..1bfc1563d9 100644
--- a/nifi-framework-api/src/main/java/org/apache/nifi/bootstrap/BootstrapCommunicator.java
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/bootstrap/BootstrapCommunicator.java
@@ -25,11 +25,13 @@ public interface BootstrapCommunicator {
 
     /**
      * Sends a command with specific arguments to the bootstrap process
+     *
      * @param command the command to send
-     * @param args the args to send
+     * @param args    the args to send
+     * @return {@link CommandResult} of the command sent to Bootstrap
      * @throws IOException exception in case of communication issue
      */
-    void sendCommand(String command, String... args) throws IOException;
+    CommandResult sendCommand(String command, String... args) throws IOException;
 
     /**
      * Register a handler for messages coming from bootstrap process
diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/RequestedOperationDAO.java b/nifi-framework-api/src/main/java/org/apache/nifi/bootstrap/CommandResult.java
similarity index 55%
rename from c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/RequestedOperationDAO.java
rename to nifi-framework-api/src/main/java/org/apache/nifi/bootstrap/CommandResult.java
index 1216aa812d..ebb9bde6d3 100644
--- a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/RequestedOperationDAO.java
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/bootstrap/CommandResult.java
@@ -15,31 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.nifi.c2.client.service.operation;
-
-import java.util.Optional;
-
-/**
- * The purpose of this interface is to be able to persist operations between restarts.
- */
-public interface RequestedOperationDAO {
-
-    /**
-     * Persist the given requested operation list
-     * @param operationQueue the queue containing the current and remaining operations
-     */
-    void save(OperationQueue operationQueue);
-
-    /**
-     * Returns the saved Operations
-     *
-     * @return the C2 Operations queue with the actual operation
-     */
-    Optional<OperationQueue> load();
-
-    /**
-     * Resets the saved operations
-     */
-    void cleanup();
+package org.apache.nifi.bootstrap;
 
+public enum CommandResult {
+    FAILURE,
+    SUCCESS
 }

Reply via email to