This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 65b2a9b  NIFI-7117: When SocketLoadBalancedFlowFileQueue created its 
array of Queue Partitions in the constructor, it added the local partition as 
the first element in that list. This list should be ordered the same across all 
nodes in the cluster. Because the local partition was placed first in the array, 
each node had a different ordering of these partitions. As a result, the Partition 
by Attribute strategy would constantly rebalance FlowFiles that it received to 
other nodes, and Single [...]
65b2a9b is described below

commit 65b2a9bc2c91657bc160fe93d6051da95ab5db34
Author: Mark Payne <[email protected]>
AuthorDate: Mon Feb 10 14:15:46 2020 -0500

    NIFI-7117: When SocketLoadBalancedFlowFileQueue created its array of Queue 
Partitions in the constructor, it added the local partition as the first 
element in that list. This list should be ordered the same across all nodes in 
the cluster. Because the local partition was placed first in the array, each node 
had a different ordering of these partitions. As a result, the Partition by 
Attribute strategy would constantly rebalance FlowFiles that it received to 
other nodes, and Single Node always tr [...]
    
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #4045.
---
 .../clustered/SocketLoadBalancedFlowFileQueue.java | 10 ++--
 .../partition/CorrelationAttributePartitioner.java | 17 ++++++
 .../apache/nifi/tests/system/NiFiClientUtil.java   |  5 +-
 .../tests/system/loadbalance/LoadBalanceIT.java    | 61 ++++++++++++++++++++++
 .../cli/impl/client/nifi/ConnectionClient.java     |  2 +
 .../client/nifi/impl/JerseyConnectionClient.java   | 13 ++++-
 6 files changed, 103 insertions(+), 5 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
index e69daad..06c86d1 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
@@ -151,15 +151,19 @@ public class SocketLoadBalancedFlowFileQueue extends 
AbstractFlowFileQueue imple
             // that is not the local node identifier. If the Local Node 
Identifier is not yet known, that's okay. When it becomes known,
             // the queuePartitions array will be recreated with the 
appropriate partitions.
             final List<QueuePartition> partitionList = new ArrayList<>();
-            partitionList.add(localPartition);
 
             final NodeIdentifier localNodeId = 
clusterCoordinator.getLocalNodeIdentifier();
             for (final NodeIdentifier nodeId : sortedNodeIdentifiers) {
                 if (nodeId.equals(localNodeId)) {
-                    continue;
+                    partitionList.add(localPartition);
+                } else {
+                    partitionList.add(createRemotePartition(nodeId));
                 }
+            }
 
-                partitionList.add(createRemotePartition(nodeId));
+            // Ensure that our list of queue partitions always contains the 
local partition.
+            if (!partitionList.contains(localPartition)) {
+                partitionList.add(localPartition);
             }
 
             queuePartitions = partitionList.toArray(new QueuePartition[0]);
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/CorrelationAttributePartitioner.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/CorrelationAttributePartitioner.java
index 7529098..70172ca 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/CorrelationAttributePartitioner.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/CorrelationAttributePartitioner.java
@@ -19,8 +19,16 @@ package org.apache.nifi.controller.queue.clustered.partition;
 
 import com.google.common.hash.Hashing;
 import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
 
 public class CorrelationAttributePartitioner implements FlowFilePartitioner {
+    private static final Logger logger = 
LoggerFactory.getLogger(CorrelationAttributePartitioner.class);
+
     private final String partitioningAttribute;
 
     public CorrelationAttributePartitioner(final String partitioningAttribute) 
{
@@ -41,6 +49,15 @@ public class CorrelationAttributePartitioner implements 
FlowFilePartitioner {
             index = Hashing.consistentHash(hash, partitions.length);
         }
 
+        if (logger.isDebugEnabled()) {
+            final List<String> partitionDescriptions = new 
ArrayList<>(partitions.length);
+            for (final QueuePartition partition : partitions) {
+                partitionDescriptions.add(partition.getSwapPartitionName());
+            }
+
+            logger.debug("Assigning Partition {} to {} based on {}", index, 
flowFile.getAttribute(CoreAttributes.UUID.key()), partitionDescriptions);
+        }
+
         return partitions[index];
     }
 
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
index 03e14c6..cd268b0 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
@@ -691,8 +691,11 @@ public class NiFiClientUtil {
 
         final FlowFileSummaryDTO flowFileSummary = 
flowFileSummaries.get(flowFileIndex);
         final String uuid = flowFileSummary.getUuid();
+        final String nodeId = flowFileSummary.getClusterNodeId();
 
-        return nifiClient.getConnectionClient().getFlowFile(connectionId, 
uuid);
+        final FlowFileEntity flowFileEntity = 
nifiClient.getConnectionClient().getFlowFile(connectionId, uuid, nodeId);
+        flowFileEntity.getFlowFile().setClusterNodeId(nodeId);
+        return flowFileEntity;
     }
 
     public VariableRegistryUpdateRequestEntity updateVariableRegistry(final 
ProcessGroupEntity processGroup, final Map<String, String> variables) throws 
NiFiClientException, IOException {
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/loadbalance/LoadBalanceIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/loadbalance/LoadBalanceIT.java
index 2a8b698..a0e93aa 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/loadbalance/LoadBalanceIT.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/loadbalance/LoadBalanceIT.java
@@ -31,6 +31,7 @@ import 
org.apache.nifi.web.api.dto.status.NodeConnectionStatusSnapshotDTO;
 import org.apache.nifi.web.api.entity.ClusterEntity;
 import org.apache.nifi.web.api.entity.ConnectionEntity;
 import org.apache.nifi.web.api.entity.ConnectionStatusEntity;
+import org.apache.nifi.web.api.entity.FlowFileEntity;
 import org.apache.nifi.web.api.entity.ProcessorEntity;
 import org.junit.Assert;
 import org.junit.Test;
@@ -38,8 +39,10 @@ import org.junit.Test;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LongSummaryStatistics;
 import java.util.Map;
+import java.util.Set;
 
 import static junit.framework.TestCase.assertTrue;
 import static org.junit.Assert.assertEquals;
@@ -158,6 +161,64 @@ public class LoadBalanceIT extends NiFiSystemIT {
     }
 
     @Test
+    public void testPartitionByAttribute() throws NiFiClientException, IOException, InterruptedException {
+        final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile");
+        final ProcessorEntity count = getClientUtil().createProcessor("CountEvents");
+
+        final ConnectionEntity connection = getClientUtil().createConnection(generate, count, "success");
+        getClientUtil().setAutoTerminatedRelationships(count, "success");
+
+        // This test expects every start/stop cycle of GenerateFlowFile to add exactly one batch of FlowFiles, all
+        // carrying the same value for the "number" attribute. One cycle is run per distinct value.
+        final int batchSize = 10;
+        final int distinctValues = 10;
+        final int totalFlowFiles = batchSize * distinctValues;
+
+        // Configure Processor to generate one batch of 1 MB FlowFiles per run. It runs only on the Primary Node,
+        // so any FlowFile found on another node must have been moved there by load balancing.
+        final Map<String, String> generateProperties = new HashMap<>();
+        generateProperties.put("File Size", "1 MB");
+        generateProperties.put("Batch Size", String.valueOf(batchSize));
+        generateProperties.put("number", "0");
+        getClientUtil().updateProcessorProperties(generate, generateProperties);
+        getClientUtil().updateProcessorExecutionNode(generate, ExecutionNode.PRIMARY);
+
+        // Partition by the "number" attribute: all FlowFiles that share a value should be sent to the same node.
+        getClientUtil().updateConnectionLoadBalancing(connection, LoadBalanceStrategy.PARTITION_BY_ATTRIBUTE, LoadBalanceCompression.DO_NOT_COMPRESS, "number");
+
+        // Queue 100 FlowFiles: 10 with number=0, 10 with number=1, and so on, up to 10 with number=9.
+        for (int i = 1; i <= distinctValues; i++) {
+            // Generate the data.
+            getNifiClient().getProcessorClient().startProcessor(generate);
+
+            // The queue is cumulative: after i cycles it should hold i batches.
+            final int expectedQueueSize = batchSize * i;
+
+            // Wait until every FlowFile generated so far is queued up.
+            waitFor(() -> {
+                final ConnectionStatusEntity statusEntity = getConnectionStatus(connection.getId());
+                return statusEntity.getConnectionStatus().getAggregateSnapshot().getFlowFilesQueued() == expectedQueueSize;
+            });
+
+            getNifiClient().getProcessorClient().stopProcessor(generate);
+            getClientUtil().waitForStoppedProcessor(generate.getId());
+
+            // Switch to the next attribute value for the following batch.
+            generateProperties.put("number", String.valueOf(i));
+            getClientUtil().updateProcessorProperties(generate, generateProperties);
+        }
+
+        // Wait until load balancing is complete
+        waitFor(() -> isConnectionDoneLoadBalancing(connection.getId()));
+
+        // Record which cluster node(s) hold the FlowFiles for each value of the "number" attribute.
+        final Map<String, Set<String>> nodesByAttribute = new HashMap<>();
+        for (int i = 0; i < totalFlowFiles; i++) {
+            final FlowFileEntity flowFile = getClientUtil().getQueueFlowFile(connection.getId(), i);
+            final String numberValue = flowFile.getFlowFile().getAttributes().get("number");
+            final Set<String> nodes = nodesByAttribute.computeIfAbsent(numberValue, key -> new HashSet<>());
+            nodes.add(flowFile.getFlowFile().getClusterNodeId());
+        }
+
+        // Every attribute value must be present, and the FlowFiles for each value must live on exactly one node.
+        assertEquals(distinctValues, nodesByAttribute.size());
+        for (final Map.Entry<String, Set<String>> entry : nodesByAttribute.entrySet()) {
+            final Set<String> nodes = entry.getValue();
+            assertEquals("FlowFile with attribute number=" + entry.getKey() + " went to nodes " + nodes, 1, nodes.size());
+        }
+    }
+
+    @Test
     public void testOffload() throws NiFiClientException, IOException, 
InterruptedException {
         final ProcessorEntity generate = 
getClientUtil().createProcessor("GenerateFlowFile");
         final ProcessorEntity count = 
getClientUtil().createProcessor("CountEvents");
diff --git 
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ConnectionClient.java
 
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ConnectionClient.java
index a486a65..f5c07d1 100644
--- 
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ConnectionClient.java
+++ 
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ConnectionClient.java
@@ -47,4 +47,6 @@ public interface ConnectionClient {
     ListingRequestEntity deleteListingRequest(String connectionId, String 
listingRequestId) throws NiFiClientException, IOException;
 
     FlowFileEntity getFlowFile(String connectionId, String flowFileUuid) 
throws NiFiClientException, IOException;
+
+    /**
+     * Retrieves a FlowFile, identified by its UUID, from the queue of the given connection.
+     *
+     * @param connectionId the ID of the connection whose queue holds the FlowFile
+     * @param flowFileUuid the UUID of the FlowFile to retrieve
+     * @param nodeId the ID of the cluster node to retrieve the FlowFile from, or {@code null} to not target a
+     *               specific node (the Jersey implementation then omits the {@code clusterNodeId} query parameter)
+     * @return the retrieved FlowFile entity
+     * @throws NiFiClientException if retrieving the FlowFile fails
+     * @throws IOException if an I/O error occurs while communicating with NiFi
+     */
+    FlowFileEntity getFlowFile(String connectionId, String flowFileUuid, 
String nodeId) throws NiFiClientException, IOException;
 }
diff --git 
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyConnectionClient.java
 
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyConnectionClient.java
index f4180fe..ffe038e 100644
--- 
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyConnectionClient.java
+++ 
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyConnectionClient.java
@@ -242,8 +242,15 @@ public class JerseyConnectionClient extends 
AbstractJerseyClient implements Conn
         });
     }
 
+
     @Override
     public FlowFileEntity getFlowFile(final String connectionId, final String 
flowFileUuid) throws NiFiClientException, IOException {
+        return getFlowFile(connectionId, flowFileUuid, null);
+    }
+
+
+    @Override
+    public FlowFileEntity getFlowFile(final String connectionId, final String 
flowFileUuid, final String nodeId) throws NiFiClientException, IOException {
         if (connectionId == null) {
             throw new IllegalArgumentException("Connection ID cannot be null");
         }
@@ -252,11 +259,15 @@ public class JerseyConnectionClient extends 
AbstractJerseyClient implements Conn
         }
 
         return executeAction("Error retrieving FlowFile", () -> {
-            final WebTarget target = flowFileQueueTarget
+            WebTarget target = flowFileQueueTarget
                 .path("flowfiles/{uuid}")
                 .resolveTemplate("id", connectionId)
                 .resolveTemplate("uuid", flowFileUuid);
 
+            if (nodeId != null) {
+                target = target.queryParam("clusterNodeId", nodeId);
+            }
+
             return getRequestBuilder(target).get(FlowFileEntity.class);
         });
     }

Reply via email to