This is an automated email from the ASF dual-hosted git repository. bejancsaba pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new 52e257e16c NIFI-13123 MiNiFi async C2 operation processing 52e257e16c is described below commit 52e257e16c6d86cde377ba0063ac268fd9ac6129 Author: Ferenc Kis <briansolo1...@gmail.com> AuthorDate: Thu May 2 16:03:24 2024 +0200 NIFI-13123 MiNiFi async C2 operation processing Signed-off-by: Csaba Bejan <bejan.cs...@gmail.com> This closes #8738. --- .../org/apache/nifi/c2/client/C2ClientConfig.java | 12 + .../nifi/c2/client/service/C2ClientService.java | 149 ----------- .../nifi/c2/client/service/C2HeartbeatFactory.java | 98 ++++---- .../nifi/c2/client/service/C2HeartbeatManager.java | 79 ++++++ .../nifi/c2/client/service/C2OperationManager.java | 204 ++++++++++++++++ ...tionDAO.java => C2OperationRestartHandler.java} | 26 +- .../client/service/operation/OperationQueue.java | 17 +- ...tedOperationDAO.java => OperationQueueDAO.java} | 2 +- .../c2/client/service/C2ClientServiceTest.java | 271 --------------------- .../c2/client/service/C2HeartbeatManagerTest.java | 141 +++++++++++ .../c2/client/service/C2OperationManagerTest.java | 248 +++++++++++++++++++ .../nifi/minifi/commons/api/MiNiFiProperties.java | 1 + .../c2/BootstrapC2OperationRestartHandler.java | 120 +++++++++ .../apache/nifi/minifi/c2/C2NifiClientService.java | 204 +++++----------- ...ionDAO.java => FileBasedOperationQueueDAO.java} | 13 +- .../c2/BootstrapC2OperationRestartHandlerTest.java | 170 +++++++++++++ ...st.java => FileBasedOperationQueueDAOTest.java} | 11 +- .../nifi/minifi/bootstrap/BootstrapListener.java | 17 +- .../nifi/bootstrap/BootstrapCommunicator.java | 6 +- .../org/apache/nifi/bootstrap/CommandResult.java | 30 +-- 20 files changed, 1139 insertions(+), 680 deletions(-) diff --git a/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java b/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java index 3171280f99..d9cd3aa90f 100644 --- a/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java +++ b/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java @@ -57,6 +57,7 @@ public class C2ClientConfig { private final long keepAliveDuration; private final String c2RequestCompression; private final String c2AssetDirectory; + private final long bootstrapAcknowledgeTimeout; private C2ClientConfig(final Builder builder) { this.c2Url = builder.c2Url; @@ -86,6 +87,7 @@ public class C2ClientConfig { this.keepAliveDuration = builder.keepAliveDuration; this.c2RequestCompression = builder.c2RequestCompression; this.c2AssetDirectory = builder.c2AssetDirectory; + this.bootstrapAcknowledgeTimeout = builder.bootstrapAcknowledgeTimeout; } public String getC2Url() { @@ -196,6 +198,10 @@ public class C2ClientConfig { return keepAliveDuration; } + public long getBootstrapAcknowledgeTimeout() { + return bootstrapAcknowledgeTimeout; + } + /** * Builder for client configuration. */ @@ -231,6 +237,7 @@ public class C2ClientConfig { private long keepAliveDuration; private String c2RequestCompression; private String c2AssetDirectory; + private long bootstrapAcknowledgeTimeout; public Builder c2Url(String c2Url) { this.c2Url = c2Url; @@ -377,6 +384,11 @@ public class C2ClientConfig { return this; } + public Builder bootstrapAcknowledgeTimeout(long bootstrapAcknowledgeTimeout) { + this.bootstrapAcknowledgeTimeout = bootstrapAcknowledgeTimeout; + return this; + } + public C2ClientConfig build() { return new C2ClientConfig(this); } diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java deleted file mode 100644 index e8f0718cbd..0000000000 --- a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.c2.client.service; - -import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED; - -import java.util.LinkedList; -import java.util.List; -import java.util.Optional; -import java.util.function.Consumer; -import org.apache.nifi.c2.client.api.C2Client; -import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper; -import org.apache.nifi.c2.client.service.operation.C2OperationHandler; -import org.apache.nifi.c2.client.service.operation.C2OperationHandlerProvider; -import org.apache.nifi.c2.client.service.operation.OperationQueue; -import org.apache.nifi.c2.client.service.operation.RequestedOperationDAO; -import org.apache.nifi.c2.protocol.api.C2Heartbeat; -import org.apache.nifi.c2.protocol.api.C2HeartbeatResponse; -import org.apache.nifi.c2.protocol.api.C2Operation; -import org.apache.nifi.c2.protocol.api.C2OperationAck; -import org.apache.nifi.c2.protocol.api.C2OperationState; -import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class C2ClientService { - - private static final Logger logger = LoggerFactory.getLogger(C2ClientService.class); - - private final C2Client client; - private final C2HeartbeatFactory c2HeartbeatFactory; - private final C2OperationHandlerProvider c2OperationHandlerProvider; - private final RequestedOperationDAO requestedOperationDAO; - private final Consumer<C2Operation> c2OperationRegister; - private volatile boolean heartbeatLocked = false; - - public C2ClientService(C2Client client, C2HeartbeatFactory c2HeartbeatFactory, C2OperationHandlerProvider c2OperationHandlerProvider, - RequestedOperationDAO requestedOperationDAO, Consumer<C2Operation> c2OperationRegister) { - this.client = client; - this.c2HeartbeatFactory = c2HeartbeatFactory; - this.c2OperationHandlerProvider = c2OperationHandlerProvider; - this.requestedOperationDAO = requestedOperationDAO; - this.c2OperationRegister = c2OperationRegister; - } - - public void sendHeartbeat(RuntimeInfoWrapper runtimeInfoWrapper) { - if (heartbeatLocked) { - logger.debug("Heartbeats are locked, skipping sending for now"); - } else { - try { - C2Heartbeat c2Heartbeat = c2HeartbeatFactory.create(runtimeInfoWrapper); - client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse); - } catch (Exception e) { - logger.error("Failed to send/process heartbeat:", e); - } - } - } - - public void sendAcknowledge(C2OperationAck operationAck) { - try { - client.acknowledgeOperation(operationAck); - } catch (Exception e) { - logger.error("Failed to send acknowledge:", e); - } - } - - public void enableHeartbeat() { - heartbeatLocked = false; - } - - private void disableHeartbeat() { - heartbeatLocked = true; - } - - public void handleRequestedOperations(List<C2Operation> requestedOperations) { - LinkedList<C2Operation> c2Operations = new LinkedList<>(requestedOperations); - C2Operation requestedOperation; - while ((requestedOperation = c2Operations.poll()) != null) { - Optional<C2OperationHandler> c2OperationHandler = c2OperationHandlerProvider.getHandlerForOperation(requestedOperation); - if (!c2OperationHandler.isPresent()) { - continue; - } - C2OperationHandler operationHandler = c2OperationHandler.get(); - C2OperationAck c2OperationAck = operationHandler.handle(requestedOperation); - if (requiresRestart(operationHandler, c2OperationAck)) { - if (initiateRestart(c2Operations, requestedOperation)) { - return; - } - C2OperationState c2OperationState = new C2OperationState(); - c2OperationState.setState(OperationState.NOT_APPLIED); - c2OperationAck.setOperationState(c2OperationState); - } - sendAcknowledge(c2OperationAck); - } - enableHeartbeat(); - requestedOperationDAO.cleanup(); - } - - private void processResponse(C2HeartbeatResponse response) { - List<C2Operation> requestedOperations = response.getRequestedOperations(); - if (requestedOperations != null && !requestedOperations.isEmpty()) { - logger.info("Received {} operations from the C2 server", requestedOperations.size()); - handleRequestedOperations(requestedOperations); - } else { - logger.trace("No operations received from the C2 server in the server. Nothing to do."); - } - } - - private boolean requiresRestart(C2OperationHandler c2OperationHandler, C2OperationAck c2OperationAck) { - return c2OperationHandler.requiresRestart() && isOperationFullyApplied(c2OperationAck); - } - - private boolean isOperationFullyApplied(C2OperationAck c2OperationAck) { - return Optional.ofNullable(c2OperationAck) - .map(C2OperationAck::getOperationState) - .map(C2OperationState::getState) - .filter(FULLY_APPLIED::equals) - .isPresent(); - } - - private boolean initiateRestart(LinkedList<C2Operation> requestedOperations, C2Operation requestedOperation) { - try { - disableHeartbeat(); - requestedOperationDAO.save(new OperationQueue(requestedOperation, requestedOperations)); - c2OperationRegister.accept(requestedOperation); - return true; - } catch (Exception e) { - logger.error("Failed to initiate restart. Dropping operation and continue with remaining operations", e); - requestedOperationDAO.cleanup(); - } - return false; - } - -} - diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatFactory.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatFactory.java index c939ab78ab..b5d9e9824e 100644 --- a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatFactory.java +++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatFactory.java @@ -17,6 +17,13 @@ package org.apache.nifi.c2.client.service; +import static java.net.NetworkInterface.getNetworkInterfaces; +import static java.util.Collections.list; +import static java.util.Comparator.comparing; +import static java.util.Comparator.comparingInt; +import static java.util.Map.entry; +import static java.util.Objects.nonNull; +import static java.util.stream.Collectors.toSet; import static org.apache.commons.lang3.StringUtils.isNotBlank; import java.io.File; @@ -25,13 +32,12 @@ import java.lang.management.OperatingSystemMXBean; import java.net.Inet4Address; import java.net.InetAddress; import java.net.NetworkInterface; +import java.net.SocketException; import java.util.Collections; -import java.util.Enumeration; -import java.util.HashSet; +import java.util.Comparator; import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.stream.Collectors; import org.apache.nifi.c2.client.C2ClientConfig; import org.apache.nifi.c2.client.PersistentUuidGenerator; import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper; @@ -137,8 +143,7 @@ public class C2HeartbeatFactory { } private DeviceInfo generateDeviceInfo() { - // Populate DeviceInfo - final DeviceInfo deviceInfo = new DeviceInfo(); + DeviceInfo deviceInfo = new DeviceInfo(); deviceInfo.setNetworkInfo(generateNetworkInfo()); deviceInfo.setIdentifier(getDeviceIdentifier(deviceInfo.getNetworkInfo())); deviceInfo.setSystemInfo(generateSystemInfo()); @@ -146,67 +151,63 @@ public class C2HeartbeatFactory { } private NetworkInfo generateNetworkInfo() { - NetworkInfo networkInfo = new NetworkInfo(); try { - // Determine all interfaces - final Enumeration<NetworkInterface> networkInterfaces = NetworkInterface.getNetworkInterfaces(); - - final Set<NetworkInterface> operationIfaces = new HashSet<>(); - - // Determine eligible interfaces - while (networkInterfaces.hasMoreElements()) { - final NetworkInterface networkInterface = networkInterfaces.nextElement(); - if (!networkInterface.isLoopback() && networkInterface.isUp()) { - operationIfaces.add(networkInterface); - } + Set<NetworkInterface> eligibleInterfaces = list(getNetworkInterfaces()) + .stream() + .filter(this::isEligibleInterface) + .collect(toSet()); + + if (logger.isTraceEnabled()) { + logger.trace("Found {} eligible interfaces with names {}", eligibleInterfaces.size(), + eligibleInterfaces.stream() + .map(NetworkInterface::getName) + .collect(toSet()) + ); } - logger.trace("Have {} interfaces with names {}", operationIfaces.size(), - operationIfaces.stream() - .map(NetworkInterface::getName) - .collect(Collectors.toSet()) - ); - - if (!operationIfaces.isEmpty()) { - if (operationIfaces.size() > 1) { - logger.debug("Instance has multiple interfaces. Generated information may be non-deterministic."); - } - boolean networkInfoUnset = true; - for (NetworkInterface networkInterface : operationIfaces) { - Enumeration<InetAddress> inetAddresses = networkInterface.getInetAddresses(); - while (inetAddresses.hasMoreElements()) { - InetAddress inetAddress = inetAddresses.nextElement(); - // IPv4 address is preferred over IPv6 as it provides more readable information for the user - if (inetAddress instanceof Inet4Address) { - updateNetworkInfo(networkInfo, networkInterface, inetAddress); - return networkInfo; - } - if (networkInfoUnset) { - updateNetworkInfo(networkInfo, networkInterface, inetAddress); - networkInfoUnset = false; - } - } - } - } + Comparator<Map.Entry<NetworkInterface, InetAddress>> orderByIp4AddressesFirst = comparingInt(item -> item.getValue() instanceof Inet4Address ? 0 : 1); + Comparator<Map.Entry<NetworkInterface, InetAddress>> orderByNetworkInterfaceName = comparing(entry -> entry.getKey().getName()); + return eligibleInterfaces.stream() + .flatMap(networkInterface -> list(networkInterface.getInetAddresses()) + .stream() + .map(inetAddress -> entry(networkInterface, inetAddress))) + .sorted(orderByIp4AddressesFirst.thenComparing(orderByNetworkInterfaceName)) + .findFirst() + .map(entry -> createNetworkInfo(entry.getKey(), entry.getValue())) + .orElseGet(NetworkInfo::new); } catch (Exception e) { logger.error("Network Interface processing failed", e); + return new NetworkInfo(); } - return networkInfo; } - private void updateNetworkInfo(NetworkInfo networkInfo, NetworkInterface networkInterface, InetAddress inetAddress) { + private boolean isEligibleInterface(NetworkInterface networkInterface) { + try { + return !networkInterface.isLoopback() + && !networkInterface.isVirtual() + && networkInterface.isUp() + && nonNull(networkInterface.getHardwareAddress()); + } catch (SocketException e) { + logger.warn("Error processing network interface", e); + return false; + } + } + + private NetworkInfo createNetworkInfo(NetworkInterface networkInterface, InetAddress inetAddress) { + NetworkInfo networkInfo = new NetworkInfo(); networkInfo.setDeviceId(networkInterface.getName()); networkInfo.setHostname(inetAddress.getHostName()); networkInfo.setIpAddress(inetAddress.getHostAddress()); + return networkInfo; } private String getDeviceIdentifier(NetworkInfo networkInfo) { if (deviceId == null) { if (networkInfo.getDeviceId() != null) { try { - final NetworkInterface netInterface = NetworkInterface.getByName(networkInfo.getDeviceId()); + NetworkInterface netInterface = NetworkInterface.getByName(networkInfo.getDeviceId()); byte[] hardwareAddress = netInterface.getHardwareAddress(); - final StringBuilder macBuilder = new StringBuilder(); + StringBuilder macBuilder = new StringBuilder(); if (hardwareAddress != null) { for (byte address : hardwareAddress) { macBuilder.append(String.format("%02X", address)); @@ -221,7 +222,6 @@ public class C2HeartbeatFactory { deviceId = getConfiguredDeviceId(); } } - return deviceId; } diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatManager.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatManager.java new file mode 100644 index 0000000000..a3729255ce --- /dev/null +++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatManager.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.c2.client.service; + +import static java.util.Optional.ofNullable; +import static java.util.function.Predicate.not; + +import java.util.List; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.nifi.c2.client.api.C2Client; +import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper; +import org.apache.nifi.c2.protocol.api.C2Heartbeat; +import org.apache.nifi.c2.protocol.api.C2HeartbeatResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class C2HeartbeatManager implements Runnable { + + private static final Logger LOGGER = LoggerFactory.getLogger(C2HeartbeatManager.class); + + private final C2Client client; + private final C2HeartbeatFactory c2HeartbeatFactory; + private final ReentrantLock heartbeatLock; + private final RuntimeInfoWrapper runtimeInfoWrapper; + private final C2OperationManager c2OperationManager; + + public C2HeartbeatManager(C2Client client, C2HeartbeatFactory c2HeartbeatFactory, ReentrantLock heartbeatLock, RuntimeInfoWrapper runtimeInfoWrapper, + C2OperationManager c2OperationManager) { + this.client = client; + this.c2HeartbeatFactory = c2HeartbeatFactory; + this.heartbeatLock = heartbeatLock; + this.runtimeInfoWrapper = runtimeInfoWrapper; + this.c2OperationManager = c2OperationManager; + } + + @Override + public void run() { + if (!heartbeatLock.tryLock()) { + LOGGER.debug("Heartbeat lock is hold by another thread, skipping heartbeat sending"); + return; + } + try { + LOGGER.debug("Heartbeat lock is acquired, sending heartbeat"); + C2Heartbeat c2Heartbeat = c2HeartbeatFactory.create(runtimeInfoWrapper); + client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse); + } catch (Exception e) { + LOGGER.error("Failed to send/process heartbeat", e); + } finally { + heartbeatLock.unlock(); + LOGGER.debug("Heartbeat unlocked lock and heartbeat is sent successfully"); + } + } + + private void processResponse(C2HeartbeatResponse response) { + ofNullable(response.getRequestedOperations()) + .filter(not(List::isEmpty)) + .ifPresentOrElse(operations -> { + LOGGER.info("Received {} operations from the C2 server", operations.size()); + operations.forEach(c2OperationManager::add); + }, + () -> LOGGER.debug("No operations received from the C2 server") + ); + } +} diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2OperationManager.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2OperationManager.java new file mode 100644 index 0000000000..85d9d2d9a0 --- /dev/null +++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2OperationManager.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.c2.client.service; + +import static java.util.Optional.of; +import static java.util.Optional.ofNullable; +import static java.util.function.Predicate.not; +import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED; +import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED; + +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.nifi.c2.client.api.C2Client; +import org.apache.nifi.c2.client.service.operation.C2OperationHandler; +import org.apache.nifi.c2.client.service.operation.C2OperationHandlerProvider; +import org.apache.nifi.c2.client.service.operation.C2OperationRestartHandler; +import org.apache.nifi.c2.client.service.operation.OperationQueue; +import org.apache.nifi.c2.client.service.operation.OperationQueueDAO; +import org.apache.nifi.c2.protocol.api.C2Operation; +import org.apache.nifi.c2.protocol.api.C2OperationAck; +import org.apache.nifi.c2.protocol.api.C2OperationState; +import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class C2OperationManager implements Runnable { + + private static final Logger LOGGER = LoggerFactory.getLogger(C2OperationManager.class); + + private final C2Client client; + private final C2OperationHandlerProvider c2OperationHandlerProvider; + private final ReentrantLock heartbeatLock; + private final OperationQueueDAO operationQueueDAO; + private final C2OperationRestartHandler c2OperationRestartHandler; + private final BlockingQueue<C2Operation> c2Operations; + + public C2OperationManager(C2Client client, C2OperationHandlerProvider c2OperationHandlerProvider, ReentrantLock heartbeatLock, + OperationQueueDAO operationQueueDAO, C2OperationRestartHandler c2OperationRestartHandler) { + this.client = client; + this.c2OperationHandlerProvider = c2OperationHandlerProvider; + this.heartbeatLock = heartbeatLock; + this.operationQueueDAO = operationQueueDAO; + this.c2OperationRestartHandler = c2OperationRestartHandler; + this.c2Operations = new LinkedBlockingQueue<>(); + } + + public void add(C2Operation c2Operation) { + try { + c2Operations.put(c2Operation); + } catch (InterruptedException e) { + LOGGER.warn("Thread was interrupted", e); + } + } + + @Override + public void run() { + processRestartState(); + + while (true) { + C2Operation operation; + try { + operation = c2Operations.take(); + } catch (InterruptedException e) { + LOGGER.warn("Thread was interrupted", e); + return; + } + + LOGGER.debug("Processing operation {}", operation); + C2OperationHandler operationHandler = c2OperationHandlerProvider.getHandlerForOperation(operation).orElse(null); + if (operationHandler == null) { + LOGGER.debug("No handler is present for C2 Operation {}, available handlers {}", operation, c2OperationHandlerProvider.getHandlers()); + continue; + } + + C2OperationAck c2OperationAck = operationHandler.handle(operation); + if (!requiresRestart(operationHandler, c2OperationAck)) { + LOGGER.debug("No restart is required. Sending ACK to C2 server {}", c2OperationAck); + sendAcknowledge(c2OperationAck); + continue; + } + + heartbeatLock.lock(); + LOGGER.debug("Restart is required. Heartbeats are stopped until restart is completed"); + Optional<C2OperationState> restartState = initRestart(operation); + if (!restartState.isPresent()) { + LOGGER.debug("Restart in progress, stopping C2OperationManager"); + break; + } + + try { + C2OperationState failedState = restartState.get(); + LOGGER.debug("Restart handler returned with a failed state {}", failedState); + c2OperationAck.setOperationState(failedState); + sendAcknowledge(c2OperationAck); + } finally { + operationQueueDAO.cleanup(); + LOGGER.debug("Heartbeats are enabled again"); + heartbeatLock.unlock(); + } + } + } + + private void processRestartState() { + Optional<OperationQueue> operationQueue = operationQueueDAO.load(); + + operationQueue.map(OperationQueue::getRemainingOperations) + .filter(not(List::isEmpty)) + .ifPresent(this::processRemainingOperations); + + operationQueue.map(OperationQueue::getCurrentOperation) + .ifPresentOrElse(this::processCurrentOperation, + () -> LOGGER.debug("No operation to acknowledge to C2 server")); + + operationQueue.ifPresent(__ -> operationQueueDAO.cleanup()); + } + + private void processRemainingOperations(List<C2Operation> remainingOperations) { + LOGGER.debug("Found remaining operations operations after restart. Heartbeats are stopped until processing is completed"); + heartbeatLock.lock(); + try { + List<C2Operation> mergedOperations = new LinkedList<>(); + mergedOperations.addAll(remainingOperations); + mergedOperations.addAll(c2Operations); + c2Operations.clear(); + mergedOperations.forEach(c2Operations::add); + } catch (Exception e) { + LOGGER.warn("Unable to recover operations from operation queue", e); + } finally { + heartbeatLock.unlock(); + LOGGER.debug("Heartbeat lock released"); + } + } + + private void processCurrentOperation(C2Operation operation) { + LOGGER.debug("Found operation {} to acknowledge to C2 server", operation); + + C2OperationState c2OperationState = c2OperationRestartHandler.waitForResponse() + .map(this::c2OperationState) + .orElse(c2OperationState(NOT_APPLIED)); + + C2OperationAck c2OperationAck = new C2OperationAck(); + c2OperationAck.setOperationId(operation.getIdentifier()); + c2OperationAck.setOperationState(c2OperationState); + + sendAcknowledge(c2OperationAck); + } + + private Optional<C2OperationState> initRestart(C2Operation operation) { + try { + LOGGER.debug("Restart initiated"); + OperationQueue operationQueue = OperationQueue.create(operation, c2Operations); + operationQueueDAO.save(operationQueue); + return c2OperationRestartHandler.handleRestart(operation).map(this::c2OperationState); + } catch (Exception e) { + LOGGER.error("Failed to initiate restart. Dropping operation and continue with remaining operations", e); + return of(c2OperationState(NOT_APPLIED)); + } + } + + private C2OperationState c2OperationState(OperationState operationState) { + C2OperationState c2OperationState = new C2OperationState(); + c2OperationState.setState(operationState); + return c2OperationState; + } + + private void sendAcknowledge(C2OperationAck operationAck) { + try { + client.acknowledgeOperation(operationAck); + } catch (Exception e) { + LOGGER.error("Failed to send acknowledge", e); + } + } + + private boolean requiresRestart(C2OperationHandler c2OperationHandler, C2OperationAck c2OperationAck) { + return c2OperationHandler.requiresRestart() && isOperationFullyApplied(c2OperationAck); + } + + private boolean isOperationFullyApplied(C2OperationAck c2OperationAck) { + return ofNullable(c2OperationAck) + .map(C2OperationAck::getOperationState) + .map(C2OperationState::getState) + .filter(FULLY_APPLIED::equals) + .isPresent(); + } +} diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/RequestedOperationDAO.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/C2OperationRestartHandler.java similarity index 61% copy from c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/RequestedOperationDAO.java copy to c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/C2OperationRestartHandler.java index 1216aa812d..c0b733946e 100644 --- a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/RequestedOperationDAO.java +++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/C2OperationRestartHandler.java @@ -18,28 +18,12 @@ package org.apache.nifi.c2.client.service.operation; import java.util.Optional; +import org.apache.nifi.c2.protocol.api.C2Operation; +import org.apache.nifi.c2.protocol.api.C2OperationState; -/** - * The purpose of this interface is to be able to persist operations between restarts. - */ -public interface RequestedOperationDAO { - - /** - * Persist the given requested operation list - * @param operationQueue the queue containing the current and remaining operations - */ - void save(OperationQueue operationQueue); - - /** - * Returns the saved Operations - * - * @return the C2 Operations queue with the actual operation - */ - Optional<OperationQueue> load(); +public interface C2OperationRestartHandler { - /** - * Resets the saved operations - */ - void cleanup(); + Optional<C2OperationState.OperationState> handleRestart(C2Operation c2Operation); + Optional<C2OperationState.OperationState> waitForResponse(); } diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/OperationQueue.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/OperationQueue.java index ae0c56c1a8..1f3b3ecb02 100644 --- a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/OperationQueue.java +++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/OperationQueue.java @@ -17,10 +17,12 @@ package org.apache.nifi.c2.client.service.operation; +import static java.util.Optional.ofNullable; + import java.io.Serializable; -import java.util.Collections; import java.util.List; import java.util.Objects; +import java.util.Queue; import org.apache.nifi.c2.protocol.api.C2Operation; public class OperationQueue implements Serializable { @@ -29,12 +31,23 @@ public class OperationQueue implements Serializable { private C2Operation currentOperation; private List<C2Operation> remainingOperations; + public static OperationQueue create(C2Operation currentOperation, Queue<C2Operation> remainingOperations) { + return new OperationQueue( + currentOperation, + ofNullable(remainingOperations) + .map(queue -> queue.stream().toList()) + .orElseGet(List::of) + ); + + } + public OperationQueue() { } public OperationQueue(C2Operation currentOperation, List<C2Operation> remainingOperations) { this.currentOperation = currentOperation; - this.remainingOperations = remainingOperations == null ? Collections.emptyList() : remainingOperations; + this.remainingOperations = remainingOperations; + } public C2Operation getCurrentOperation() { diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/RequestedOperationDAO.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/OperationQueueDAO.java similarity index 97% copy from c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/RequestedOperationDAO.java copy to c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/OperationQueueDAO.java index 1216aa812d..ec3ee18230 100644 --- a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/RequestedOperationDAO.java +++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/OperationQueueDAO.java @@ -22,7 +22,7 @@ import java.util.Optional; /** * The purpose of this interface is to be able to persist operations between restarts. */ -public interface RequestedOperationDAO { +public interface OperationQueueDAO { /** * Persist the given requested operation list diff --git a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2ClientServiceTest.java b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2ClientServiceTest.java deleted file mode 100644 index 6de1f86015..0000000000 --- a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2ClientServiceTest.java +++ /dev/null @@ -1,271 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.c2.client.service; - -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoInteractions; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; - -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Optional; -import java.util.function.Consumer; -import java.util.stream.Collectors; -import java.util.stream.IntStream; -import org.apache.nifi.c2.client.api.C2Client; -import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper; -import org.apache.nifi.c2.client.service.operation.C2OperationHandler; -import org.apache.nifi.c2.client.service.operation.C2OperationHandlerProvider; -import org.apache.nifi.c2.client.service.operation.OperationQueue; -import org.apache.nifi.c2.client.service.operation.RequestedOperationDAO; -import org.apache.nifi.c2.protocol.api.C2Heartbeat; -import org.apache.nifi.c2.protocol.api.C2HeartbeatResponse; -import org.apache.nifi.c2.protocol.api.C2Operation; -import org.apache.nifi.c2.protocol.api.C2OperationAck; -import org.apache.nifi.c2.protocol.api.C2OperationState; -import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.InjectMocks; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; - -@ExtendWith(MockitoExtension.class) -public class C2ClientServiceTest { - - @Mock - private C2Client client; - - @Mock - private C2HeartbeatFactory c2HeartbeatFactory; - - @Mock - private C2OperationHandlerProvider operationService; - - @Mock - private RuntimeInfoWrapper runtimeInfoWrapper; - - @Mock - private RequestedOperationDAO requestedOperationDAO; - - @Mock - private Consumer<C2Operation> c2OperationRegister; - - @InjectMocks - private C2ClientService c2ClientService; - - @Test - void testSendHeartbeatAndAckWhenOperationPresent() { - C2Heartbeat heartbeat = mock(C2Heartbeat.class); - when(c2HeartbeatFactory.create(any())).thenReturn(heartbeat); - C2HeartbeatResponse hbResponse = new C2HeartbeatResponse(); - final List<C2Operation> c2Operations = generateOperation(1); - hbResponse.setRequestedOperations(c2Operations); - when(client.publishHeartbeat(heartbeat)).thenReturn(Optional.of(hbResponse)); - C2OperationHandler c2OperationHandler = mock(C2OperationHandler.class); - when(operationService.getHandlerForOperation(any())).thenReturn(Optional.of(c2OperationHandler)); - when(c2OperationHandler.handle(c2Operations.get(0))).thenReturn(new C2OperationAck()); - - c2ClientService.sendHeartbeat(runtimeInfoWrapper); - - verify(c2HeartbeatFactory).create(any()); - verify(client).publishHeartbeat(heartbeat); - verify(c2OperationHandler).handle(any()); - verify(client).acknowledgeOperation(any()); - } - - @Test - void testSendHeartbeatAndAckForMultipleOperationPresent() { - int operationNum = 5; - C2Heartbeat heartbeat = mock(C2Heartbeat.class); - when(c2HeartbeatFactory.create(any())).thenReturn(heartbeat); - C2HeartbeatResponse hbResponse = new C2HeartbeatResponse(); - hbResponse.setRequestedOperations(generateOperation(operationNum)); - C2OperationHandler c2OperationHandler = mock(C2OperationHandler.class); - when(client.publishHeartbeat(heartbeat)).thenReturn(Optional.of(hbResponse)); - when(operationService.getHandlerForOperation(any())).thenReturn(Optional.of(c2OperationHandler)); - when(c2OperationHandler.handle(any())).thenReturn(new C2OperationAck()); - - c2ClientService.sendHeartbeat(runtimeInfoWrapper); - - verify(c2HeartbeatFactory).create(any()); - verify(client).publishHeartbeat(heartbeat); - verify(c2OperationHandler, times(operationNum)).handle(any()); - verify(client, times(operationNum)).acknowledgeOperation(any()); - } - - @Test - void testSendHeartbeatHandlesNoHeartbeatResponse() { - C2Heartbeat heartbeat = mock(C2Heartbeat.class); - when(c2HeartbeatFactory.create(any())).thenReturn(heartbeat); - when(client.publishHeartbeat(heartbeat)).thenReturn(Optional.empty()); - - c2ClientService.sendHeartbeat(runtimeInfoWrapper); - - verify(c2HeartbeatFactory).create(any()); - verify(client).publishHeartbeat(heartbeat); - verify(client, times(0)).acknowledgeOperation(any()); - } - - @Test - void testSendHeartbeatNotHandledWhenThereAreNoOperationsSent() { - C2Heartbeat heartbeat = mock(C2Heartbeat.class); - when(c2HeartbeatFactory.create(any())).thenReturn(heartbeat); - C2HeartbeatResponse hbResponse = new C2HeartbeatResponse(); - when(client.publishHeartbeat(heartbeat)).thenReturn(Optional.of(hbResponse)); - - c2ClientService.sendHeartbeat(runtimeInfoWrapper); - - verify(c2HeartbeatFactory).create(any()); - verify(client).publishHeartbeat(heartbeat); - verify(client, times(0)).acknowledgeOperation(any()); - } - - @Test - void testSendHeartbeatNotAckWhenOperationAckMissing() { - C2Heartbeat heartbeat = mock(C2Heartbeat.class); - when(c2HeartbeatFactory.create(any())).thenReturn(heartbeat); - C2HeartbeatResponse hbResponse = new C2HeartbeatResponse(); - hbResponse.setRequestedOperations(generateOperation(1)); - when(client.publishHeartbeat(heartbeat)).thenReturn(Optional.of(hbResponse)); - when(operationService.getHandlerForOperation(any())).thenReturn(Optional.empty()); - - c2ClientService.sendHeartbeat(runtimeInfoWrapper); - - verify(c2HeartbeatFactory).create(any()); - verify(client).publishHeartbeat(heartbeat); - verify(client, times(0)).acknowledgeOperation(any()); - } - - @Test - void shouldHeartbeatSendingNotPropagateExceptions() { - when(c2HeartbeatFactory.create(runtimeInfoWrapper)).thenThrow(new RuntimeException()); - - c2ClientService.sendHeartbeat(runtimeInfoWrapper); - } - - @Test - void shouldAckSendingNotPropagateExceptions() { - C2OperationAck operationAck = mock(C2OperationAck.class); - doThrow(new RuntimeException()).when(client).acknowledgeOperation(operationAck); - - c2ClientService.sendAcknowledge(operationAck); - } - - @Test - void shouldSendAcknowledgeWithoutPersistingOperationsWhenOperationRequiresRestartButHandlerReturnsNonFullyAppliedState() { - C2OperationHandler c2OperationHandler = mock(C2OperationHandler.class); - C2OperationAck operationAck = new C2OperationAck(); - C2OperationState c2OperationState = new C2OperationState(); - c2OperationState.setState(OperationState.NOT_APPLIED); - operationAck.setOperationState(c2OperationState); - when(c2OperationHandler.requiresRestart()).thenReturn(true); - when(operationService.getHandlerForOperation(any(C2Operation.class))).thenReturn(Optional.of(c2OperationHandler)); - when(c2OperationHandler.handle(any(C2Operation.class))).thenReturn(operationAck); - - c2ClientService.handleRequestedOperations(generateOperation(1)); - - verify(operationService).getHandlerForOperation(any(C2Operation.class)); - verify(c2OperationHandler).handle(any(C2Operation.class)); - verify(requestedOperationDAO).cleanup(); - verify(client).acknowledgeOperation(operationAck); - verifyNoMoreInteractions(operationService, client, requestedOperationDAO); - verifyNoInteractions(c2HeartbeatFactory, c2OperationRegister); - } - - @Test - void shouldSaveOperationQueueIfRestartIsNeededAndThereAreMultipleRequestedOperations() { - C2Operation c2Operation1 = new C2Operation(); - c2Operation1.setIdentifier("1"); - C2Operation c2Operation2 = new C2Operation(); - c2Operation2.setIdentifier("2"); - C2OperationHandler c2OperationHandler = mock(C2OperationHandler.class); - when(c2OperationHandler.requiresRestart()).thenReturn(true); - when(operationService.getHandlerForOperation(any(C2Operation.class))).thenReturn(Optional.of(c2OperationHandler)); - C2OperationAck c2OperationAck = new C2OperationAck(); - C2OperationState c2OperationState = new C2OperationState(); - c2OperationState.setState(OperationState.FULLY_APPLIED); - c2OperationAck.setOperationState(c2OperationState); - when(c2OperationHandler.handle(any(C2Operation.class))).thenReturn(c2OperationAck); - - c2ClientService.handleRequestedOperations(Arrays.asList(c2Operation1, c2Operation2)); - - verify(requestedOperationDAO).save(new OperationQueue(c2Operation1, Collections.singletonList(c2Operation2))); - verify(c2OperationRegister).accept(c2Operation1); - verifyNoInteractions(client, c2HeartbeatFactory); - verifyNoMoreInteractions(requestedOperationDAO, c2OperationRegister, operationService); - } - - @Test - void shouldReEnableHeartbeatsIfExceptionHappensDuringRegisteringOperationAndThereIsNoMoreOperationInQueue() { - C2OperationHandler c2OperationHandler = mock(C2OperationHandler.class); - C2Operation operation = new C2Operation(); - when(c2OperationHandler.requiresRestart()).thenReturn(true); - when(operationService.getHandlerForOperation(any(C2Operation.class))).thenReturn(Optional.of(c2OperationHandler)); - C2OperationAck c2OperationAck = new C2OperationAck(); - C2OperationState c2OperationState = new C2OperationState(); - c2OperationState.setState(OperationState.FULLY_APPLIED); - c2OperationAck.setOperationState(c2OperationState); - when(c2OperationHandler.handle(any(C2Operation.class))).thenReturn(c2OperationAck); - doThrow(new RuntimeException()).when(c2OperationRegister).accept(any(C2Operation.class)); - c2ClientService.handleRequestedOperations(Collections.singletonList(operation)); - when(c2HeartbeatFactory.create(runtimeInfoWrapper)).thenReturn(new C2Heartbeat()); - - c2ClientService.sendHeartbeat(runtimeInfoWrapper); - - verify(c2HeartbeatFactory).create(runtimeInfoWrapper); - verify(client).publishHeartbeat(any(C2Heartbeat.class)); - } - - @Test - void shouldContinueWithRemainingOperationsIfExceptionHappensDuringRegisteringOperationAndThereAreMoreOperationsInQueue() { - C2OperationHandler c2OperationHandlerForRestart = mock(C2OperationHandler.class); - C2OperationHandler c2OperationHandler = mock(C2OperationHandler.class); - C2Operation operation1 = new C2Operation(); - operation1.setIdentifier("1"); - C2Operation operation2 = new C2Operation(); - operation2.setIdentifier("2"); - C2OperationAck c2OperationAck = new C2OperationAck(); - C2OperationState c2OperationState = new C2OperationState(); - c2OperationState.setState(OperationState.FULLY_APPLIED); - c2OperationAck.setOperationState(c2OperationState); - when(c2OperationHandler.requiresRestart()).thenReturn(false); - when(c2OperationHandlerForRestart.requiresRestart()).thenReturn(true); - when(operationService.getHandlerForOperation(operation1)).thenReturn(Optional.of(c2OperationHandlerForRestart)); - when(operationService.getHandlerForOperation(operation2)).thenReturn(Optional.of(c2OperationHandler)); - when(c2OperationHandlerForRestart.handle(operation1)).thenReturn(c2OperationAck); - when(c2OperationHandler.handle(operation2)).thenReturn(c2OperationAck); - - doThrow(new RuntimeException()).when(c2OperationRegister).accept(operation1); - - c2ClientService.handleRequestedOperations(Arrays.asList(operation1, operation2)); - - verify(client, times(2)).acknowledgeOperation(c2OperationAck); - } - - private List<C2Operation> generateOperation(int num) { - return IntStream.range(0, num) - .mapToObj(x -> new C2Operation()) - .collect(Collectors.toList()); - } -} diff --git a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2HeartbeatManagerTest.java b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2HeartbeatManagerTest.java new file mode 100644 index 0000000000..2de1483487 --- /dev/null +++ b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2HeartbeatManagerTest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.c2.client.service; + +import static java.util.Optional.empty; +import static java.util.Optional.ofNullable; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.List; +import java.util.Optional; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.nifi.c2.client.api.C2Client; +import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper; +import org.apache.nifi.c2.protocol.api.C2Heartbeat; +import org.apache.nifi.c2.protocol.api.C2HeartbeatResponse; +import org.apache.nifi.c2.protocol.api.C2Operation; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class C2HeartbeatManagerTest { + + @Mock + private C2Client mockC2Client; + + @Mock + private C2HeartbeatFactory mockC2HeartbeatFactory; + + @Mock + private ReentrantLock mockHeartbeatLock; + + @Mock + private RuntimeInfoWrapper mockRuntimeInfoWrapper; + + @Mock + private C2OperationManager mockC2OperationManager; + + @InjectMocks + private C2HeartbeatManager testHeartbeatManager; + + @Test + void shouldSkipSendingHeartbeatIfHeartbeatLockIsAcquired() { + when(mockHeartbeatLock.tryLock()).thenReturn(false); + + testHeartbeatManager.run(); + + verify(mockC2HeartbeatFactory, never()).create(any()); + verify(mockC2Client, never()).publishHeartbeat(any()); + verify(mockC2OperationManager, never()).add(any()); + verify(mockHeartbeatLock, never()).unlock(); + } + + @Test + void shouldSendHeartbeatAndProcessEmptyResponse() { + when(mockHeartbeatLock.tryLock()).thenReturn(true); + C2Heartbeat mockC2Heartbeat = mock(C2Heartbeat.class); + when(mockC2HeartbeatFactory.create(mockRuntimeInfoWrapper)).thenReturn(mockC2Heartbeat); + when(mockC2Client.publishHeartbeat(mockC2Heartbeat)).thenReturn(empty()); + + testHeartbeatManager.run(); + + verify(mockC2HeartbeatFactory, times(1)).create(mockRuntimeInfoWrapper); + verify(mockC2Client, times(1)).publishHeartbeat(mockC2Heartbeat); + verify(mockC2OperationManager, never()).add(any()); + verify(mockHeartbeatLock, times(1)).unlock(); + + } + + @Test + void shouldSendHeartbeatAndProcessResponseWithNoOperation() { + when(mockHeartbeatLock.tryLock()).thenReturn(true); + C2Heartbeat mockC2Heartbeat = mock(C2Heartbeat.class); + when(mockC2HeartbeatFactory.create(mockRuntimeInfoWrapper)).thenReturn(mockC2Heartbeat); + C2HeartbeatResponse mockC2HeartbeatResponse = mock(C2HeartbeatResponse.class); + when(mockC2HeartbeatResponse.getRequestedOperations()).thenReturn(List.of()); + when(mockC2Client.publishHeartbeat(mockC2Heartbeat)).thenReturn(Optional.of(mockC2HeartbeatResponse)); + + testHeartbeatManager.run(); + + verify(mockC2HeartbeatFactory, times(1)).create(mockRuntimeInfoWrapper); + verify(mockC2Client, times(1)).publishHeartbeat(mockC2Heartbeat); + verify(mockC2OperationManager, never()).add(any()); + verify(mockHeartbeatLock, times(1)).unlock(); + } + + @Test + void shouldSendHeartbeatAndProcessResponseWithMultipleOperation() { + when(mockHeartbeatLock.tryLock()).thenReturn(true); + C2Heartbeat mockC2Heartbeat = mock(C2Heartbeat.class); + when(mockC2HeartbeatFactory.create(mockRuntimeInfoWrapper)).thenReturn(mockC2Heartbeat); + C2HeartbeatResponse mockC2HeartbeatResponse = mock(C2HeartbeatResponse.class); + C2Operation mockOperation1 = mock(C2Operation.class); + C2Operation mockOperation2 = mock(C2Operation.class); + when(mockC2HeartbeatResponse.getRequestedOperations()).thenReturn(List.of(mockOperation1, mockOperation2)); + when(mockC2Client.publishHeartbeat(mockC2Heartbeat)).thenReturn(ofNullable(mockC2HeartbeatResponse)); + + testHeartbeatManager.run(); + + verify(mockC2HeartbeatFactory, times(1)).create(mockRuntimeInfoWrapper); + verify(mockC2Client, times(1)).publishHeartbeat(mockC2Heartbeat); + verify(mockC2OperationManager, times(1)).add(mockOperation1); + verify(mockC2OperationManager, times(1)).add(mockOperation2); + verify(mockHeartbeatLock, times(1)).unlock(); + } + + @Test + void shouldReleaseHeartbeatLockWhenExceptionOccurs() { + when(mockHeartbeatLock.tryLock()).thenReturn(true); + when(mockC2HeartbeatFactory.create(mockRuntimeInfoWrapper)).thenThrow(new RuntimeException()); + + testHeartbeatManager.run(); + + verify(mockC2HeartbeatFactory, times(1)).create(mockRuntimeInfoWrapper); + verify(mockC2Client, never()).publishHeartbeat(any()); + verify(mockC2OperationManager, never()).add(any()); + verify(mockHeartbeatLock, times(1)).unlock(); + } +} diff --git a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2OperationManagerTest.java b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2OperationManagerTest.java new file mode 100644 index 0000000000..d531d73ddf --- /dev/null +++ b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2OperationManagerTest.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.c2.client.service; + +import static java.util.Optional.empty; +import static java.util.Optional.ofNullable; +import static java.util.concurrent.Executors.newVirtualThreadPerTaskExecutor; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED; +import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.nifi.c2.client.api.C2Client; +import org.apache.nifi.c2.client.service.operation.C2OperationHandler; +import org.apache.nifi.c2.client.service.operation.C2OperationHandlerProvider; +import org.apache.nifi.c2.client.service.operation.C2OperationRestartHandler; +import org.apache.nifi.c2.client.service.operation.OperationQueue; +import org.apache.nifi.c2.client.service.operation.OperationQueueDAO; +import org.apache.nifi.c2.protocol.api.C2Operation; +import org.apache.nifi.c2.protocol.api.C2OperationAck; +import org.apache.nifi.c2.protocol.api.C2OperationState; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class C2OperationManagerTest { + + private static final long MAX_WAIT_TIME_MS = 1000; + + @Mock + private C2Client mockC2Client; + + @Mock + private C2OperationHandlerProvider mockC2OperationHandlerProvider; + + @Mock + private ReentrantLock mockHeartbeatLock; + + @Mock + private OperationQueueDAO mockOperationQueueDAO; + + @Mock + private C2OperationRestartHandler mockC2OperationRestartHandler; + + @InjectMocks + private C2OperationManager testC2OperationManager; + + @Captor + ArgumentCaptor<C2OperationAck> c2OperationAckArgumentCaptor; + + private ExecutorService executorService; + + @BeforeEach + void setup() { + executorService = newVirtualThreadPerTaskExecutor(); + } + + @AfterEach + void teardown() { + executorService.shutdownNow(); + } + + @Test + void shouldWaitForIncomingOperationThenTimeout() { + Future<?> future = executorService.submit(testC2OperationManager); + + assertThrows(TimeoutException.class, () -> future.get(MAX_WAIT_TIME_MS, MILLISECONDS)); + verify(mockC2OperationHandlerProvider, never()).getHandlerForOperation(any()); + } + + @Test + void shouldContinueWithoutProcessingWhenNoHandlerIsDefined() { + C2Operation testOperation = mock(C2Operation.class); + when(mockC2OperationHandlerProvider.getHandlerForOperation(testOperation)).thenReturn(empty()); + + Future<?> future = executorService.submit(testC2OperationManager); + testC2OperationManager.add(testOperation); + + assertThrows(TimeoutException.class, () -> future.get(MAX_WAIT_TIME_MS, MILLISECONDS)); + verify(mockC2Client, never()).acknowledgeOperation(any()); + verify(mockHeartbeatLock, never()).lock(); + } + + @Test + void shouldProcessOperationWithoutRestartAndAcknowledge() { + C2Operation mockOperation = mock(C2Operation.class); + C2OperationHandler mockOperationHandler = mock(C2OperationHandler.class); + C2OperationAck mockC2OperationAck = mock(C2OperationAck.class); + when(mockC2OperationHandlerProvider.getHandlerForOperation(mockOperation)).thenReturn(ofNullable(mockOperationHandler)); + when(mockOperationHandler.handle(mockOperation)).thenReturn(mockC2OperationAck); + when(mockOperationHandler.requiresRestart()).thenReturn(false); + + Future<?> future = executorService.submit(testC2OperationManager); + testC2OperationManager.add(mockOperation); + + assertThrows(TimeoutException.class, () -> future.get(MAX_WAIT_TIME_MS, MILLISECONDS)); + verify(mockC2Client, times(1)).acknowledgeOperation(mockC2OperationAck); + verify(mockHeartbeatLock, never()).lock(); + } + + @Test + void shouldProcessOperationWithSuccessfulRestart() { + C2Operation mockOperation = mock(C2Operation.class); + C2OperationHandler mockOperationHandler = mock(C2OperationHandler.class); + C2OperationAck mockC2OperationAck = mock(C2OperationAck.class); + C2OperationState mockC2OperationState = mock(C2OperationState.class); + when(mockC2OperationHandlerProvider.getHandlerForOperation(mockOperation)).thenReturn(ofNullable(mockOperationHandler)); + when(mockOperationHandler.handle(mockOperation)).thenReturn(mockC2OperationAck); + when(mockOperationHandler.requiresRestart()).thenReturn(true); + when(mockC2OperationAck.getOperationState()).thenReturn(mockC2OperationState); + when(mockC2OperationState.getState()).thenReturn(FULLY_APPLIED); + when(mockC2OperationRestartHandler.handleRestart(mockOperation)).thenReturn(empty()); + + Future<?> future = executorService.submit(testC2OperationManager); + testC2OperationManager.add(mockOperation); + + assertDoesNotThrow(() -> future.get()); + verify(mockC2Client, never()).acknowledgeOperation(mockC2OperationAck); + verify(mockHeartbeatLock, times(1)).lock(); + verify(mockHeartbeatLock, never()).unlock(); + verify(mockOperationQueueDAO, times(1)).save(any()); + verify(mockOperationQueueDAO, never()).cleanup(); + } + + @Test + void shouldProcessOperationWithFailedRestartDueToFailedResponse() { + C2Operation mockOperation = mock(C2Operation.class); + C2OperationHandler mockOperationHandler = mock(C2OperationHandler.class); + C2OperationAck mockC2OperationAck = mock(C2OperationAck.class); + C2OperationState mockC2OperationState = mock(C2OperationState.class); + when(mockC2OperationHandlerProvider.getHandlerForOperation(mockOperation)).thenReturn(ofNullable(mockOperationHandler)); + when(mockOperationHandler.handle(mockOperation)).thenReturn(mockC2OperationAck); + when(mockOperationHandler.requiresRestart()).thenReturn(true); + when(mockC2OperationAck.getOperationState()).thenReturn(mockC2OperationState); + when(mockC2OperationState.getState()).thenReturn(FULLY_APPLIED); + when(mockC2OperationRestartHandler.handleRestart(mockOperation)).thenReturn(ofNullable(NOT_APPLIED)); + + Future<?> future = executorService.submit(testC2OperationManager); + testC2OperationManager.add(mockOperation); + + assertThrows(TimeoutException.class, () -> future.get(MAX_WAIT_TIME_MS, MILLISECONDS)); + verify(mockHeartbeatLock, times(1)).lock(); + verify(mockHeartbeatLock, times(1)).unlock(); + verify(mockC2Client, times(1)).acknowledgeOperation(mockC2OperationAck); + verify(mockOperationQueueDAO, times(1)).save(any()); + verify(mockOperationQueueDAO, times(1)).cleanup(); + } + + @Test + void shouldProcessOperationWithFailedRestartDueToException() { + C2Operation mockOperation = mock(C2Operation.class); + C2OperationHandler mockOperationHandler = mock(C2OperationHandler.class); + C2OperationAck mockC2OperationAck = mock(C2OperationAck.class); + C2OperationState mockC2OperationState = mock(C2OperationState.class); + when(mockC2OperationHandlerProvider.getHandlerForOperation(mockOperation)).thenReturn(ofNullable(mockOperationHandler)); + when(mockOperationHandler.handle(mockOperation)).thenReturn(mockC2OperationAck); + when(mockOperationHandler.requiresRestart()).thenReturn(true); + when(mockC2OperationAck.getOperationState()).thenReturn(mockC2OperationState); + when(mockC2OperationState.getState()).thenReturn(FULLY_APPLIED); + when(mockC2OperationRestartHandler.handleRestart(mockOperation)).thenThrow(new RuntimeException()); + + Future<?> future = executorService.submit(testC2OperationManager); + testC2OperationManager.add(mockOperation); + + assertThrows(TimeoutException.class, () -> future.get(MAX_WAIT_TIME_MS, MILLISECONDS)); + verify(mockHeartbeatLock, times(1)).lock(); + verify(mockHeartbeatLock, times(1)).unlock(); + verify(mockC2Client, times(1)).acknowledgeOperation(mockC2OperationAck); + verify(mockOperationQueueDAO, times(1)).save(any()); + verify(mockOperationQueueDAO, times(1)).cleanup(); + } + + @Test + void shouldProcessStateWithOneCurrentAndNoRemainingOperations() { + OperationQueue mockOperationQueue = mock(OperationQueue.class); + C2Operation mockCurrentOperation = mock(C2Operation.class); + when(mockOperationQueue.getCurrentOperation()).thenReturn(mockCurrentOperation); + when(mockOperationQueue.getRemainingOperations()).thenReturn(List.of()); + when(mockOperationQueueDAO.load()).thenReturn(ofNullable(mockOperationQueue)); + when(mockC2OperationRestartHandler.waitForResponse()).thenReturn(ofNullable(FULLY_APPLIED)); + + Future<?> future = executorService.submit(testC2OperationManager); + + assertThrows(TimeoutException.class, () -> future.get(MAX_WAIT_TIME_MS, MILLISECONDS)); + verify(mockHeartbeatLock, never()).lock(); + verify(mockHeartbeatLock, never()).unlock(); + verify(mockC2Client, times(1)).acknowledgeOperation(c2OperationAckArgumentCaptor.capture()); + assertEquals(FULLY_APPLIED, c2OperationAckArgumentCaptor.getValue().getOperationState().getState()); + } + + @Test + void shouldProcessStateWithOneCurrentAndOneRemainingOperation() { + OperationQueue mockOperationQueue = mock(OperationQueue.class); + C2Operation mockCurrentOperation = mock(C2Operation.class); + C2Operation mockRemainingOperation = mock(C2Operation.class); + when(mockOperationQueue.getCurrentOperation()).thenReturn(mockCurrentOperation); + when(mockOperationQueue.getRemainingOperations()).thenReturn(List.of(mockRemainingOperation)); + when(mockOperationQueueDAO.load()).thenReturn(ofNullable(mockOperationQueue)); + when(mockC2OperationRestartHandler.waitForResponse()).thenReturn(ofNullable(FULLY_APPLIED)); + C2OperationHandler mockOperationHandler = mock(C2OperationHandler.class); + C2OperationAck mockC2OperationAck = mock(C2OperationAck.class); + when(mockC2OperationHandlerProvider.getHandlerForOperation(mockRemainingOperation)).thenReturn(ofNullable(mockOperationHandler)); + when(mockOperationHandler.handle(mockRemainingOperation)).thenReturn(mockC2OperationAck); + when(mockOperationHandler.requiresRestart()).thenReturn(false); + + Future<?> future = executorService.submit(testC2OperationManager); + + assertThrows(TimeoutException.class, () -> future.get(MAX_WAIT_TIME_MS, MILLISECONDS)); + verify(mockHeartbeatLock, times(1)).lock(); + verify(mockHeartbeatLock, times(1)).unlock(); + verify(mockC2Client, times(2)).acknowledgeOperation(any()); + } +} diff --git a/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/api/MiNiFiProperties.java b/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/api/MiNiFiProperties.java index f685519f04..07e2c16a50 100644 --- a/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/api/MiNiFiProperties.java +++ b/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/api/MiNiFiProperties.java @@ -91,6 +91,7 @@ public enum MiNiFiProperties { C2_SECURITY_KEYSTORE_PASSWORD("c2.security.keystore.password", "", true, false, VALID), C2_SECURITY_KEYSTORE_TYPE("c2.security.keystore.type", "JKS", false, false, VALID), C2_REQUEST_COMPRESSION("c2.request.compression", "none", false, true, VALID), + C2_BOOTSTRAP_ACKNOWLEDGE_TIMEOUT("c2.bootstrap.acknowledge.timeout", "15 sec", false, true, VALID), NIFI_MINIFI_NOTIFIER_INGESTORS("nifi.minifi.notifier.ingestors", null, false, true, VALID), NIFI_MINIFI_NOTIFIER_INGESTORS_FILE_CONFIG_PATH("nifi.minifi.notifier.ingestors.file.config.path", null, false, true, VALID), NIFI_MINIFI_NOTIFIER_INGESTORS_FILE_POLLING_PERIOD_SECONDS("nifi.minifi.notifier.ingestors.file.polling.period.seconds", null, false, true, NON_NEGATIVE_INTEGER_VALIDATOR), diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/BootstrapC2OperationRestartHandler.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/BootstrapC2OperationRestartHandler.java new file mode 100644 index 0000000000..76632381dc --- /dev/null +++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/BootstrapC2OperationRestartHandler.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.minifi.c2; + +import static java.util.Optional.empty; +import static java.util.Optional.ofNullable; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.nifi.bootstrap.CommandResult.FAILURE; +import static org.apache.nifi.bootstrap.CommandResult.SUCCESS; +import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import org.apache.nifi.bootstrap.BootstrapCommunicator; +import org.apache.nifi.bootstrap.CommandResult; +import org.apache.nifi.c2.client.service.operation.C2OperationRestartHandler; +import org.apache.nifi.c2.protocol.api.C2Operation; +import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState; +import org.apache.nifi.minifi.commons.api.MiNiFiCommandState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BootstrapC2OperationRestartHandler implements C2OperationRestartHandler { + + private static final Logger LOGGER = LoggerFactory.getLogger(BootstrapC2OperationRestartHandler.class); + + private static final String ACKNOWLEDGE_OPERATION = "ACKNOWLEDGE_OPERATION"; + private static final String TIMEOUT = "timeout"; + private static final Map<MiNiFiCommandState, OperationState> OPERATION_STATE_MAP = Map.of( + MiNiFiCommandState.FULLY_APPLIED, OperationState.FULLY_APPLIED, + MiNiFiCommandState.NO_OPERATION, OperationState.NO_OPERATION, + MiNiFiCommandState.NOT_APPLIED_WITH_RESTART, NOT_APPLIED, + MiNiFiCommandState.NOT_APPLIED_WITHOUT_RESTART, NOT_APPLIED); + + private final BootstrapCommunicator bootstrapCommunicator; + private final BlockingQueue<OperationState> operationStateHolder; + private final long bootstrapAcknowledgeTimeoutMs; + + public BootstrapC2OperationRestartHandler(BootstrapCommunicator bootstrapCommunicator, long bootstrapAcknowledgeTimeoutMs) { + this.bootstrapCommunicator = bootstrapCommunicator; + this.operationStateHolder = new ArrayBlockingQueue<>(1); + this.bootstrapAcknowledgeTimeoutMs = bootstrapAcknowledgeTimeoutMs; + bootstrapCommunicator.registerMessageHandler(ACKNOWLEDGE_OPERATION, this::bootstrapCallback); + } + + @Override + public Optional<OperationState> handleRestart(C2Operation c2Operation) { + CommandResult sendCommandResult = sendBootstrapCommand(c2Operation); + if (sendCommandResult == SUCCESS) { + LOGGER.debug("Bootstrap successfully received command. Waiting for response"); + return waitForResponse(); + } else { + LOGGER.debug("Bootstrap failed to receive command"); + return Optional.of(NOT_APPLIED); + } + } + + @Override + public Optional<OperationState> waitForResponse() { + try { + OperationState operationState = operationStateHolder.poll(bootstrapAcknowledgeTimeoutMs, MILLISECONDS); + LOGGER.debug("Bootstrap returned response: {}", ofNullable(operationState).map(Objects::toString).orElse(TIMEOUT)); + return Optional.of(ofNullable(operationState).orElse(NOT_APPLIED)); + } catch (InterruptedException e) { + LOGGER.debug("Bootstrap response waiting interrupted, possible due to Bootstrap is restarting MiNiFi process"); + return empty(); + } + } + + private CommandResult sendBootstrapCommand(C2Operation c2Operation) { + String command = createBootstrapCommand(c2Operation); + try { + return bootstrapCommunicator.sendCommand(command); + } catch (IOException e) { + LOGGER.error("Failed to send operation to bootstrap", e); + return FAILURE; + } + } + + private String createBootstrapCommand(C2Operation c2Operation) { + return ofNullable(c2Operation.getOperand()) + .map(operand -> c2Operation.getOperation().name() + "_" + operand.name()) + .orElse(c2Operation.getOperation().name()); + } + + private void bootstrapCallback(String[] params, OutputStream outputStream) { + LOGGER.info("Received acknowledge message from bootstrap process"); + if (params.length < 1) { + LOGGER.error("Invalid arguments coming from bootstrap"); + return; + } + MiNiFiCommandState miNiFiCommandState = MiNiFiCommandState.valueOf(params[0]); + OperationState operationState = OPERATION_STATE_MAP.get(miNiFiCommandState); + try { + operationStateHolder.put(operationState); + } catch (InterruptedException e) { + LOGGER.warn("Bootstrap hook thread was interrupted"); + } + } +} diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java index 0915d87ce6..e0ddec368a 100644 --- a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java +++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java @@ -17,12 +17,19 @@ package org.apache.nifi.minifi.c2; +import static java.lang.Boolean.parseBoolean; +import static java.lang.Integer.parseInt; +import static java.lang.Long.parseLong; import static java.util.Optional.ofNullable; +import static java.util.concurrent.Executors.newScheduledThreadPool; +import static java.util.concurrent.Executors.newSingleThreadExecutor; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.stream.Collectors.toMap; import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_AGENT_CLASS; import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_AGENT_HEARTBEAT_PERIOD; import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_AGENT_IDENTIFIER; import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_ASSET_DIRECTORY; +import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_BOOTSTRAP_ACKNOWLEDGE_TIMEOUT; import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_CONFIG_DIRECTORY; import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_FULL_HEARTBEAT; import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_KEEP_ALIVE_DURATION; @@ -45,24 +52,24 @@ import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_SECURITY_KE import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_SECURITY_TRUSTSTORE_LOCATION; import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_SECURITY_TRUSTSTORE_PASSWORD; import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_SECURITY_TRUSTSTORE_TYPE; +import static org.apache.nifi.util.FormatUtils.getPreciseTimeDuration; import static org.apache.nifi.util.NiFiProperties.FLOW_CONFIGURATION_FILE; import static org.apache.nifi.util.NiFiProperties.SENSITIVE_PROPS_ALGORITHM; import static org.apache.nifi.util.NiFiProperties.SENSITIVE_PROPS_KEY; import com.fasterxml.jackson.databind.ObjectMapper; -import java.io.IOException; -import java.io.UncheckedIOException; import java.util.List; import java.util.Map; -import java.util.Optional; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.lang3.tuple.Pair; import org.apache.nifi.bootstrap.BootstrapCommunicator; import org.apache.nifi.c2.client.C2ClientConfig; import org.apache.nifi.c2.client.http.C2HttpClient; -import org.apache.nifi.c2.client.service.C2ClientService; import org.apache.nifi.c2.client.service.C2HeartbeatFactory; +import org.apache.nifi.c2.client.service.C2HeartbeatManager; +import org.apache.nifi.c2.client.service.C2OperationManager; import org.apache.nifi.c2.client.service.FlowIdHolder; import org.apache.nifi.c2.client.service.ManifestHashProvider; import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper; @@ -70,8 +77,7 @@ import org.apache.nifi.c2.client.service.operation.C2OperationHandlerProvider; import org.apache.nifi.c2.client.service.operation.DescribeManifestOperationHandler; import org.apache.nifi.c2.client.service.operation.EmptyOperandPropertiesProvider; import org.apache.nifi.c2.client.service.operation.OperandPropertiesProvider; -import org.apache.nifi.c2.client.service.operation.OperationQueue; -import org.apache.nifi.c2.client.service.operation.RequestedOperationDAO; +import org.apache.nifi.c2.client.service.operation.OperationQueueDAO; import org.apache.nifi.c2.client.service.operation.SupportedOperationsProvider; import org.apache.nifi.c2.client.service.operation.TransferDebugOperationHandler; import org.apache.nifi.c2.client.service.operation.UpdateAssetOperationHandler; @@ -81,10 +87,6 @@ import org.apache.nifi.c2.client.service.operation.UpdatePropertiesOperationHand import org.apache.nifi.c2.protocol.api.AgentManifest; import org.apache.nifi.c2.protocol.api.AgentRepositories; import org.apache.nifi.c2.protocol.api.AgentRepositoryStatus; -import org.apache.nifi.c2.protocol.api.C2Operation; -import org.apache.nifi.c2.protocol.api.C2OperationAck; -import org.apache.nifi.c2.protocol.api.C2OperationState; -import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState; import org.apache.nifi.c2.protocol.api.FlowQueueStatus; import org.apache.nifi.c2.serializer.C2JacksonSerializer; import org.apache.nifi.controller.FlowController; @@ -99,14 +101,13 @@ import org.apache.nifi.minifi.c2.command.PropertiesPersister; import org.apache.nifi.minifi.c2.command.TransferDebugCommandHelper; import org.apache.nifi.minifi.c2.command.UpdateAssetCommandHelper; import org.apache.nifi.minifi.c2.command.UpdatePropertiesPropertyProvider; -import org.apache.nifi.minifi.commons.api.MiNiFiCommandState; +import org.apache.nifi.minifi.commons.api.MiNiFiProperties; import org.apache.nifi.minifi.commons.service.FlowPropertyEncryptor; import org.apache.nifi.minifi.commons.service.StandardFlowEnrichService; import org.apache.nifi.minifi.commons.service.StandardFlowPropertyEncryptor; import org.apache.nifi.minifi.commons.service.StandardFlowSerDeService; import org.apache.nifi.nar.ExtensionManagerHolder; import org.apache.nifi.services.FlowService; -import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.NiFiProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -115,35 +116,25 @@ public class C2NifiClientService { private static final Logger LOGGER = LoggerFactory.getLogger(C2NifiClientService.class); private static final String ROOT_GROUP_ID = "root"; - private static final Long INITIAL_DELAY = 10000L; private static final Integer TERMINATION_WAIT = 5000; - private static final int MINIFI_RESTART_TIMEOUT_SECONDS = 60; - private static final String ACKNOWLEDGE_OPERATION = "ACKNOWLEDGE_OPERATION"; - private static final int IS_ACK_RECEIVED_POLL_INTERVAL = 1000; - private static final int MAX_WAIT_FOR_BOOTSTRAP_ACK_MS = 20000; + private static final Long INITIAL_HEARTBEAT_DELAY_MS = 10000L; - private static final Map<MiNiFiCommandState, OperationState> OPERATION_STATE_MAP = Map.of( - MiNiFiCommandState.FULLY_APPLIED, OperationState.FULLY_APPLIED, - MiNiFiCommandState.NO_OPERATION, OperationState.NO_OPERATION, - MiNiFiCommandState.NOT_APPLIED_WITH_RESTART, OperationState.NOT_APPLIED, - MiNiFiCommandState.NOT_APPLIED_WITHOUT_RESTART, OperationState.NOT_APPLIED); + private final ScheduledExecutorService heartbeatManagerExecutorService; + private final ExecutorService operationManagerExecutorService; - private final C2ClientService c2ClientService; private final FlowController flowController; - private final ScheduledThreadPoolExecutor heartbeatExecutorService; - private final ScheduledThreadPoolExecutor bootstrapAcknowledgeExecutorService; private final ExtensionManifestParser extensionManifestParser; private final RuntimeManifestService runtimeManifestService; private final SupportedOperationsProvider supportedOperationsProvider; - private final RequestedOperationDAO requestedOperationDAO; - private final BootstrapCommunicator bootstrapCommunicator; - private final long heartbeatPeriod; + private final C2HeartbeatManager c2HeartbeatManager; + private final C2OperationManager c2OperationManager; - private volatile boolean ackReceived = false; + private final long heartbeatPeriod; public C2NifiClientService(NiFiProperties niFiProperties, FlowController flowController, BootstrapCommunicator bootstrapCommunicator, FlowService flowService) { - this.heartbeatExecutorService = new ScheduledThreadPoolExecutor(1); - this.bootstrapAcknowledgeExecutorService = new ScheduledThreadPoolExecutor(1); + this.heartbeatManagerExecutorService = newScheduledThreadPool(1); + this.operationManagerExecutorService = newSingleThreadExecutor(); + this.extensionManifestParser = new JAXBExtensionManifestParser(); C2ClientConfig clientConfig = generateClientConfig(niFiProperties); @@ -160,36 +151,34 @@ public class C2NifiClientService { C2HttpClient client = C2HttpClient.create(clientConfig, new C2JacksonSerializer()); FlowIdHolder flowIdHolder = new FlowIdHolder(clientConfig.getConfDirectory()); C2HeartbeatFactory heartbeatFactory = new C2HeartbeatFactory(clientConfig, flowIdHolder, new ManifestHashProvider()); - - this.requestedOperationDAO = new FileBasedRequestedOperationDAO(niFiProperties.getProperty("org.apache.nifi.minifi.bootstrap.config.pid.dir", "bin"), new ObjectMapper()); String bootstrapConfigFileLocation = niFiProperties.getProperty("nifi.minifi.bootstrap.file"); - C2OperationHandlerProvider c2OperationHandlerProvider = c2OperationHandlerProvider(niFiProperties, flowController, flowService, flowIdHolder, client, heartbeatFactory, bootstrapConfigFileLocation, clientConfig.getC2AssetDirectory()); - this.c2ClientService = new C2ClientService(client, heartbeatFactory, c2OperationHandlerProvider, requestedOperationDAO, this::registerOperation); this.supportedOperationsProvider = new SupportedOperationsProvider(c2OperationHandlerProvider.getHandlers()); - this.bootstrapCommunicator = bootstrapCommunicator; - this.bootstrapCommunicator.registerMessageHandler(ACKNOWLEDGE_OPERATION, (params, output) -> acknowledgeHandler(params)); + OperationQueueDAO operationQueueDAO = + new FileBasedOperationQueueDAO(niFiProperties.getProperty("org.apache.nifi.minifi.bootstrap.config.pid.dir", "bin"), new ObjectMapper()); + ReentrantLock heartbeatLock = new ReentrantLock(); + BootstrapC2OperationRestartHandler c2OperationRestartHandler = new BootstrapC2OperationRestartHandler(bootstrapCommunicator, clientConfig.getBootstrapAcknowledgeTimeout()); + + this.c2OperationManager = new C2OperationManager( + client, c2OperationHandlerProvider, heartbeatLock, operationQueueDAO, c2OperationRestartHandler); + this.c2HeartbeatManager = new C2HeartbeatManager( + client, heartbeatFactory, heartbeatLock, generateRuntimeInfo(), c2OperationManager); } private C2ClientConfig generateClientConfig(NiFiProperties properties) { return new C2ClientConfig.Builder() .agentClass(properties.getProperty(C2_AGENT_CLASS.getKey(), C2_AGENT_CLASS.getDefaultValue())) .agentIdentifier(properties.getProperty(C2_AGENT_IDENTIFIER.getKey())) - .fullHeartbeat(Boolean.parseBoolean(properties.getProperty(C2_FULL_HEARTBEAT.getKey(), C2_FULL_HEARTBEAT.getDefaultValue()))) - .heartbeatPeriod(Long.parseLong(properties.getProperty(C2_AGENT_HEARTBEAT_PERIOD.getKey(), - C2_AGENT_HEARTBEAT_PERIOD.getDefaultValue()))) - .connectTimeout((long) FormatUtils.getPreciseTimeDuration(properties.getProperty(C2_REST_CONNECTION_TIMEOUT.getKey(), - C2_REST_CONNECTION_TIMEOUT.getDefaultValue()), TimeUnit.MILLISECONDS)) - .readTimeout((long) FormatUtils.getPreciseTimeDuration(properties.getProperty(C2_REST_READ_TIMEOUT.getKey(), - C2_REST_READ_TIMEOUT.getDefaultValue()), TimeUnit.MILLISECONDS)) - .callTimeout((long) FormatUtils.getPreciseTimeDuration(properties.getProperty(C2_REST_CALL_TIMEOUT.getKey(), - C2_REST_CALL_TIMEOUT.getDefaultValue()), TimeUnit.MILLISECONDS)) - .maxIdleConnections(Integer.parseInt(properties.getProperty(C2_MAX_IDLE_CONNECTIONS.getKey(), C2_MAX_IDLE_CONNECTIONS.getDefaultValue()))) - .keepAliveDuration((long) FormatUtils.getPreciseTimeDuration(properties.getProperty(C2_KEEP_ALIVE_DURATION.getKey(), - C2_KEEP_ALIVE_DURATION.getDefaultValue()), TimeUnit.MILLISECONDS)) + .fullHeartbeat(parseBoolean(properties.getProperty(C2_FULL_HEARTBEAT.getKey(), C2_FULL_HEARTBEAT.getDefaultValue()))) + .heartbeatPeriod(parseLong(properties.getProperty(C2_AGENT_HEARTBEAT_PERIOD.getKey(), C2_AGENT_HEARTBEAT_PERIOD.getDefaultValue()))) + .connectTimeout(durationPropertyInMilliSecs(properties, C2_REST_CONNECTION_TIMEOUT)) + .readTimeout(durationPropertyInMilliSecs(properties, C2_REST_READ_TIMEOUT)) + .callTimeout(durationPropertyInMilliSecs(properties, C2_REST_CALL_TIMEOUT)) + .maxIdleConnections(parseInt(properties.getProperty(C2_MAX_IDLE_CONNECTIONS.getKey(), C2_MAX_IDLE_CONNECTIONS.getDefaultValue()))) + .keepAliveDuration(durationPropertyInMilliSecs(properties, C2_KEEP_ALIVE_DURATION)) .httpHeaders(properties.getProperty(C2_REST_HTTP_HEADERS.getKey(), C2_REST_HTTP_HEADERS.getDefaultValue())) .c2RequestCompression(properties.getProperty(C2_REQUEST_COMPRESSION.getKey(), C2_REQUEST_COMPRESSION.getDefaultValue())) .c2AssetDirectory(properties.getProperty(C2_ASSET_DIRECTORY.getKey(), C2_ASSET_DIRECTORY.getDefaultValue())) @@ -207,9 +196,14 @@ public class C2NifiClientService { .c2RestPathBase(properties.getProperty(C2_REST_PATH_BASE.getKey(), C2_REST_PATH_BASE.getDefaultValue())) .c2RestPathHeartbeat(properties.getProperty(C2_REST_PATH_HEARTBEAT.getKey(), C2_REST_PATH_HEARTBEAT.getDefaultValue())) .c2RestPathAcknowledge(properties.getProperty(C2_REST_PATH_ACKNOWLEDGE.getKey(), C2_REST_PATH_ACKNOWLEDGE.getDefaultValue())) + .bootstrapAcknowledgeTimeout(durationPropertyInMilliSecs(properties, C2_BOOTSTRAP_ACKNOWLEDGE_TIMEOUT)) .build(); } + private long durationPropertyInMilliSecs(NiFiProperties properties, MiNiFiProperties property) { + return (long) getPreciseTimeDuration(properties.getProperty(property.getKey(), property.getDefaultValue()), MILLISECONDS); + } + private C2OperationHandlerProvider c2OperationHandlerProvider(NiFiProperties niFiProperties, FlowController flowController, FlowService flowService, FlowIdHolder flowIdHolder, C2HttpClient client, C2HeartbeatFactory heartbeatFactory, String bootstrapConfigFileLocation, String c2AssetDirectory) { @@ -240,107 +234,29 @@ public class C2NifiClientService { } public void start() { - handleOngoingOperations(); - heartbeatExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS); - } - - // need to be synchronized to prevent parallel run coming from acknowledgeHandler/ackTimeoutTask - private synchronized void handleOngoingOperations() { - Optional<OperationQueue> operationQueue = requestedOperationDAO.load(); - LOGGER.info("Handling ongoing operations: {}", operationQueue); - if (operationQueue.isPresent()) { - try { - waitForAcknowledgeFromBootstrap(); - c2ClientService.handleRequestedOperations(operationQueue.get().getRemainingOperations()); - } catch (Exception e) { - LOGGER.error("Failed to process c2 operations queue", e); - c2ClientService.enableHeartbeat(); - } - } else { - c2ClientService.enableHeartbeat(); - } - } - - private void waitForAcknowledgeFromBootstrap() { - LOGGER.info("Waiting for ACK signal from Bootstrap"); - int currentWaitTime = 0; - while (!ackReceived) { - try { - Thread.sleep(IS_ACK_RECEIVED_POLL_INTERVAL); - } catch (InterruptedException e) { - LOGGER.warn("Thread interrupted while waiting for Acknowledge"); - } - currentWaitTime += IS_ACK_RECEIVED_POLL_INTERVAL; - if (MAX_WAIT_FOR_BOOTSTRAP_ACK_MS <= currentWaitTime) { - LOGGER.warn("Max wait time ({}) exceeded for waiting ack from bootstrap, skipping", MAX_WAIT_FOR_BOOTSTRAP_ACK_MS); - break; - } - } + operationManagerExecutorService.execute(c2OperationManager); + LOGGER.debug("Scheduling heartbeats with {} ms periodicity", heartbeatPeriod); + heartbeatManagerExecutorService.scheduleAtFixedRate(c2HeartbeatManager, INITIAL_HEARTBEAT_DELAY_MS, heartbeatPeriod, MILLISECONDS); } - private void registerOperation(C2Operation c2Operation) { + public void stop() { + heartbeatManagerExecutorService.shutdown(); try { - ackReceived = false; - registerAcknowledgeTimeoutTask(c2Operation); - String command = ofNullable(c2Operation.getOperand()) - .map(operand -> c2Operation.getOperation().name() + "_" + operand.name()) - .orElse(c2Operation.getOperation().name()); - bootstrapCommunicator.sendCommand(command); - } catch (IOException e) { - LOGGER.error("Failed to send operation to bootstrap", e); - throw new UncheckedIOException(e); - } - } - - private void registerAcknowledgeTimeoutTask(C2Operation c2Operation) { - bootstrapAcknowledgeExecutorService.schedule(() -> { - if (!ackReceived) { - LOGGER.info("Operation requiring restart is failed, and no restart/acknowledge is happened after {} seconds for {}. Handling remaining operations.", - MINIFI_RESTART_TIMEOUT_SECONDS, c2Operation); - handleOngoingOperations(); + if (!heartbeatManagerExecutorService.awaitTermination(TERMINATION_WAIT, MILLISECONDS)) { + heartbeatManagerExecutorService.shutdownNow(); } - }, MINIFI_RESTART_TIMEOUT_SECONDS, TimeUnit.SECONDS); - } - - private void acknowledgeHandler(String[] params) { - LOGGER.info("Received acknowledge message from bootstrap process"); - if (params.length < 1) { - LOGGER.error("Invalid arguments coming from bootstrap, skipping acknowledging latest operation"); - return; - } - - Optional<OperationQueue> optionalOperationQueue = requestedOperationDAO.load(); - ackReceived = true; - if (optionalOperationQueue.isPresent()) { - OperationQueue operationQueue = optionalOperationQueue.get(); - C2Operation c2Operation = operationQueue.getCurrentOperation(); - C2OperationAck c2OperationAck = new C2OperationAck(); - c2OperationAck.setOperationId(c2Operation.getIdentifier()); - C2OperationState c2OperationState = new C2OperationState(); - MiNiFiCommandState miNiFiCommandState = MiNiFiCommandState.valueOf(params[0]); - OperationState state = OPERATION_STATE_MAP.get(miNiFiCommandState); - c2OperationState.setState(state); - c2OperationAck.setOperationState(c2OperationState); - c2ClientService.sendAcknowledge(c2OperationAck); - if (MiNiFiCommandState.NO_OPERATION == miNiFiCommandState || MiNiFiCommandState.NOT_APPLIED_WITHOUT_RESTART == miNiFiCommandState) { - LOGGER.debug("No restart happened because of an error / the app was already in the desired state"); - handleOngoingOperations(); - } - } else { - LOGGER.error("Can not send acknowledge due to empty Operation Queue"); + } catch (InterruptedException ignore) { + LOGGER.info("Stopping C2 heartbeat executor service was interrupted, forcing shutdown"); + heartbeatManagerExecutorService.shutdownNow(); } - } - - public void stop() { - bootstrapAcknowledgeExecutorService.shutdownNow(); - heartbeatExecutorService.shutdown(); + operationManagerExecutorService.shutdown(); try { - if (!heartbeatExecutorService.awaitTermination(TERMINATION_WAIT, TimeUnit.MILLISECONDS)) { - heartbeatExecutorService.shutdownNow(); + if (!operationManagerExecutorService.awaitTermination(TERMINATION_WAIT, MILLISECONDS)) { + operationManagerExecutorService.shutdownNow(); } } catch (InterruptedException ignore) { - LOGGER.info("Stopping C2 Client's thread was interrupted but shutting down anyway the C2NifiClientService"); - heartbeatExecutorService.shutdownNow(); + LOGGER.info("Stopping C2 operation executor service was interrupted, forcing shutdown"); + operationManagerExecutorService.shutdownNow(); } } diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/FileBasedRequestedOperationDAO.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/FileBasedOperationQueueDAO.java similarity index 87% rename from minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/FileBasedRequestedOperationDAO.java rename to minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/FileBasedOperationQueueDAO.java index 80b1418c95..72f4c55bbd 100644 --- a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/FileBasedRequestedOperationDAO.java +++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/FileBasedOperationQueueDAO.java @@ -17,22 +17,25 @@ package org.apache.nifi.minifi.c2; +import static org.slf4j.LoggerFactory.getLogger; + import com.fasterxml.jackson.databind.ObjectMapper; import java.io.File; import java.util.Optional; import org.apache.nifi.c2.client.service.operation.OperationQueue; -import org.apache.nifi.c2.client.service.operation.RequestedOperationDAO; +import org.apache.nifi.c2.client.service.operation.OperationQueueDAO; import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -public class FileBasedRequestedOperationDAO implements RequestedOperationDAO { - private static final Logger LOGGER = LoggerFactory.getLogger(FileBasedRequestedOperationDAO.class); +public class FileBasedOperationQueueDAO implements OperationQueueDAO { + + private static final Logger LOGGER = getLogger(FileBasedOperationQueueDAO.class); + protected static final String REQUESTED_OPERATIONS_FILE_NAME = "requestedOperations.data"; private final ObjectMapper objectMapper; private final File requestedOperationsFile; - public FileBasedRequestedOperationDAO(String runDir, ObjectMapper objectMapper) { + public FileBasedOperationQueueDAO(String runDir, ObjectMapper objectMapper) { this.requestedOperationsFile = new File(runDir, REQUESTED_OPERATIONS_FILE_NAME); this.objectMapper = objectMapper; } diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/BootstrapC2OperationRestartHandlerTest.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/BootstrapC2OperationRestartHandlerTest.java new file mode 100644 index 0000000000..c78155fa9a --- /dev/null +++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/BootstrapC2OperationRestartHandlerTest.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.minifi.c2; + +import static java.lang.Thread.sleep; +import static java.util.concurrent.Executors.newVirtualThreadPerTaskExecutor; +import static org.apache.nifi.bootstrap.CommandResult.FAILURE; +import static org.apache.nifi.bootstrap.CommandResult.SUCCESS; +import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED; +import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED; +import static org.apache.nifi.c2.protocol.api.OperationType.START; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.function.BiConsumer; +import org.apache.nifi.bootstrap.BootstrapCommunicator; +import org.apache.nifi.bootstrap.CommandResult; +import org.apache.nifi.c2.protocol.api.C2Operation; +import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class BootstrapC2OperationRestartHandlerTest { + + @Test + void shouldReturnNotAppliedWhenBootstrapCommunicatorReturnsFalse() throws IOException { + C2Operation inputOperation = new C2Operation(); + inputOperation.setOperation(START); + BootstrapCommunicator bootstrapCommunicator = mock(BootstrapCommunicator.class); + when(bootstrapCommunicator.sendCommand(START.name())).thenReturn(FAILURE); + long bootstrapAcknowledgeTimeoutMs = 0; + + BootstrapC2OperationRestartHandler testHandler = new BootstrapC2OperationRestartHandler(bootstrapCommunicator, bootstrapAcknowledgeTimeoutMs); + Optional<OperationState> result = testHandler.handleRestart(inputOperation); + + assertTrue(result.isPresent()); + assertEquals(NOT_APPLIED, result.get()); + } + + @Test + void shouldReturnNotAppliedWhenBootstrapCommunicatorThrowsException() throws IOException { + C2Operation inputOperation = new C2Operation(); + inputOperation.setOperation(START); + BootstrapCommunicator bootstrapCommunicator = mock(BootstrapCommunicator.class); + when(bootstrapCommunicator.sendCommand(START.name())).thenThrow(new IOException()); + long bootstrapAcknowledgeTimeoutMs = 0; + + BootstrapC2OperationRestartHandler testHandler = new BootstrapC2OperationRestartHandler(bootstrapCommunicator, bootstrapAcknowledgeTimeoutMs); + Optional<OperationState> result = testHandler.handleRestart(inputOperation); + + assertTrue(result.isPresent()); + assertEquals(NOT_APPLIED, result.get()); + } + + @Test + void shouldReturnStateAcknowledgedByBootstrapCommunicator() { + C2Operation inputOperation = new C2Operation(); + inputOperation.setOperation(START); + long bootstrapAcknowledgeTimeoutMs = 1000; + long waitBeforeAcknowledgeMs = 100; + String[] callbackResult = new String[] {FULLY_APPLIED.name()}; + BootstrapCommunicatorStub bootstrapCommunicator = new BootstrapCommunicatorStub(SUCCESS, callbackResult, waitBeforeAcknowledgeMs); + + BootstrapC2OperationRestartHandler testHandler = new BootstrapC2OperationRestartHandler(bootstrapCommunicator, bootstrapAcknowledgeTimeoutMs); + try (ExecutorService executorService = newVirtualThreadPerTaskExecutor()) { + executorService.execute(bootstrapCommunicator); + Optional<OperationState> result = testHandler.handleRestart(inputOperation); + + assertTrue(result.isPresent()); + assertEquals(FULLY_APPLIED, result.get()); + } + } + + @Test + void shouldReturnNotAppliedWhenBootstrapAcknowledgeTimesOut() { + C2Operation inputOperation = new C2Operation(); + inputOperation.setOperation(START); + String[] callbackResult = new String[] {FULLY_APPLIED.name()}; + long bootstrapAcknowledgeTimeoutMs = 1000; + long waitBeforeAcknowledgeMs = 2000; + BootstrapCommunicatorStub bootstrapCommunicator = new BootstrapCommunicatorStub(SUCCESS, callbackResult, waitBeforeAcknowledgeMs); + + BootstrapC2OperationRestartHandler testHandler = new BootstrapC2OperationRestartHandler(bootstrapCommunicator, bootstrapAcknowledgeTimeoutMs); + try (ExecutorService executorService = newVirtualThreadPerTaskExecutor()) { + executorService.execute(bootstrapCommunicator); + Optional<OperationState> result = testHandler.handleRestart(inputOperation); + + assertTrue(result.isPresent()); + assertEquals(NOT_APPLIED, result.get()); + } + } + + @Test + void shouldReturnNotAppliedWhenBootstrapSendInvalidResponse() { + C2Operation inputOperation = new C2Operation(); + inputOperation.setOperation(START); + String[] callbackResult = new String[] {}; + long bootstrapAcknowledgeTimeoutMs = 1000; + long waitBeforeAcknowledgeMs = 100; + BootstrapCommunicatorStub bootstrapCommunicator = new BootstrapCommunicatorStub(SUCCESS, callbackResult, waitBeforeAcknowledgeMs); + + BootstrapC2OperationRestartHandler testHandler = new BootstrapC2OperationRestartHandler(bootstrapCommunicator, bootstrapAcknowledgeTimeoutMs); + try (ExecutorService executorService = newVirtualThreadPerTaskExecutor()) { + executorService.execute(bootstrapCommunicator); + Optional<OperationState> result = testHandler.handleRestart(inputOperation); + + assertTrue(result.isPresent()); + assertEquals(NOT_APPLIED, result.get()); + } + } + + static class BootstrapCommunicatorStub implements BootstrapCommunicator, Runnable { + + private final CommandResult sendCommandResult; + private final String[] callbackResult; + private final long waitBeforeAcknowledgeMs; + private BiConsumer<String[], OutputStream> handler; + + BootstrapCommunicatorStub(CommandResult sendCommandResult, String[] callbackResult, long waitBeforeAcknowledgeMs) { + this.sendCommandResult = sendCommandResult; + this.callbackResult = callbackResult; + this.waitBeforeAcknowledgeMs = waitBeforeAcknowledgeMs; + } + + @Override + public void run() { + try { + sleep(waitBeforeAcknowledgeMs); + } catch (InterruptedException ignore) { + } + handler.accept(callbackResult, null); + } + + @Override + public CommandResult sendCommand(String command, String... args) { + return sendCommandResult; + } + + @Override + public void registerMessageHandler(String command, BiConsumer<String[], OutputStream> handler) { + this.handler = handler; + } + } + +} + + diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/FileBasedRequestedOperationDAOTest.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/FileBasedOperationQueueDAOTest.java similarity index 90% rename from minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/FileBasedRequestedOperationDAOTest.java rename to minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/FileBasedOperationQueueDAOTest.java index 962e98b2e9..b2ae03cd03 100644 --- a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/FileBasedRequestedOperationDAOTest.java +++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/FileBasedOperationQueueDAOTest.java @@ -17,7 +17,7 @@ package org.apache.nifi.minifi.c2; -import static org.apache.nifi.minifi.c2.FileBasedRequestedOperationDAO.REQUESTED_OPERATIONS_FILE_NAME; +import static org.apache.nifi.minifi.c2.FileBasedOperationQueueDAO.REQUESTED_OPERATIONS_FILE_NAME; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; @@ -32,6 +32,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import java.io.File; import java.io.IOException; import java.util.Collections; +import java.util.List; import java.util.Optional; import org.apache.nifi.c2.client.service.operation.OperationQueue; import org.apache.nifi.c2.protocol.api.C2Operation; @@ -45,7 +46,7 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @ExtendWith(MockitoExtension.class) -class FileBasedRequestedOperationDAOTest { +class FileBasedOperationQueueDAOTest { @Mock private ObjectMapper objectMapper; @@ -53,11 +54,11 @@ class FileBasedRequestedOperationDAOTest { @TempDir File tmpDir; - private FileBasedRequestedOperationDAO fileBasedRequestedOperationDAO; + private FileBasedOperationQueueDAO fileBasedRequestedOperationDAO; @BeforeEach void setup() { - fileBasedRequestedOperationDAO = new FileBasedRequestedOperationDAO(tmpDir.getAbsolutePath(), objectMapper); + fileBasedRequestedOperationDAO = new FileBasedOperationQueueDAO(tmpDir.getAbsolutePath(), objectMapper); } @Test @@ -109,6 +110,6 @@ class FileBasedRequestedOperationDAOTest { C2Operation currentOperation = new C2Operation(); currentOperation.setIdentifier("id2"); - return new OperationQueue(currentOperation, Collections.singletonList(c2Operation)); + return new OperationQueue(currentOperation, List.of(c2Operation)); } } \ No newline at end of file diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/bootstrap/BootstrapListener.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/bootstrap/BootstrapListener.java index 911ddc44a7..fa6b5db211 100644 --- a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/bootstrap/BootstrapListener.java +++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/bootstrap/BootstrapListener.java @@ -14,8 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.nifi.minifi.bootstrap; +import static org.apache.nifi.bootstrap.CommandResult.FAILURE; +import static org.apache.nifi.bootstrap.CommandResult.SUCCESS; + import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.databind.ObjectMapper; import java.io.BufferedReader; @@ -38,6 +42,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.function.BiConsumer; import org.apache.nifi.bootstrap.BootstrapCommunicator; +import org.apache.nifi.bootstrap.CommandResult; import org.apache.nifi.minifi.MiNiFiServer; import org.apache.nifi.minifi.commons.status.FlowStatusReport; import org.apache.nifi.minifi.status.StatusRequestException; @@ -90,29 +95,29 @@ public class BootstrapListener implements BootstrapCommunicator { listenThread.start(); logger.debug("Notifying Bootstrap that local port is {}", localPort); - sendCommand("PORT", new String[]{String.valueOf(localPort), secretKey}); + sendCommand("PORT", new String[] {String.valueOf(localPort), secretKey}); } public void reload() throws IOException { if (listener != null) { listener.stop(); } - sendCommand(RELOAD, new String[]{}); + sendCommand(RELOAD, new String[] {}); } public void stop() throws IOException { if (listener != null) { listener.stop(); } - sendCommand(SHUTDOWN, new String[]{}); + sendCommand(SHUTDOWN, new String[] {}); } public void sendStartedStatus(boolean status) throws IOException { logger.debug("Notifying Bootstrap that the status of starting MiNiFi is {}", status); - sendCommand(STARTED, new String[]{String.valueOf(status)}); + sendCommand(STARTED, new String[] {String.valueOf(status)}); } - public void sendCommand(String command, String[] args) throws IOException { + public CommandResult sendCommand(String command, String[] args) throws IOException { try (Socket socket = new Socket()) { socket.setSoTimeout(60000); socket.connect(new InetSocketAddress("localhost", bootstrapPort)); @@ -134,8 +139,10 @@ public class BootstrapListener implements BootstrapCommunicator { String response = reader.readLine(); if ("OK".equals(response)) { logger.info("Successfully initiated communication with Bootstrap"); + return SUCCESS; } else { logger.error("Failed to communicate with Bootstrap. Bootstrap may be unable to issue or receive commands from MiNiFi"); + return FAILURE; } } } diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/bootstrap/BootstrapCommunicator.java b/nifi-framework-api/src/main/java/org/apache/nifi/bootstrap/BootstrapCommunicator.java index 41784233c7..1bfc1563d9 100644 --- a/nifi-framework-api/src/main/java/org/apache/nifi/bootstrap/BootstrapCommunicator.java +++ b/nifi-framework-api/src/main/java/org/apache/nifi/bootstrap/BootstrapCommunicator.java @@ -25,11 +25,13 @@ public interface BootstrapCommunicator { /** * Sends a command with specific arguments to the bootstrap process + * * @param command the command to send - * @param args the args to send + * @param args the args to send + * @return {@link CommandResult} of the command sent to Bootstrap * @throws IOException exception in case of communication issue */ - void sendCommand(String command, String... args) throws IOException; + CommandResult sendCommand(String command, String... args) throws IOException; /** * Register a handler for messages coming from bootstrap process diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/RequestedOperationDAO.java b/nifi-framework-api/src/main/java/org/apache/nifi/bootstrap/CommandResult.java similarity index 55% rename from c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/RequestedOperationDAO.java rename to nifi-framework-api/src/main/java/org/apache/nifi/bootstrap/CommandResult.java index 1216aa812d..ebb9bde6d3 100644 --- a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/RequestedOperationDAO.java +++ b/nifi-framework-api/src/main/java/org/apache/nifi/bootstrap/CommandResult.java @@ -15,31 +15,9 @@ * limitations under the License. */ -package org.apache.nifi.c2.client.service.operation; - -import java.util.Optional; - -/** - * The purpose of this interface is to be able to persist operations between restarts. - */ -public interface RequestedOperationDAO { - - /** - * Persist the given requested operation list - * @param operationQueue the queue containing the current and remaining operations - */ - void save(OperationQueue operationQueue); - - /** - * Returns the saved Operations - * - * @return the C2 Operations queue with the actual operation - */ - Optional<OperationQueue> load(); - - /** - * Resets the saved operations - */ - void cleanup(); +package org.apache.nifi.bootstrap; +public enum CommandResult { + FAILURE, + SUCCESS }