This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.15
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 36ecf43a300a274d5cf3b4468e29ac91dc10332d
Author: Mark Payne <marka...@hotmail.com>
AuthorDate: Thu Nov 4 14:57:57 2021 -0400

    NIFI-9365: Changed HashMap to ConcurrentHashMap in StandardProcessorNode 
for the activeThreads, because we have a need to iterate over it outside of 
synchronized keyword
---
 .../nifi/controller/StandardProcessorNode.java     |   3 +-
 .../org/apache/nifi/tests/system/NiFiSystemIT.java |   2 +-
 .../nifi/tests/system/clustering/OffloadIT.java    | 136 +++++++++++++++++++++
 3 files changed, 139 insertions(+), 2 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index a23fd0e..0e47331 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -96,6 +96,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -152,7 +153,7 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
 
     private SchedulingStrategy schedulingStrategy; // guarded by synchronized 
keyword
     private ExecutionNode executionNode;
-    private final Map<Thread, ActiveTask> activeThreads = new HashMap<>(48);
+    private final Map<Thread, ActiveTask> activeThreads = new 
ConcurrentHashMap<>(48);
     private final int hashCode;
     private volatile boolean hasActiveThreads = false;
 
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
index 23e3a6f..d94529d 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
@@ -273,7 +273,7 @@ public abstract class NiFiSystemIT {
 
         waitForQueueCountToMatch(connectionId, size -> size > 0, "greater than 
0");
 
-        logger.info("Waiting for Queue on Connection {} is not empty", 
connectionId);
+        logger.info("Queue on Connection {} is not empty", connectionId);
     }
 
     protected void waitForQueueCount(final String connectionId, final int 
queueSize) throws InterruptedException {
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/OffloadIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/OffloadIT.java
new file mode 100644
index 0000000..1e66fc3
--- /dev/null
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/OffloadIT.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.tests.system.clustering;
+
+import org.apache.nifi.tests.system.NiFiInstanceFactory;
+import org.apache.nifi.tests.system.NiFiSystemIT;
+import org.apache.nifi.tests.system.SpawnedClusterNiFiInstanceFactory;
+import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
+import org.apache.nifi.web.api.dto.NodeDTO;
+import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
+import org.apache.nifi.web.api.entity.ClusterEntity;
+import org.apache.nifi.web.api.entity.ConnectionEntity;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+
+public class OffloadIT extends NiFiSystemIT {
+    private static final Logger logger = 
LoggerFactory.getLogger(OffloadIT.class);
+
+    @Override
+    protected NiFiInstanceFactory getInstanceFactory() {
+        return new SpawnedClusterNiFiInstanceFactory(
+            "src/test/resources/conf/clustered/node1/bootstrap.conf",
+            "src/test/resources/conf/clustered/node2/bootstrap.conf");
+    }
+
+    @Test
+    public void testOffload() throws InterruptedException, IOException, 
NiFiClientException {
+        for (int i=0; i < 5; i++) {
+            logger.info("Running iteration {}", i);
+            testIteration();
+            logger.info("Node reconnected to cluster");
+            destroyFlow();
+        }
+    }
+
+    private void testIteration() throws NiFiClientException, IOException, 
InterruptedException {
+        ProcessorEntity generate = 
getClientUtil().createProcessor("GenerateFlowFile");
+        ProcessorEntity sleep = getClientUtil().createProcessor("Sleep");
+        ConnectionEntity connectionEntity = 
getClientUtil().createConnection(generate, sleep, "success");
+
+        getClientUtil().setAutoTerminatedRelationships(sleep, "success");
+        generate = getClientUtil().updateProcessorProperties(generate, 
Collections.singletonMap("File Size", "1 KB"));
+        final ProcessorConfigDTO configDto = 
generate.getComponent().getConfig();
+        configDto.setSchedulingPeriod("0 sec");
+        getClientUtil().updateProcessorConfig(generate, configDto);
+
+        getClientUtil().updateProcessorProperties(sleep, 
Collections.singletonMap("onTrigger Sleep Time", "100 ms"));
+
+
+        getClientUtil().startProcessGroupComponents("root");
+
+        waitForQueueNotEmpty(connectionEntity.getId());
+
+        final NodeDTO node2Dto = getNodeDTO(5672);
+
+        disconnectNode(node2Dto);
+
+        final String nodeId = node2Dto.getNodeId();
+        getClientUtil().offloadNode(nodeId);
+        waitFor(this::isNodeOffloaded);
+
+        getClientUtil().connectNode(nodeId);
+        waitForAllNodesConnected();
+    }
+
+    private boolean isNodeOffloaded() {
+        final ClusterEntity clusterEntity;
+        try {
+            clusterEntity = getNifiClient().getControllerClient().getNodes();
+        } catch (final Exception e) {
+            logger.error("Failed to determine if node is offloaded", e);
+            return false;
+        }
+
+        final Collection<NodeDTO> nodeDtos = 
clusterEntity.getCluster().getNodes();
+
+        for (final NodeDTO dto : nodeDtos) {
+            final String status = dto.getStatus();
+            if (status.equalsIgnoreCase("OFFLOADED")) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    private NodeDTO getNodeDTO(final int apiPort) throws NiFiClientException, 
IOException {
+        final ClusterEntity clusterEntity = 
getNifiClient().getControllerClient().getNodes();
+        final NodeDTO node2Dto = clusterEntity.getCluster().getNodes().stream()
+            .filter(nodeDto -> nodeDto.getApiPort() == apiPort)
+            .findAny()
+            .orElseThrow(() -> new RuntimeException("Could not locate Node 
2"));
+
+        return node2Dto;
+    }
+
+
+    private void disconnectNode(final NodeDTO nodeDto) throws 
NiFiClientException, IOException, InterruptedException {
+        getClientUtil().disconnectNode(nodeDto.getNodeId());
+
+        final Integer apiPort = nodeDto.getApiPort();
+        waitFor(() -> {
+            try {
+                final NodeDTO dto = getNodeDTO(apiPort);
+                final String status = dto.getStatus();
+                return "DISCONNECTED".equals(status);
+            } catch (final Exception e) {
+                logger.error("Failed to determine if node is disconnected", e);
+            }
+
+            return false;
+        });
+    }
+
+}

Reply via email to