ferencerdei commented on code in PR #6733: URL: https://github.com/apache/nifi/pull/6733#discussion_r1062251850
########## c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/UpdatePropertiesOperationHandlerTest.java: ########## @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.c2.client.service.operation; + +import static org.apache.nifi.c2.protocol.api.OperandType.PROPERTIES; +import static org.apache.nifi.c2.protocol.api.OperationType.UPDATE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.when; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Function; +import org.apache.nifi.c2.protocol.api.C2Operation; +import org.apache.nifi.c2.protocol.api.C2OperationAck; +import org.apache.nifi.c2.protocol.api.C2OperationState; +import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class UpdatePropertiesOperationHandlerTest { + + protected static final String ID = "id"; Review Comment: no reason, changed them. ########## c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java: ########## @@ -59,11 +111,42 @@ private void processResponse(C2HeartbeatResponse response) { } } - private void handleRequestedOperations(List<C2Operation> requestedOperations) { - for (C2Operation requestedOperation : requestedOperations) { - operationService.handleOperation(requestedOperation) - .ifPresent(client::acknowledgeOperation); + private boolean requiresRestart(C2OperationHandler c2OperationHandler, C2OperationAck c2OperationAck) { + return c2OperationHandler.requiresRestart() + && !Optional.ofNullable(c2OperationAck) Review Comment: isEmpty is added only in java11, I'm not sure if we are ready to use it, but I moved it to a separate method. ########## minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NiFiProperties.java: ########## @@ -36,6 +36,8 @@ public class C2NiFiProperties { public static final String C2_CONNECTION_TIMEOUT = C2_PREFIX + "rest.connectionTimeout"; public static final String C2_READ_TIMEOUT = C2_PREFIX + "rest.readTimeout"; public static final String C2_CALL_TIMEOUT = C2_PREFIX + "rest.callTimeout"; + public static final String C2_MAX_IDLE_CONNECTIONS = C2_PREFIX + "rest.maxIdleConnections"; Review Comment: Merged them. ########## minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java: ########## @@ -152,15 +185,94 @@ private C2ClientConfig generateClientConfig(NiFiProperties properties) { } public void start() { - scheduledExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS); + handleOngoingOperations(requestedOperationDAO.get()); + heartbeatExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS); + } + + private synchronized void handleOngoingOperations(Optional<OperationQueue> operationQueue) { + LOGGER.info("Handling ongoing operations: {}", operationQueue); + if (operationQueue.isPresent()) { + try { + waitForAcknowledgeFromBootstrap(); + c2ClientService.handleRequestedOperations(operationQueue.get().getRemainingOperations()); + } catch (Exception e) { + LOGGER.error("Failed to process c2 operations queue", e); + c2ClientService.enableHeartbeat(); + } + } else { + c2ClientService.enableHeartbeat(); + } + } + + private void waitForAcknowledgeFromBootstrap() { + LOGGER.info("Waiting for ACK signal from Bootstrap"); + int currentWaitTime = 0; + while(!ackReceived) { + int sleep = 1000; + try { + Thread.sleep(sleep); + } catch (InterruptedException e) { + LOGGER.warn("Thread interrupted while waiting for Acknowledge"); + } + currentWaitTime += sleep; + if (MAX_WAIT_FOR_BOOTSTRAP_ACK_MS <= currentWaitTime) { + LOGGER.warn("Max wait time ({}) exceeded for waiting ack from bootstrap, skipping", MAX_WAIT_FOR_BOOTSTRAP_ACK_MS); + break; + } + } + } + + private void registerOperation(C2Operation c2Operation) { + try { + ackReceived = false; + registerAcknowledgeTimeoutTask(); + String command = c2Operation.getOperation().name() + (c2Operation.getOperand() != null ? "_" + c2Operation.getOperand().name() : ""); + bootstrapCommunicator.sendCommand(command, objectMapper.writeValueAsString(c2Operation)); + } catch (IOException e) { + LOGGER.error("Failed to send operation to bootstrap", e); + throw new UncheckedIOException(e); + } + } + + private void registerAcknowledgeTimeoutTask() { + bootstrapAcknowledgeExecutorService.schedule(() -> { Review Comment: This method was also added because of unexpected communication issues between bootstrap and the minifi process. I'm not sure if it's needed to apply more defensive programming here (but I'm open if you have any idea). ########## minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java: ########## @@ -152,15 +185,94 @@ private C2ClientConfig generateClientConfig(NiFiProperties properties) { } public void start() { - scheduledExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS); + handleOngoingOperations(requestedOperationDAO.get()); + heartbeatExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS); + } + + private synchronized void handleOngoingOperations(Optional<OperationQueue> operationQueue) { + LOGGER.info("Handling ongoing operations: {}", operationQueue); + if (operationQueue.isPresent()) { + try { + waitForAcknowledgeFromBootstrap(); + c2ClientService.handleRequestedOperations(operationQueue.get().getRemainingOperations()); + } catch (Exception e) { + LOGGER.error("Failed to process c2 operations queue", e); + c2ClientService.enableHeartbeat(); + } + } else { + c2ClientService.enableHeartbeat(); + } + } + + private void waitForAcknowledgeFromBootstrap() { + LOGGER.info("Waiting for ACK signal from Bootstrap"); + int currentWaitTime = 0; + while(!ackReceived) { + int sleep = 1000; + try { + Thread.sleep(sleep); + } catch (InterruptedException e) { + LOGGER.warn("Thread interrupted while waiting for Acknowledge"); + } + currentWaitTime += sleep; + if (MAX_WAIT_FOR_BOOTSTRAP_ACK_MS <= currentWaitTime) { + LOGGER.warn("Max wait time ({}) exceeded for waiting ack from bootstrap, skipping", MAX_WAIT_FOR_BOOTSTRAP_ACK_MS); + break; + } + } + } + + private void registerOperation(C2Operation c2Operation) { + try { + ackReceived = false; + registerAcknowledgeTimeoutTask(); + String command = c2Operation.getOperation().name() + (c2Operation.getOperand() != null ? "_" + c2Operation.getOperand().name() : ""); + bootstrapCommunicator.sendCommand(command, objectMapper.writeValueAsString(c2Operation)); + } catch (IOException e) { + LOGGER.error("Failed to send operation to bootstrap", e); + throw new UncheckedIOException(e); + } + } + + private void registerAcknowledgeTimeoutTask() { + bootstrapAcknowledgeExecutorService.schedule(() -> { + if (!ackReceived) { + LOGGER.info("Does not received acknowledge from bootstrap after {} seconds. Handling remaining operations.", MINIFI_RESTART_TIMEOUT_SECONDS); + handleOngoingOperations(requestedOperationDAO.get()); + } + }, MINIFI_RESTART_TIMEOUT_SECONDS, TimeUnit.SECONDS); + } + + private void acknowledgeHandler(String[] params) { + LOGGER.info("Received acknowledge message from bootstrap process"); + if (params.length < 1) { + LOGGER.error("Invalid arguments coming from bootstrap, skipping acknowledging latest operation"); + return; + } + + Optional<OperationQueue> optionalOperationQueue = requestedOperationDAO.get(); + ackReceived = true; + optionalOperationQueue.ifPresent(operationQueue -> { + C2Operation c2Operation = operationQueue.getCurrentOperation(); + C2OperationAck c2OperationAck = new C2OperationAck(); + c2OperationAck.setOperationId(c2Operation.getIdentifier()); + C2OperationState c2OperationState = new C2OperationState(); + OperationState state = OperationState.valueOf(params[0]); + c2OperationState.setState(state); + c2OperationAck.setOperationState(c2OperationState); + c2ClientService.sendAcknowledge(c2OperationAck); + if (state != OperationState.FULLY_APPLIED) { + handleOngoingOperations(optionalOperationQueue); Review Comment: I can add more state to MiNiFiCommandState so we can handle these differences. ########## c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/C2OperationHandler.java: ########## @@ -48,6 +48,14 @@ public interface C2OperationHandler { */ Map<String, Object> getProperties(); + /** + * Determines if the given operation requires to restart the MiNiFi process + * @return true if it requires restart, false otherwise + */ + default boolean requiresRestart() { Review Comment: By client side what do you mean? It really depends on what the given handler modifies. Eg.: if a configuration file is modified it can't be applied without restarting. It also requires some internal Nifi knowledge about how bootstrapping works. For me, this was the most straightforward way to handle this, so if we add new operations we don't need to modify other parts of the code. ########## minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/UpdateConfigurationService.java: ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.minifi.bootstrap.service; + +import static java.util.Optional.ofNullable; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Path; +import java.util.Optional; +import org.apache.commons.io.IOUtils; +import org.apache.nifi.minifi.bootstrap.MiNiFiCommandState; +import org.apache.nifi.minifi.bootstrap.RunMiNiFi; +import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener; +import org.apache.nifi.minifi.bootstrap.configuration.differentiators.Differentiator; +import org.apache.nifi.minifi.bootstrap.configuration.differentiators.WholeConfigDifferentiator; +import org.apache.nifi.minifi.bootstrap.util.ByteBufferInputStream; +import org.apache.nifi.minifi.bootstrap.util.ConfigTransformer; +import org.apache.nifi.util.NiFiProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class UpdateConfigurationService { + + private static final Logger logger = LoggerFactory.getLogger(UpdateConfigurationService.class); + private static final String UPDATED_CONFIG_FILE_NAME = "config-updated.yml"; + + private final Differentiator<ByteBuffer> differentiator; + private final RunMiNiFi runMiNiFi; + private final ConfigurationChangeListener miNiFiConfigurationChangeListener; + private final BootstrapFileProvider bootstrapFileProvider; + + public UpdateConfigurationService(RunMiNiFi runMiNiFi, ConfigurationChangeListener miNiFiConfigurationChangeListener, BootstrapFileProvider bootstrapFileProvider) { + this.differentiator = WholeConfigDifferentiator.getByteBufferDifferentiator(); + this.differentiator.initialize(runMiNiFi); + this.runMiNiFi = runMiNiFi; + this.miNiFiConfigurationChangeListener = miNiFiConfigurationChangeListener; + this.bootstrapFileProvider = bootstrapFileProvider; + } + + public Optional<MiNiFiCommandState> handleUpdate() { + logger.debug("Handling configuration update"); + Optional<MiNiFiCommandState> commandState = Optional.empty(); + try (FileInputStream configFile = new FileInputStream(getConfigFilePath().toFile())) { + ByteBuffer readOnlyNewConfig = ConfigTransformer.overrideNonFlowSectionsFromOriginalSchema( + IOUtils.toByteArray(configFile), runMiNiFi.getConfigFileReference().get().duplicate(), bootstrapFileProvider.getBootstrapProperties()); + if (differentiator.isNew(readOnlyNewConfig)) { + miNiFiConfigurationChangeListener.handleChange(new ByteBufferInputStream(readOnlyNewConfig.duplicate())); + } else { + logger.info("The given configuration does not contain any update. No operation required"); + commandState = Optional.of(MiNiFiCommandState.NO_OPERATION); + } + } catch (Exception e) { + commandState = Optional.of(MiNiFiCommandState.NOT_APPLIED); + logger.error("Could not handle configuration update", e); + } + return commandState; + } + + private Path getConfigFilePath() { + return ofNullable(safeGetPropertiesFilePath()) + .map(File::new) + .map(File::getParent) + .map(parentDir -> new File(parentDir + UPDATED_CONFIG_FILE_NAME)) + .orElse(new File("./conf/" + UPDATED_CONFIG_FILE_NAME)).toPath(); Review Comment: It's just a fallback here, I've extracted it to a static variable. ########## c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java: ########## @@ -32,23 +42,65 @@ public class C2ClientService { private final C2Client client; private final C2HeartbeatFactory c2HeartbeatFactory; - private final C2OperationService operationService; + private final C2OperationHandlerProvider operationService; + private final RequestedOperationDAO requestedOperationDAO; + private final Consumer<C2Operation> c2OperationRegister; + private volatile boolean heartbeatLocked = false; - public C2ClientService(C2Client client, C2HeartbeatFactory c2HeartbeatFactory, C2OperationService operationService) { + public C2ClientService(C2Client client, C2HeartbeatFactory c2HeartbeatFactory, C2OperationHandlerProvider operationService, + RequestedOperationDAO requestedOperationDAO, Consumer<C2Operation> c2OperationRegister) { this.client = client; this.c2HeartbeatFactory = c2HeartbeatFactory; this.operationService = operationService; + this.requestedOperationDAO = requestedOperationDAO; + this.c2OperationRegister = c2OperationRegister; } public void sendHeartbeat(RuntimeInfoWrapper runtimeInfoWrapper) { try { - C2Heartbeat c2Heartbeat = c2HeartbeatFactory.create(runtimeInfoWrapper); - client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse); + if (heartbeatLocked) { + logger.debug("Restart is in progress, skipping heartbeat"); + } else { + C2Heartbeat c2Heartbeat = c2HeartbeatFactory.create(runtimeInfoWrapper); + client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse); + } } catch (Exception e) { logger.error("Failed to send/process heartbeat:", e); } } + public void sendAcknowledge(C2OperationAck operationAck) { + try { + client.acknowledgeOperation(operationAck); + } catch (Exception e) { + logger.error("Failed to send acknowledge:", e); + } + } + + public void enableHeartbeat() { + heartbeatLocked = false; + } + + public void handleRequestedOperations(List<C2Operation> requestedOperations) { + LinkedList<C2Operation> lRequestedOperations = new LinkedList<>(requestedOperations); + C2Operation requestedOperation; + while ((requestedOperation = lRequestedOperations.poll()) != null) { Review Comment: I think priorityque would not add any value here. LinkedList also implements the queue methods, and we use only that. Therefore I would keep this as it is if you don't mind. ########## minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java: ########## @@ -152,15 +185,94 @@ private C2ClientConfig generateClientConfig(NiFiProperties properties) { } public void start() { - scheduledExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS); + handleOngoingOperations(requestedOperationDAO.get()); + heartbeatExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS); + } + + private synchronized void handleOngoingOperations(Optional<OperationQueue> operationQueue) { + LOGGER.info("Handling ongoing operations: {}", operationQueue); + if (operationQueue.isPresent()) { + try { + waitForAcknowledgeFromBootstrap(); + c2ClientService.handleRequestedOperations(operationQueue.get().getRemainingOperations()); + } catch (Exception e) { + LOGGER.error("Failed to process c2 operations queue", e); + c2ClientService.enableHeartbeat(); + } + } else { + c2ClientService.enableHeartbeat(); + } + } + + private void waitForAcknowledgeFromBootstrap() { + LOGGER.info("Waiting for ACK signal from Bootstrap"); + int currentWaitTime = 0; + while(!ackReceived) { + int sleep = 1000; + try { + Thread.sleep(sleep); + } catch (InterruptedException e) { + LOGGER.warn("Thread interrupted while waiting for Acknowledge"); + } + currentWaitTime += sleep; + if (MAX_WAIT_FOR_BOOTSTRAP_ACK_MS <= currentWaitTime) { + LOGGER.warn("Max wait time ({}) exceeded for waiting ack from bootstrap, skipping", MAX_WAIT_FOR_BOOTSTRAP_ACK_MS); + break; + } + } + } + + private void registerOperation(C2Operation c2Operation) { + try { + ackReceived = false; + registerAcknowledgeTimeoutTask(); + String command = c2Operation.getOperation().name() + (c2Operation.getOperand() != null ? "_" + c2Operation.getOperand().name() : ""); + bootstrapCommunicator.sendCommand(command, objectMapper.writeValueAsString(c2Operation)); + } catch (IOException e) { + LOGGER.error("Failed to send operation to bootstrap", e); + throw new UncheckedIOException(e); + } + } + + private void registerAcknowledgeTimeoutTask() { + bootstrapAcknowledgeExecutorService.schedule(() -> { + if (!ackReceived) { + LOGGER.info("Does not received acknowledge from bootstrap after {} seconds. Handling remaining operations.", MINIFI_RESTART_TIMEOUT_SECONDS); + handleOngoingOperations(requestedOperationDAO.get()); + } + }, MINIFI_RESTART_TIMEOUT_SECONDS, TimeUnit.SECONDS); + } + + private void acknowledgeHandler(String[] params) { + LOGGER.info("Received acknowledge message from bootstrap process"); + if (params.length < 1) { + LOGGER.error("Invalid arguments coming from bootstrap, skipping acknowledging latest operation"); + return; + } + + Optional<OperationQueue> optionalOperationQueue = requestedOperationDAO.get(); + ackReceived = true; + optionalOperationQueue.ifPresent(operationQueue -> { + C2Operation c2Operation = operationQueue.getCurrentOperation(); + C2OperationAck c2OperationAck = new C2OperationAck(); + c2OperationAck.setOperationId(c2Operation.getIdentifier()); + C2OperationState c2OperationState = new C2OperationState(); + OperationState state = OperationState.valueOf(params[0]); + c2OperationState.setState(state); + c2OperationAck.setOperationState(c2OperationState); + c2ClientService.sendAcknowledge(c2OperationAck); + if (state != OperationState.FULLY_APPLIED) { Review Comment: added a comment ########## c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java: ########## @@ -32,23 +42,65 @@ public class C2ClientService { private final C2Client client; private final C2HeartbeatFactory c2HeartbeatFactory; - private final C2OperationService operationService; + private final C2OperationHandlerProvider operationService; + private final RequestedOperationDAO requestedOperationDAO; + private final Consumer<C2Operation> c2OperationRegister; + private volatile boolean heartbeatLocked = false; - public C2ClientService(C2Client client, C2HeartbeatFactory c2HeartbeatFactory, C2OperationService operationService) { + public C2ClientService(C2Client client, C2HeartbeatFactory c2HeartbeatFactory, C2OperationHandlerProvider operationService, + RequestedOperationDAO requestedOperationDAO, Consumer<C2Operation> c2OperationRegister) { this.client = client; this.c2HeartbeatFactory = c2HeartbeatFactory; this.operationService = operationService; + this.requestedOperationDAO = requestedOperationDAO; + this.c2OperationRegister = c2OperationRegister; } public void sendHeartbeat(RuntimeInfoWrapper runtimeInfoWrapper) { try { - C2Heartbeat c2Heartbeat = c2HeartbeatFactory.create(runtimeInfoWrapper); - client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse); + if (heartbeatLocked) { + logger.debug("Restart is in progress, skipping heartbeat"); + } else { + C2Heartbeat c2Heartbeat = c2HeartbeatFactory.create(runtimeInfoWrapper); + client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse); + } } catch (Exception e) { logger.error("Failed to send/process heartbeat:", e); } } + public void sendAcknowledge(C2OperationAck operationAck) { + try { + client.acknowledgeOperation(operationAck); + } catch (Exception e) { + logger.error("Failed to send acknowledge:", e); + } + } + + public void enableHeartbeat() { + heartbeatLocked = false; + } + + public void handleRequestedOperations(List<C2Operation> requestedOperations) { + LinkedList<C2Operation> lRequestedOperations = new LinkedList<>(requestedOperations); + C2Operation requestedOperation; + while ((requestedOperation = lRequestedOperations.poll()) != null) { + Optional<C2OperationHandler> c2OperationHandler = operationService.getHandlerForOperation(requestedOperation); + if (c2OperationHandler.isPresent()) { + Optional<C2OperationAck> c2OperationAck = handleOperation(c2OperationHandler.get(), requestedOperation, lRequestedOperations); + if (c2OperationAck.isPresent()) { + sendAcknowledge(c2OperationAck.get()); + } else { + return; Review Comment: Added a comment ########## minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/StartRunner.java: ########## @@ -216,6 +213,46 @@ private Process restartNifi(Properties bootstrapProperties, String confDir, Proc return process; } + private boolean revertFlowConfig(Properties bootstrapProperties, String confDir, File swapConfigFile) throws IOException { + DEFAULT_LOGGER.info("Swap file exists, MiNiFi failed trying to change configuration. Reverting to old configuration."); + + try { + ByteBuffer tempConfigFile = generateConfigFiles(Files.newInputStream(swapConfigFile.toPath()), confDir, bootstrapProperties); + runMiNiFi.getConfigFileReference().set(tempConfigFile.asReadOnlyBuffer()); + } catch (ConfigurationChangeException e) { + DEFAULT_LOGGER.error("The swap file is malformed, unable to restart from prior state. Will not attempt to restart MiNiFi. Swap File should be cleaned up manually."); + return false; + } + + Files.copy(swapConfigFile.toPath(), Paths.get(bootstrapProperties.getProperty(MINIFI_CONFIG_FILE_KEY)), REPLACE_EXISTING); + + DEFAULT_LOGGER.info("Replacing config file with swap file and deleting swap file"); + if (!swapConfigFile.delete()) { + DEFAULT_LOGGER.warn("The swap file failed to delete after replacing using it to revert to the old configuration. It should be cleaned up manually."); + } + runMiNiFi.setReloading(false); + return true; + } + + private boolean revertBootstrapConfig(String confDir, File bootstrapSwapConfigFile) throws IOException { + DEFAULT_LOGGER.info("Bootstrap Swap file exists, MiNiFi failed trying to change configuration. Reverting to old configuration."); + + Files.copy(bootstrapSwapConfigFile.toPath(), bootstrapConfigFile.toPath(), REPLACE_EXISTING); + try { + ByteBuffer tempConfigFile = generateConfigFiles(asByteArrayInputStream(runMiNiFi.getConfigFileReference().get().duplicate()), confDir, bootstrapFileProvider.getBootstrapProperties()); + runMiNiFi.getConfigFileReference().set(tempConfigFile.asReadOnlyBuffer()); + } catch (ConfigurationChangeException e) { + DEFAULT_LOGGER.error("The swap file is malformed, unable to restart from prior state. Will not attempt to restart MiNiFi. Swap File should be cleaned up manually."); + return false; + } + + if (!bootstrapSwapConfigFile.delete()) { + DEFAULT_LOGGER.warn("The swap file failed to delete after replacing using it to revert to the old configuration. It should be cleaned up manually."); Review Comment: added the swap file types into the logs. The 2 methods revertFlowConfig and revertBootstrapConfig are slightly different so extracting couply lines into common method would not make it cleaner in my opinion. ########## minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/FileBasedRequestedOperationDAOTest.java: ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.minifi.c2; + +import static org.apache.nifi.minifi.c2.FileBasedRequestedOperationDAO.REQUESTED_OPERATIONS_FILE_NAME; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.Optional; +import org.apache.nifi.c2.client.service.operation.OperationQueue; +import org.apache.nifi.c2.protocol.api.C2Operation; +import org.apache.nifi.c2.protocol.api.OperandType; +import org.apache.nifi.c2.protocol.api.OperationType; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class FileBasedRequestedOperationDAOTest { + + @Mock + private ObjectMapper objectMapper; + + @TempDir + File tmpDir; + + private FileBasedRequestedOperationDAO fileBasedRequestedOperationDAO; + + @BeforeEach + void setup() { + fileBasedRequestedOperationDAO = new FileBasedRequestedOperationDAO(tmpDir.getAbsolutePath(), objectMapper); + } + + @Test + void shouldSaveRequestedOperationsToFile() throws IOException { + OperationQueue operationQueue = getOperationQueue(); + fileBasedRequestedOperationDAO.save(operationQueue); + + verify(objectMapper).writeValue(any(File.class), eq(operationQueue)); + } + @Test + void shouldThrowRuntimeExceptionWhenExceptionHappensDuringSave() throws IOException { + doThrow(new RuntimeException()).when(objectMapper).writeValue(any(File.class), anyList()); + + assertThrows(RuntimeException.class, () -> fileBasedRequestedOperationDAO.save(mock(OperationQueue.class))); + } + + @Test + void shouldGetReturnEmptyWhenFileDoesntExists() { + assertEquals(Optional.empty(), fileBasedRequestedOperationDAO.get()); + } + + @Test + void shouldGetReturnEmptyWhenExceptionHappens() throws IOException { + new File(tmpDir.getAbsolutePath() + "/" + REQUESTED_OPERATIONS_FILE_NAME).createNewFile(); + + doThrow(new RuntimeException()).when(objectMapper).readValue(any(File.class), eq(OperationQueue.class)); + + assertEquals(Optional.empty(), fileBasedRequestedOperationDAO.get()); + } + + @Test + void shouldGetRequestedOperations() throws IOException { + new File(tmpDir.getAbsolutePath() + "/" + REQUESTED_OPERATIONS_FILE_NAME).createNewFile(); + + OperationQueue operationQueue = getOperationQueue(); + when(objectMapper.readValue(any(File.class), eq(OperationQueue.class))).thenReturn(operationQueue); + + assertEquals(Optional.of(operationQueue), fileBasedRequestedOperationDAO.get()); + } + Review Comment: I would keep this approach, your suggestion would mean that we use the real implementation of the objectmapper what I would avoid.. ########## c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java: ########## @@ -32,23 +42,65 @@ public class C2ClientService { private final C2Client client; private final C2HeartbeatFactory c2HeartbeatFactory; - private final C2OperationService operationService; + private final C2OperationHandlerProvider operationService; + private final RequestedOperationDAO requestedOperationDAO; + private final Consumer<C2Operation> c2OperationRegister; + private volatile boolean heartbeatLocked = false; - public C2ClientService(C2Client client, C2HeartbeatFactory c2HeartbeatFactory, C2OperationService operationService) { + public C2ClientService(C2Client client, C2HeartbeatFactory c2HeartbeatFactory, C2OperationHandlerProvider operationService, + RequestedOperationDAO requestedOperationDAO, Consumer<C2Operation> c2OperationRegister) { this.client = client; this.c2HeartbeatFactory = c2HeartbeatFactory; this.operationService = operationService; + this.requestedOperationDAO = requestedOperationDAO; + this.c2OperationRegister = c2OperationRegister; } public void sendHeartbeat(RuntimeInfoWrapper runtimeInfoWrapper) { try { - C2Heartbeat c2Heartbeat = c2HeartbeatFactory.create(runtimeInfoWrapper); - client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse); + if (heartbeatLocked) { + logger.debug("Restart is in progress, skipping heartbeat"); + } else { + C2Heartbeat c2Heartbeat = c2HeartbeatFactory.create(runtimeInfoWrapper); + client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse); + } } catch (Exception e) { logger.error("Failed to send/process heartbeat:", e); } } + public void sendAcknowledge(C2OperationAck operationAck) { + try { + client.acknowledgeOperation(operationAck); + } catch (Exception e) { + logger.error("Failed to send acknowledge:", e); + } + } + + public void enableHeartbeat() { + heartbeatLocked = false; + } + + public void handleRequestedOperations(List<C2Operation> requestedOperations) { + LinkedList<C2Operation> lRequestedOperations = new LinkedList<>(requestedOperations); + C2Operation requestedOperation; + while ((requestedOperation = lRequestedOperations.poll()) != null) { + Optional<C2OperationHandler> c2OperationHandler = operationService.getHandlerForOperation(requestedOperation); Review Comment: Moved the logging, but need to keep the isPresent because of the "return" ########## minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java: ########## @@ -152,15 +185,94 @@ private C2ClientConfig generateClientConfig(NiFiProperties properties) { } public void start() { - scheduledExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS); + handleOngoingOperations(requestedOperationDAO.get()); + heartbeatExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS); + } + + private synchronized void handleOngoingOperations(Optional<OperationQueue> operationQueue) { + LOGGER.info("Handling ongoing operations: {}", operationQueue); + if (operationQueue.isPresent()) { + try { + waitForAcknowledgeFromBootstrap(); + c2ClientService.handleRequestedOperations(operationQueue.get().getRemainingOperations()); + } catch (Exception e) { + LOGGER.error("Failed to process c2 operations queue", e); + c2ClientService.enableHeartbeat(); + } + } else { + c2ClientService.enableHeartbeat(); + } + } + + private void waitForAcknowledgeFromBootstrap() { + LOGGER.info("Waiting for ACK signal from Bootstrap"); + int currentWaitTime = 0; + while(!ackReceived) { + int sleep = 1000; + try { + Thread.sleep(sleep); + } catch (InterruptedException e) { + LOGGER.warn("Thread interrupted while waiting for Acknowledge"); + } + currentWaitTime += sleep; + if (MAX_WAIT_FOR_BOOTSTRAP_ACK_MS <= currentWaitTime) { + LOGGER.warn("Max wait time ({}) exceeded for waiting ack from bootstrap, skipping", MAX_WAIT_FOR_BOOTSTRAP_ACK_MS); + break; + } + } Review Comment: It's logged in the acknowledge handler so not needed here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org