This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 65b2a9b NIFI-7117: When SocketLoadBalancedFlowFileQueue creates its
array of Queue Partitions in the constructor, it added the local partition as
the first element in that list. This list should be ordered the same across all
nodes in the cluster. By making the local partition the first in the array,
each node had a different ordering of these partitions. As a result, Partition
by Attribute strategy would constantly rebalance flowfiles that it received to
other nodes, and Single [...]
65b2a9b is described below
commit 65b2a9bc2c91657bc160fe93d6051da95ab5db34
Author: Mark Payne <[email protected]>
AuthorDate: Mon Feb 10 14:15:46 2020 -0500
NIFI-7117: When SocketLoadBalancedFlowFileQueue creates its array of Queue
Partitions in the constructor, it added the local partition as the first
element in that list. This list should be ordered the same across all nodes in
the cluster. By making the local partition the first in the array, each node
had a different ordering of these partitions. As a result, Partition by
Attribute strategy would constantly rebalance flowfiles that it received to
other nodes, and Single Node always tr [...]
Signed-off-by: Pierre Villard <[email protected]>
This closes #4045.
---
.../clustered/SocketLoadBalancedFlowFileQueue.java | 10 ++--
.../partition/CorrelationAttributePartitioner.java | 17 ++++++
.../apache/nifi/tests/system/NiFiClientUtil.java | 5 +-
.../tests/system/loadbalance/LoadBalanceIT.java | 61 ++++++++++++++++++++++
.../cli/impl/client/nifi/ConnectionClient.java | 2 +
.../client/nifi/impl/JerseyConnectionClient.java | 13 ++++-
6 files changed, 103 insertions(+), 5 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
index e69daad..06c86d1 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
@@ -151,15 +151,19 @@ public class SocketLoadBalancedFlowFileQueue extends
AbstractFlowFileQueue imple
// that is not the local node identifier. If the Local Node
Identifier is not yet known, that's okay. When it becomes known,
// the queuePartitions array will be recreated with the
appropriate partitions.
final List<QueuePartition> partitionList = new ArrayList<>();
- partitionList.add(localPartition);
final NodeIdentifier localNodeId =
clusterCoordinator.getLocalNodeIdentifier();
for (final NodeIdentifier nodeId : sortedNodeIdentifiers) {
if (nodeId.equals(localNodeId)) {
- continue;
+ partitionList.add(localPartition);
+ } else {
+ partitionList.add(createRemotePartition(nodeId));
}
+ }
- partitionList.add(createRemotePartition(nodeId));
+ // Ensure that our list of queue partitions always contains the
local partition.
+ if (!partitionList.contains(localPartition)) {
+ partitionList.add(localPartition);
}
queuePartitions = partitionList.toArray(new QueuePartition[0]);
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/CorrelationAttributePartitioner.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/CorrelationAttributePartitioner.java
index 7529098..70172ca 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/CorrelationAttributePartitioner.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/CorrelationAttributePartitioner.java
@@ -19,8 +19,16 @@ package org.apache.nifi.controller.queue.clustered.partition;
import com.google.common.hash.Hashing;
import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
public class CorrelationAttributePartitioner implements FlowFilePartitioner {
+ private static final Logger logger =
LoggerFactory.getLogger(CorrelationAttributePartitioner.class);
+
private final String partitioningAttribute;
public CorrelationAttributePartitioner(final String partitioningAttribute)
{
@@ -41,6 +49,15 @@ public class CorrelationAttributePartitioner implements
FlowFilePartitioner {
index = Hashing.consistentHash(hash, partitions.length);
}
+ if (logger.isDebugEnabled()) {
+ final List<String> partitionDescriptions = new
ArrayList<>(partitions.length);
+ for (final QueuePartition partition : partitions) {
+ partitionDescriptions.add(partition.getSwapPartitionName());
+ }
+
+ logger.debug("Assigning Partition {} to {} based on {}", index,
flowFile.getAttribute(CoreAttributes.UUID.key()), partitionDescriptions);
+ }
+
return partitions[index];
}
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
index 03e14c6..cd268b0 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
@@ -691,8 +691,11 @@ public class NiFiClientUtil {
final FlowFileSummaryDTO flowFileSummary =
flowFileSummaries.get(flowFileIndex);
final String uuid = flowFileSummary.getUuid();
+ final String nodeId = flowFileSummary.getClusterNodeId();
- return nifiClient.getConnectionClient().getFlowFile(connectionId,
uuid);
+ final FlowFileEntity flowFileEntity =
nifiClient.getConnectionClient().getFlowFile(connectionId, uuid, nodeId);
+ flowFileEntity.getFlowFile().setClusterNodeId(nodeId);
+ return flowFileEntity;
}
public VariableRegistryUpdateRequestEntity updateVariableRegistry(final
ProcessGroupEntity processGroup, final Map<String, String> variables) throws
NiFiClientException, IOException {
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/loadbalance/LoadBalanceIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/loadbalance/LoadBalanceIT.java
index 2a8b698..a0e93aa 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/loadbalance/LoadBalanceIT.java
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/loadbalance/LoadBalanceIT.java
@@ -31,6 +31,7 @@ import
org.apache.nifi.web.api.dto.status.NodeConnectionStatusSnapshotDTO;
import org.apache.nifi.web.api.entity.ClusterEntity;
import org.apache.nifi.web.api.entity.ConnectionEntity;
import org.apache.nifi.web.api.entity.ConnectionStatusEntity;
+import org.apache.nifi.web.api.entity.FlowFileEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.junit.Assert;
import org.junit.Test;
@@ -38,8 +39,10 @@ import org.junit.Test;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LongSummaryStatistics;
import java.util.Map;
+import java.util.Set;
import static junit.framework.TestCase.assertTrue;
import static org.junit.Assert.assertEquals;
@@ -158,6 +161,64 @@ public class LoadBalanceIT extends NiFiSystemIT {
}
@Test
+ public void testPartitionByAttribute() throws NiFiClientException,
IOException, InterruptedException {
+ final ProcessorEntity generate =
getClientUtil().createProcessor("GenerateFlowFile");
+ final ProcessorEntity count =
getClientUtil().createProcessor("CountEvents");
+
+ final ConnectionEntity connection =
getClientUtil().createConnection(generate, count, "success");
+ getClientUtil().setAutoTerminatedRelationships(count, "success");
+
+ // Configure Processor to generate 10 FlowFiles, each 1 MB, per run, on
the Primary Node only.
+ final Map<String, String> generateProperties = new HashMap<>();
+ generateProperties.put("File Size", "1 MB");
+ generateProperties.put("Batch Size", "10");
+ generateProperties.put("number", "0");
+ getClientUtil().updateProcessorProperties(generate,
generateProperties);
+ getClientUtil().updateProcessorExecutionNode(generate,
ExecutionNode.PRIMARY);
+
+ // Partition by the "number" attribute. This should result in all
FlowFiles with the same "number" value ending up on the same node.
+ getClientUtil().updateConnectionLoadBalancing(connection,
LoadBalanceStrategy.PARTITION_BY_ATTRIBUTE,
LoadBalanceCompression.DO_NOT_COMPRESS, "number");
+
+ // Queue 100 FlowFiles. 10 with number=0, 10 with number=1, 10 with
number=2, etc. up to 10 with number=9
+ for (int i=1; i <= 10; i++) {
+ // Generate the data.
+ getNifiClient().getProcessorClient().startProcessor(generate);
+
+ final int expectedQueueSize = 10 * i;
+
+ // Wait until all 10 FlowFiles are queued up.
+ waitFor(() -> {
+ final ConnectionStatusEntity statusEntity =
getConnectionStatus(connection.getId());
+ return
statusEntity.getConnectionStatus().getAggregateSnapshot().getFlowFilesQueued()
== expectedQueueSize;
+ });
+
+
+ getNifiClient().getProcessorClient().stopProcessor(generate);
+ getClientUtil().waitForStoppedProcessor(generate.getId());
+
+ generateProperties.put("number", String.valueOf(i));
+ getClientUtil().updateProcessorProperties(generate,
generateProperties);
+ }
+
+ // Wait until load balancing is complete
+ waitFor(() -> isConnectionDoneLoadBalancing(connection.getId()));
+
+ final Map<String, Set<String>> nodesByAttribute = new HashMap<>();
+ for (int i=0; i < 100; i++) {
+ final FlowFileEntity flowFile =
getClientUtil().getQueueFlowFile(connection.getId(), i);
+ final String numberValue =
flowFile.getFlowFile().getAttributes().get("number");
+ final Set<String> nodes =
nodesByAttribute.computeIfAbsent(numberValue, key -> new HashSet<>());
+ nodes.add(flowFile.getFlowFile().getClusterNodeId());
+ }
+
+ assertEquals(10, nodesByAttribute.size());
+ for (final Map.Entry<String, Set<String>> entry :
nodesByAttribute.entrySet()) {
+ final Set<String> nodes = entry.getValue();
+ assertEquals("FlowFile with attribute number=" + entry.getKey() +
" went to nodes " + nodes, 1, nodes.size());
+ }
+ }
+
+ @Test
public void testOffload() throws NiFiClientException, IOException,
InterruptedException {
final ProcessorEntity generate =
getClientUtil().createProcessor("GenerateFlowFile");
final ProcessorEntity count =
getClientUtil().createProcessor("CountEvents");
diff --git
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ConnectionClient.java
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ConnectionClient.java
index a486a65..f5c07d1 100644
---
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ConnectionClient.java
+++
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ConnectionClient.java
@@ -47,4 +47,6 @@ public interface ConnectionClient {
ListingRequestEntity deleteListingRequest(String connectionId, String
listingRequestId) throws NiFiClientException, IOException;
FlowFileEntity getFlowFile(String connectionId, String flowFileUuid)
throws NiFiClientException, IOException;
+
+ FlowFileEntity getFlowFile(String connectionId, String flowFileUuid,
String nodeId) throws NiFiClientException, IOException;
}
diff --git
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyConnectionClient.java
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyConnectionClient.java
index f4180fe..ffe038e 100644
---
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyConnectionClient.java
+++
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyConnectionClient.java
@@ -242,8 +242,15 @@ public class JerseyConnectionClient extends
AbstractJerseyClient implements Conn
});
}
+
@Override
public FlowFileEntity getFlowFile(final String connectionId, final String
flowFileUuid) throws NiFiClientException, IOException {
+ return getFlowFile(connectionId, flowFileUuid, null);
+ }
+
+
+ @Override
+ public FlowFileEntity getFlowFile(final String connectionId, final String
flowFileUuid, final String nodeId) throws NiFiClientException, IOException {
if (connectionId == null) {
throw new IllegalArgumentException("Connection ID cannot be null");
}
@@ -252,11 +259,15 @@ public class JerseyConnectionClient extends
AbstractJerseyClient implements Conn
}
return executeAction("Error retrieving FlowFile", () -> {
- final WebTarget target = flowFileQueueTarget
+ WebTarget target = flowFileQueueTarget
.path("flowfiles/{uuid}")
.resolveTemplate("id", connectionId)
.resolveTemplate("uuid", flowFileUuid);
+ if (nodeId != null) {
+ target = target.queryParam("clusterNodeId", nodeId);
+ }
+
return getRequestBuilder(target).get(FlowFileEntity.class);
});
}