ferencerdei commented on code in PR #6733:
URL: https://github.com/apache/nifi/pull/6733#discussion_r1062251850


##########
c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/UpdatePropertiesOperationHandlerTest.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service.operation;
+
+import static org.apache.nifi.c2.protocol.api.OperandType.PROPERTIES;
+import static org.apache.nifi.c2.protocol.api.OperationType.UPDATE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.C2OperationState;
+import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class UpdatePropertiesOperationHandlerTest {
+
+    protected static final String ID = "id";

Review Comment:
   no reason, changed them.



##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java:
##########
@@ -59,11 +111,42 @@ private void processResponse(C2HeartbeatResponse response) 
{
         }
     }
 
-    private void handleRequestedOperations(List<C2Operation> 
requestedOperations) {
-        for (C2Operation requestedOperation : requestedOperations) {
-            operationService.handleOperation(requestedOperation)
-                .ifPresent(client::acknowledgeOperation);
+    private boolean requiresRestart(C2OperationHandler c2OperationHandler, 
C2OperationAck c2OperationAck) {
+        return c2OperationHandler.requiresRestart()
+            && !Optional.ofNullable(c2OperationAck)

Review Comment:
   `Optional.isEmpty()` was only added in Java 11, and I'm not sure we are ready 
to use it, but I moved the check into a separate method.



##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NiFiProperties.java:
##########
@@ -36,6 +36,8 @@ public class C2NiFiProperties {
     public static final String C2_CONNECTION_TIMEOUT = C2_PREFIX + 
"rest.connectionTimeout";
     public static final String C2_READ_TIMEOUT = C2_PREFIX + 
"rest.readTimeout";
     public static final String C2_CALL_TIMEOUT = C2_PREFIX + 
"rest.callTimeout";
+    public static final String C2_MAX_IDLE_CONNECTIONS = C2_PREFIX + 
"rest.maxIdleConnections";

Review Comment:
   Merged them.



##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java:
##########
@@ -152,15 +185,94 @@ private C2ClientConfig 
generateClientConfig(NiFiProperties properties) {
     }
 
     public void start() {
-        scheduledExecutorService.scheduleAtFixedRate(() -> 
c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, 
heartbeatPeriod, TimeUnit.MILLISECONDS);
+        handleOngoingOperations(requestedOperationDAO.get());
+        heartbeatExecutorService.scheduleAtFixedRate(() -> 
c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, 
heartbeatPeriod, TimeUnit.MILLISECONDS);
+    }
+
+    private synchronized void handleOngoingOperations(Optional<OperationQueue> 
operationQueue) {
+        LOGGER.info("Handling ongoing operations: {}", operationQueue);
+        if (operationQueue.isPresent()) {
+            try {
+                waitForAcknowledgeFromBootstrap();
+                
c2ClientService.handleRequestedOperations(operationQueue.get().getRemainingOperations());
+            } catch (Exception e) {
+                LOGGER.error("Failed to process c2 operations queue", e);
+                c2ClientService.enableHeartbeat();
+            }
+        } else {
+            c2ClientService.enableHeartbeat();
+        }
+    }
+
+    private void waitForAcknowledgeFromBootstrap() {
+        LOGGER.info("Waiting for ACK signal from Bootstrap");
+        int currentWaitTime = 0;
+        while(!ackReceived) {
+            int sleep = 1000;
+            try {
+                Thread.sleep(sleep);
+            } catch (InterruptedException e) {
+                LOGGER.warn("Thread interrupted while waiting for 
Acknowledge");
+            }
+            currentWaitTime += sleep;
+            if (MAX_WAIT_FOR_BOOTSTRAP_ACK_MS <= currentWaitTime) {
+                LOGGER.warn("Max wait time ({}) exceeded for waiting ack from 
bootstrap, skipping", MAX_WAIT_FOR_BOOTSTRAP_ACK_MS);
+                break;
+            }
+        }
+    }
+
+    private void registerOperation(C2Operation c2Operation) {
+        try {
+            ackReceived = false;
+            registerAcknowledgeTimeoutTask();
+            String command = c2Operation.getOperation().name() + 
(c2Operation.getOperand() != null ? "_" + c2Operation.getOperand().name() : "");
+            bootstrapCommunicator.sendCommand(command, 
objectMapper.writeValueAsString(c2Operation));
+        } catch (IOException e) {
+            LOGGER.error("Failed to send operation to bootstrap", e);
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    private void registerAcknowledgeTimeoutTask() {
+        bootstrapAcknowledgeExecutorService.schedule(() -> {

Review Comment:
   This method was also added because of unexpected communication issues 
between the bootstrap and the MiNiFi process. I'm not sure whether more 
defensive programming is needed here, but I'm open to suggestions if you have 
any ideas.



##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java:
##########
@@ -152,15 +185,94 @@ private C2ClientConfig 
generateClientConfig(NiFiProperties properties) {
     }
 
     public void start() {
-        scheduledExecutorService.scheduleAtFixedRate(() -> 
c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, 
heartbeatPeriod, TimeUnit.MILLISECONDS);
+        handleOngoingOperations(requestedOperationDAO.get());
+        heartbeatExecutorService.scheduleAtFixedRate(() -> 
c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, 
heartbeatPeriod, TimeUnit.MILLISECONDS);
+    }
+
+    private synchronized void handleOngoingOperations(Optional<OperationQueue> 
operationQueue) {
+        LOGGER.info("Handling ongoing operations: {}", operationQueue);
+        if (operationQueue.isPresent()) {
+            try {
+                waitForAcknowledgeFromBootstrap();
+                
c2ClientService.handleRequestedOperations(operationQueue.get().getRemainingOperations());
+            } catch (Exception e) {
+                LOGGER.error("Failed to process c2 operations queue", e);
+                c2ClientService.enableHeartbeat();
+            }
+        } else {
+            c2ClientService.enableHeartbeat();
+        }
+    }
+
+    private void waitForAcknowledgeFromBootstrap() {
+        LOGGER.info("Waiting for ACK signal from Bootstrap");
+        int currentWaitTime = 0;
+        while(!ackReceived) {
+            int sleep = 1000;
+            try {
+                Thread.sleep(sleep);
+            } catch (InterruptedException e) {
+                LOGGER.warn("Thread interrupted while waiting for 
Acknowledge");
+            }
+            currentWaitTime += sleep;
+            if (MAX_WAIT_FOR_BOOTSTRAP_ACK_MS <= currentWaitTime) {
+                LOGGER.warn("Max wait time ({}) exceeded for waiting ack from 
bootstrap, skipping", MAX_WAIT_FOR_BOOTSTRAP_ACK_MS);
+                break;
+            }
+        }
+    }
+
+    private void registerOperation(C2Operation c2Operation) {
+        try {
+            ackReceived = false;
+            registerAcknowledgeTimeoutTask();
+            String command = c2Operation.getOperation().name() + 
(c2Operation.getOperand() != null ? "_" + c2Operation.getOperand().name() : "");
+            bootstrapCommunicator.sendCommand(command, 
objectMapper.writeValueAsString(c2Operation));
+        } catch (IOException e) {
+            LOGGER.error("Failed to send operation to bootstrap", e);
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    private void registerAcknowledgeTimeoutTask() {
+        bootstrapAcknowledgeExecutorService.schedule(() -> {
+            if (!ackReceived) {
+                LOGGER.info("Does not received acknowledge from bootstrap 
after {} seconds. Handling remaining operations.", 
MINIFI_RESTART_TIMEOUT_SECONDS);
+                handleOngoingOperations(requestedOperationDAO.get());
+            }
+        }, MINIFI_RESTART_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+    }
+
+    private void acknowledgeHandler(String[] params) {
+        LOGGER.info("Received acknowledge message from bootstrap process");
+        if (params.length < 1) {
+            LOGGER.error("Invalid arguments coming from bootstrap, skipping 
acknowledging latest operation");
+            return;
+        }
+
+        Optional<OperationQueue> optionalOperationQueue = 
requestedOperationDAO.get();
+        ackReceived = true;
+        optionalOperationQueue.ifPresent(operationQueue -> {
+            C2Operation c2Operation = operationQueue.getCurrentOperation();
+            C2OperationAck c2OperationAck = new C2OperationAck();
+            c2OperationAck.setOperationId(c2Operation.getIdentifier());
+            C2OperationState c2OperationState = new C2OperationState();
+            OperationState state = OperationState.valueOf(params[0]);
+            c2OperationState.setState(state);
+            c2OperationAck.setOperationState(c2OperationState);
+            c2ClientService.sendAcknowledge(c2OperationAck);
+            if (state != OperationState.FULLY_APPLIED) {
+                handleOngoingOperations(optionalOperationQueue);

Review Comment:
   I can add more states to MiNiFiCommandState so we can handle these 
differences.



##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/C2OperationHandler.java:
##########
@@ -48,6 +48,14 @@ public interface C2OperationHandler {
      */
     Map<String, Object> getProperties();
 
+    /**
+     * Determines if the given operation requires to restart the MiNiFi process
+     * @return true if it requires restart, false otherwise
+     */
+    default boolean requiresRestart() {

Review Comment:
   What do you mean by client side? It really depends on what the given handler 
modifies. E.g., if a configuration file is modified, the change can't be applied 
without restarting. It also requires some internal NiFi knowledge about how 
bootstrapping works. For me, this was the most straightforward way to handle 
this, so that if we add new operations we don't need to modify other parts of 
the code.



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/UpdateConfigurationService.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.service;
+
+import static java.util.Optional.ofNullable;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.Optional;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.minifi.bootstrap.MiNiFiCommandState;
+import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
+import 
org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
+import 
org.apache.nifi.minifi.bootstrap.configuration.differentiators.Differentiator;
+import 
org.apache.nifi.minifi.bootstrap.configuration.differentiators.WholeConfigDifferentiator;
+import org.apache.nifi.minifi.bootstrap.util.ByteBufferInputStream;
+import org.apache.nifi.minifi.bootstrap.util.ConfigTransformer;
+import org.apache.nifi.util.NiFiProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UpdateConfigurationService {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(UpdateConfigurationService.class);
+    private static final String UPDATED_CONFIG_FILE_NAME = 
"config-updated.yml";
+
+    private final Differentiator<ByteBuffer> differentiator;
+    private final RunMiNiFi runMiNiFi;
+    private final ConfigurationChangeListener 
miNiFiConfigurationChangeListener;
+    private final BootstrapFileProvider bootstrapFileProvider;
+
+    public UpdateConfigurationService(RunMiNiFi runMiNiFi, 
ConfigurationChangeListener miNiFiConfigurationChangeListener, 
BootstrapFileProvider bootstrapFileProvider) {
+        this.differentiator = 
WholeConfigDifferentiator.getByteBufferDifferentiator();
+        this.differentiator.initialize(runMiNiFi);
+        this.runMiNiFi = runMiNiFi;
+        this.miNiFiConfigurationChangeListener = 
miNiFiConfigurationChangeListener;
+        this.bootstrapFileProvider = bootstrapFileProvider;
+    }
+
+    public Optional<MiNiFiCommandState> handleUpdate() {
+        logger.debug("Handling configuration update");
+        Optional<MiNiFiCommandState> commandState = Optional.empty();
+        try (FileInputStream configFile = new 
FileInputStream(getConfigFilePath().toFile())) {
+            ByteBuffer readOnlyNewConfig = 
ConfigTransformer.overrideNonFlowSectionsFromOriginalSchema(
+                    IOUtils.toByteArray(configFile), 
runMiNiFi.getConfigFileReference().get().duplicate(), 
bootstrapFileProvider.getBootstrapProperties());
+            if (differentiator.isNew(readOnlyNewConfig)) {
+                miNiFiConfigurationChangeListener.handleChange(new 
ByteBufferInputStream(readOnlyNewConfig.duplicate()));
+            } else {
+                logger.info("The given configuration does not contain any 
update. No operation required");
+                commandState = Optional.of(MiNiFiCommandState.NO_OPERATION);
+            }
+        } catch (Exception e) {
+            commandState = Optional.of(MiNiFiCommandState.NOT_APPLIED);
+            logger.error("Could not handle configuration update", e);
+        }
+        return commandState;
+    }
+
+    private Path getConfigFilePath() {
+        return ofNullable(safeGetPropertiesFilePath())
+            .map(File::new)
+            .map(File::getParent)
+            .map(parentDir -> new File(parentDir + UPDATED_CONFIG_FILE_NAME))
+            .orElse(new File("./conf/" + UPDATED_CONFIG_FILE_NAME)).toPath();

Review Comment:
   It's just a fallback here; I've extracted it to a static variable.



##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java:
##########
@@ -32,23 +42,65 @@ public class C2ClientService {
 
     private final C2Client client;
     private final C2HeartbeatFactory c2HeartbeatFactory;
-    private final C2OperationService operationService;
+    private final C2OperationHandlerProvider operationService;
+    private final RequestedOperationDAO requestedOperationDAO;
+    private final Consumer<C2Operation> c2OperationRegister;
+    private volatile boolean heartbeatLocked = false;
 
-    public C2ClientService(C2Client client, C2HeartbeatFactory 
c2HeartbeatFactory, C2OperationService operationService) {
+    public C2ClientService(C2Client client, C2HeartbeatFactory 
c2HeartbeatFactory, C2OperationHandlerProvider operationService,
+        RequestedOperationDAO requestedOperationDAO, Consumer<C2Operation> 
c2OperationRegister) {
         this.client = client;
         this.c2HeartbeatFactory = c2HeartbeatFactory;
         this.operationService = operationService;
+        this.requestedOperationDAO = requestedOperationDAO;
+        this.c2OperationRegister = c2OperationRegister;
     }
 
     public void sendHeartbeat(RuntimeInfoWrapper runtimeInfoWrapper) {
         try {
-            C2Heartbeat c2Heartbeat = 
c2HeartbeatFactory.create(runtimeInfoWrapper);
-            
client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse);
+            if (heartbeatLocked) {
+                logger.debug("Restart is in progress, skipping heartbeat");
+            } else {
+                C2Heartbeat c2Heartbeat = 
c2HeartbeatFactory.create(runtimeInfoWrapper);
+                
client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse);
+            }
         } catch (Exception e) {
             logger.error("Failed to send/process heartbeat:", e);
         }
     }
 
+    public void sendAcknowledge(C2OperationAck operationAck) {
+        try {
+            client.acknowledgeOperation(operationAck);
+        } catch (Exception e) {
+            logger.error("Failed to send acknowledge:", e);
+        }
+    }
+
+    public void enableHeartbeat() {
+        heartbeatLocked = false;
+    }
+
+    public void handleRequestedOperations(List<C2Operation> 
requestedOperations) {
+        LinkedList<C2Operation> lRequestedOperations = new 
LinkedList<>(requestedOperations);
+        C2Operation requestedOperation;
+        while ((requestedOperation = lRequestedOperations.poll()) != null) {

Review Comment:
   I think a PriorityQueue would not add any value here. LinkedList also 
implements the Queue methods, and we only use those. Therefore I would keep 
this as it is, if you don't mind.



##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java:
##########
@@ -152,15 +185,94 @@ private C2ClientConfig 
generateClientConfig(NiFiProperties properties) {
     }
 
     public void start() {
-        scheduledExecutorService.scheduleAtFixedRate(() -> 
c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, 
heartbeatPeriod, TimeUnit.MILLISECONDS);
+        handleOngoingOperations(requestedOperationDAO.get());
+        heartbeatExecutorService.scheduleAtFixedRate(() -> 
c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, 
heartbeatPeriod, TimeUnit.MILLISECONDS);
+    }
+
+    private synchronized void handleOngoingOperations(Optional<OperationQueue> 
operationQueue) {
+        LOGGER.info("Handling ongoing operations: {}", operationQueue);
+        if (operationQueue.isPresent()) {
+            try {
+                waitForAcknowledgeFromBootstrap();
+                
c2ClientService.handleRequestedOperations(operationQueue.get().getRemainingOperations());
+            } catch (Exception e) {
+                LOGGER.error("Failed to process c2 operations queue", e);
+                c2ClientService.enableHeartbeat();
+            }
+        } else {
+            c2ClientService.enableHeartbeat();
+        }
+    }
+
+    private void waitForAcknowledgeFromBootstrap() {
+        LOGGER.info("Waiting for ACK signal from Bootstrap");
+        int currentWaitTime = 0;
+        while(!ackReceived) {
+            int sleep = 1000;
+            try {
+                Thread.sleep(sleep);
+            } catch (InterruptedException e) {
+                LOGGER.warn("Thread interrupted while waiting for 
Acknowledge");
+            }
+            currentWaitTime += sleep;
+            if (MAX_WAIT_FOR_BOOTSTRAP_ACK_MS <= currentWaitTime) {
+                LOGGER.warn("Max wait time ({}) exceeded for waiting ack from 
bootstrap, skipping", MAX_WAIT_FOR_BOOTSTRAP_ACK_MS);
+                break;
+            }
+        }
+    }
+
+    private void registerOperation(C2Operation c2Operation) {
+        try {
+            ackReceived = false;
+            registerAcknowledgeTimeoutTask();
+            String command = c2Operation.getOperation().name() + 
(c2Operation.getOperand() != null ? "_" + c2Operation.getOperand().name() : "");
+            bootstrapCommunicator.sendCommand(command, 
objectMapper.writeValueAsString(c2Operation));
+        } catch (IOException e) {
+            LOGGER.error("Failed to send operation to bootstrap", e);
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    private void registerAcknowledgeTimeoutTask() {
+        bootstrapAcknowledgeExecutorService.schedule(() -> {
+            if (!ackReceived) {
+                LOGGER.info("Does not received acknowledge from bootstrap 
after {} seconds. Handling remaining operations.", 
MINIFI_RESTART_TIMEOUT_SECONDS);
+                handleOngoingOperations(requestedOperationDAO.get());
+            }
+        }, MINIFI_RESTART_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+    }
+
+    private void acknowledgeHandler(String[] params) {
+        LOGGER.info("Received acknowledge message from bootstrap process");
+        if (params.length < 1) {
+            LOGGER.error("Invalid arguments coming from bootstrap, skipping 
acknowledging latest operation");
+            return;
+        }
+
+        Optional<OperationQueue> optionalOperationQueue = 
requestedOperationDAO.get();
+        ackReceived = true;
+        optionalOperationQueue.ifPresent(operationQueue -> {
+            C2Operation c2Operation = operationQueue.getCurrentOperation();
+            C2OperationAck c2OperationAck = new C2OperationAck();
+            c2OperationAck.setOperationId(c2Operation.getIdentifier());
+            C2OperationState c2OperationState = new C2OperationState();
+            OperationState state = OperationState.valueOf(params[0]);
+            c2OperationState.setState(state);
+            c2OperationAck.setOperationState(c2OperationState);
+            c2ClientService.sendAcknowledge(c2OperationAck);
+            if (state != OperationState.FULLY_APPLIED) {

Review Comment:
   added a comment



##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java:
##########
@@ -32,23 +42,65 @@ public class C2ClientService {
 
     private final C2Client client;
     private final C2HeartbeatFactory c2HeartbeatFactory;
-    private final C2OperationService operationService;
+    private final C2OperationHandlerProvider operationService;
+    private final RequestedOperationDAO requestedOperationDAO;
+    private final Consumer<C2Operation> c2OperationRegister;
+    private volatile boolean heartbeatLocked = false;
 
-    public C2ClientService(C2Client client, C2HeartbeatFactory 
c2HeartbeatFactory, C2OperationService operationService) {
+    public C2ClientService(C2Client client, C2HeartbeatFactory 
c2HeartbeatFactory, C2OperationHandlerProvider operationService,
+        RequestedOperationDAO requestedOperationDAO, Consumer<C2Operation> 
c2OperationRegister) {
         this.client = client;
         this.c2HeartbeatFactory = c2HeartbeatFactory;
         this.operationService = operationService;
+        this.requestedOperationDAO = requestedOperationDAO;
+        this.c2OperationRegister = c2OperationRegister;
     }
 
     public void sendHeartbeat(RuntimeInfoWrapper runtimeInfoWrapper) {
         try {
-            C2Heartbeat c2Heartbeat = 
c2HeartbeatFactory.create(runtimeInfoWrapper);
-            
client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse);
+            if (heartbeatLocked) {
+                logger.debug("Restart is in progress, skipping heartbeat");
+            } else {
+                C2Heartbeat c2Heartbeat = 
c2HeartbeatFactory.create(runtimeInfoWrapper);
+                
client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse);
+            }
         } catch (Exception e) {
             logger.error("Failed to send/process heartbeat:", e);
         }
     }
 
+    public void sendAcknowledge(C2OperationAck operationAck) {
+        try {
+            client.acknowledgeOperation(operationAck);
+        } catch (Exception e) {
+            logger.error("Failed to send acknowledge:", e);
+        }
+    }
+
+    public void enableHeartbeat() {
+        heartbeatLocked = false;
+    }
+
+    public void handleRequestedOperations(List<C2Operation> 
requestedOperations) {
+        LinkedList<C2Operation> lRequestedOperations = new 
LinkedList<>(requestedOperations);
+        C2Operation requestedOperation;
+        while ((requestedOperation = lRequestedOperations.poll()) != null) {
+            Optional<C2OperationHandler> c2OperationHandler = 
operationService.getHandlerForOperation(requestedOperation);
+            if (c2OperationHandler.isPresent()) {
+                Optional<C2OperationAck> c2OperationAck = 
handleOperation(c2OperationHandler.get(), requestedOperation, 
lRequestedOperations);
+                if (c2OperationAck.isPresent()) {
+                    sendAcknowledge(c2OperationAck.get());
+                } else {
+                    return;

Review Comment:
   Added a comment



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/StartRunner.java:
##########
@@ -216,6 +213,46 @@ private Process restartNifi(Properties 
bootstrapProperties, String confDir, Proc
         return process;
     }
 
+    private boolean revertFlowConfig(Properties bootstrapProperties, String 
confDir, File swapConfigFile) throws IOException {
+        DEFAULT_LOGGER.info("Swap file exists, MiNiFi failed trying to change 
configuration. Reverting to old configuration.");
+
+        try {
+            ByteBuffer tempConfigFile = 
generateConfigFiles(Files.newInputStream(swapConfigFile.toPath()), confDir, 
bootstrapProperties);
+            
runMiNiFi.getConfigFileReference().set(tempConfigFile.asReadOnlyBuffer());
+        } catch (ConfigurationChangeException e) {
+            DEFAULT_LOGGER.error("The swap file is malformed, unable to 
restart from prior state. Will not attempt to restart MiNiFi. Swap File should 
be cleaned up manually.");
+            return false;
+        }
+
+        Files.copy(swapConfigFile.toPath(), 
Paths.get(bootstrapProperties.getProperty(MINIFI_CONFIG_FILE_KEY)), 
REPLACE_EXISTING);
+
+        DEFAULT_LOGGER.info("Replacing config file with swap file and deleting 
swap file");
+        if (!swapConfigFile.delete()) {
+            DEFAULT_LOGGER.warn("The swap file failed to delete after 
replacing using it to revert to the old configuration. It should be cleaned up 
manually.");
+        }
+        runMiNiFi.setReloading(false);
+        return true;
+    }
+
+    private boolean revertBootstrapConfig(String confDir, File 
bootstrapSwapConfigFile) throws IOException {
+        DEFAULT_LOGGER.info("Bootstrap Swap file exists, MiNiFi failed trying 
to change configuration. Reverting to old configuration.");
+
+        Files.copy(bootstrapSwapConfigFile.toPath(), 
bootstrapConfigFile.toPath(), REPLACE_EXISTING);
+        try {
+            ByteBuffer tempConfigFile = 
generateConfigFiles(asByteArrayInputStream(runMiNiFi.getConfigFileReference().get().duplicate()),
 confDir, bootstrapFileProvider.getBootstrapProperties());
+            
runMiNiFi.getConfigFileReference().set(tempConfigFile.asReadOnlyBuffer());
+        } catch (ConfigurationChangeException e) {
+            DEFAULT_LOGGER.error("The swap file is malformed, unable to 
restart from prior state. Will not attempt to restart MiNiFi. Swap File should 
be cleaned up manually.");
+            return false;
+        }
+
+        if (!bootstrapSwapConfigFile.delete()) {
+            DEFAULT_LOGGER.warn("The swap file failed to delete after 
replacing using it to revert to the old configuration. It should be cleaned up 
manually.");

Review Comment:
   Added the swap file types to the logs. The two methods, revertFlowConfig and 
revertBootstrapConfig, are slightly different, so extracting a couple of lines 
into a common method would not make it cleaner in my opinion.



##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/FileBasedRequestedOperationDAOTest.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2;
+
+import static 
org.apache.nifi.minifi.c2.FileBasedRequestedOperationDAO.REQUESTED_OPERATIONS_FILE_NAME;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Optional;
+import org.apache.nifi.c2.client.service.operation.OperationQueue;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.OperandType;
+import org.apache.nifi.c2.protocol.api.OperationType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class FileBasedRequestedOperationDAOTest {
+
+    @Mock
+    private ObjectMapper objectMapper;
+
+    @TempDir
+    File tmpDir;
+
+    private FileBasedRequestedOperationDAO fileBasedRequestedOperationDAO;
+
+    @BeforeEach
+    void setup() {
+        fileBasedRequestedOperationDAO = new 
FileBasedRequestedOperationDAO(tmpDir.getAbsolutePath(), objectMapper);
+    }
+
+    @Test
+    void shouldSaveRequestedOperationsToFile() throws IOException {
+        OperationQueue operationQueue = getOperationQueue();
+        fileBasedRequestedOperationDAO.save(operationQueue);
+
+        verify(objectMapper).writeValue(any(File.class), eq(operationQueue));
+    }
+    @Test
+    void shouldThrowRuntimeExceptionWhenExceptionHappensDuringSave() throws 
IOException {
+        doThrow(new 
RuntimeException()).when(objectMapper).writeValue(any(File.class), anyList());
+
+        assertThrows(RuntimeException.class, () -> 
fileBasedRequestedOperationDAO.save(mock(OperationQueue.class)));
+    }
+
+    @Test
+    void shouldGetReturnEmptyWhenFileDoesntExists() {
+        assertEquals(Optional.empty(), fileBasedRequestedOperationDAO.get());
+    }
+
+    @Test
+    void shouldGetReturnEmptyWhenExceptionHappens() throws IOException {
+        new File(tmpDir.getAbsolutePath() + "/" + 
REQUESTED_OPERATIONS_FILE_NAME).createNewFile();
+
+        doThrow(new 
RuntimeException()).when(objectMapper).readValue(any(File.class), 
eq(OperationQueue.class));
+
+        assertEquals(Optional.empty(), fileBasedRequestedOperationDAO.get());
+    }
+
+    @Test
+    void shouldGetRequestedOperations() throws IOException {
+        new File(tmpDir.getAbsolutePath() + "/" + 
REQUESTED_OPERATIONS_FILE_NAME).createNewFile();
+
+        OperationQueue operationQueue = getOperationQueue();
+        when(objectMapper.readValue(any(File.class), 
eq(OperationQueue.class))).thenReturn(operationQueue);
+
+        assertEquals(Optional.of(operationQueue), 
fileBasedRequestedOperationDAO.get());
+    }
+

Review Comment:
   I would keep this approach; your suggestion would mean that we use the real 
implementation of the ObjectMapper, which I would rather avoid.



##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java:
##########
@@ -32,23 +42,65 @@ public class C2ClientService {
 
     private final C2Client client;
     private final C2HeartbeatFactory c2HeartbeatFactory;
-    private final C2OperationService operationService;
+    private final C2OperationHandlerProvider operationService;
+    private final RequestedOperationDAO requestedOperationDAO;
+    private final Consumer<C2Operation> c2OperationRegister;
+    private volatile boolean heartbeatLocked = false;
 
-    public C2ClientService(C2Client client, C2HeartbeatFactory 
c2HeartbeatFactory, C2OperationService operationService) {
+    public C2ClientService(C2Client client, C2HeartbeatFactory 
c2HeartbeatFactory, C2OperationHandlerProvider operationService,
+        RequestedOperationDAO requestedOperationDAO, Consumer<C2Operation> 
c2OperationRegister) {
         this.client = client;
         this.c2HeartbeatFactory = c2HeartbeatFactory;
         this.operationService = operationService;
+        this.requestedOperationDAO = requestedOperationDAO;
+        this.c2OperationRegister = c2OperationRegister;
     }
 
     public void sendHeartbeat(RuntimeInfoWrapper runtimeInfoWrapper) {
         try {
-            C2Heartbeat c2Heartbeat = 
c2HeartbeatFactory.create(runtimeInfoWrapper);
-            
client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse);
+            if (heartbeatLocked) {
+                logger.debug("Restart is in progress, skipping heartbeat");
+            } else {
+                C2Heartbeat c2Heartbeat = 
c2HeartbeatFactory.create(runtimeInfoWrapper);
+                
client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse);
+            }
         } catch (Exception e) {
             logger.error("Failed to send/process heartbeat:", e);
         }
     }
 
+    public void sendAcknowledge(C2OperationAck operationAck) {
+        try {
+            client.acknowledgeOperation(operationAck);
+        } catch (Exception e) {
+            logger.error("Failed to send acknowledge:", e);
+        }
+    }
+
+    public void enableHeartbeat() {
+        heartbeatLocked = false;
+    }
+
+    public void handleRequestedOperations(List<C2Operation> 
requestedOperations) {
+        LinkedList<C2Operation> lRequestedOperations = new 
LinkedList<>(requestedOperations);
+        C2Operation requestedOperation;
+        while ((requestedOperation = lRequestedOperations.poll()) != null) {
+            Optional<C2OperationHandler> c2OperationHandler = 
operationService.getHandlerForOperation(requestedOperation);

Review Comment:
   Moved the logging, but we need to keep the isPresent check because of the "return".



##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java:
##########
@@ -152,15 +185,94 @@ private C2ClientConfig 
generateClientConfig(NiFiProperties properties) {
     }
 
     public void start() {
-        scheduledExecutorService.scheduleAtFixedRate(() -> 
c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, 
heartbeatPeriod, TimeUnit.MILLISECONDS);
+        handleOngoingOperations(requestedOperationDAO.get());
+        heartbeatExecutorService.scheduleAtFixedRate(() -> 
c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, 
heartbeatPeriod, TimeUnit.MILLISECONDS);
+    }
+
+    private synchronized void handleOngoingOperations(Optional<OperationQueue> 
operationQueue) {
+        LOGGER.info("Handling ongoing operations: {}", operationQueue);
+        if (operationQueue.isPresent()) {
+            try {
+                waitForAcknowledgeFromBootstrap();
+                
c2ClientService.handleRequestedOperations(operationQueue.get().getRemainingOperations());
+            } catch (Exception e) {
+                LOGGER.error("Failed to process c2 operations queue", e);
+                c2ClientService.enableHeartbeat();
+            }
+        } else {
+            c2ClientService.enableHeartbeat();
+        }
+    }
+
+    private void waitForAcknowledgeFromBootstrap() {
+        LOGGER.info("Waiting for ACK signal from Bootstrap");
+        int currentWaitTime = 0;
+        while(!ackReceived) {
+            int sleep = 1000;
+            try {
+                Thread.sleep(sleep);
+            } catch (InterruptedException e) {
+                LOGGER.warn("Thread interrupted while waiting for 
Acknowledge");
+            }
+            currentWaitTime += sleep;
+            if (MAX_WAIT_FOR_BOOTSTRAP_ACK_MS <= currentWaitTime) {
+                LOGGER.warn("Max wait time ({}) exceeded for waiting ack from 
bootstrap, skipping", MAX_WAIT_FOR_BOOTSTRAP_ACK_MS);
+                break;
+            }
+        }

Review Comment:
   It's logged in the acknowledge handler, so it's not needed here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to