[https://issues.apache.org/jira/browse/NIFI-5516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16630785#comment-16630785]

ASF GitHub Bot commented on NIFI-5516:
--------------------------------------

GitHub user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2947#discussion_r221013395
  
    --- Diff: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java
 ---
    @@ -0,0 +1,337 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.controller.queue.clustered.partition;
    +
    +import org.apache.nifi.cluster.protocol.NodeIdentifier;
    +import org.apache.nifi.controller.queue.DropFlowFileRequest;
    +import org.apache.nifi.controller.queue.FlowFileQueueContents;
    +import org.apache.nifi.controller.queue.LoadBalancedFlowFileQueue;
    +import org.apache.nifi.controller.queue.QueueSize;
    +import org.apache.nifi.controller.queue.RemoteQueuePartitionDiagnostics;
    +import 
org.apache.nifi.controller.queue.StandardRemoteQueuePartitionDiagnostics;
    +import org.apache.nifi.controller.queue.SwappablePriorityQueue;
    +import 
org.apache.nifi.controller.queue.clustered.TransferFailureDestination;
    +import 
org.apache.nifi.controller.queue.clustered.client.async.AsyncLoadBalanceClientRegistry;
    +import 
org.apache.nifi.controller.queue.clustered.client.async.TransactionCompleteCallback;
    +import 
org.apache.nifi.controller.queue.clustered.client.async.TransactionFailureCallback;
    +import org.apache.nifi.controller.repository.ContentNotFoundException;
    +import org.apache.nifi.controller.repository.ContentRepository;
    +import org.apache.nifi.controller.repository.FlowFileRecord;
    +import org.apache.nifi.controller.repository.FlowFileRepository;
    +import org.apache.nifi.controller.repository.RepositoryRecord;
    +import org.apache.nifi.controller.repository.StandardRepositoryRecord;
    +import org.apache.nifi.controller.repository.SwapSummary;
    +import org.apache.nifi.controller.repository.claim.ContentClaim;
    +import org.apache.nifi.controller.repository.claim.ResourceClaim;
    +import org.apache.nifi.flowfile.FlowFilePrioritizer;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.provenance.ProvenanceEventBuilder;
    +import org.apache.nifi.provenance.ProvenanceEventRecord;
    +import org.apache.nifi.provenance.ProvenanceEventRepository;
    +import org.apache.nifi.provenance.ProvenanceEventType;
    +import org.apache.nifi.provenance.StandardProvenanceEventRecord;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.stream.Collectors;
    +
    +/**
    + * A Queue Partition that is responsible for transferring FlowFiles to 
another node in the cluster
    + */
    +public class RemoteQueuePartition implements QueuePartition {
    +    private static final Logger logger = 
LoggerFactory.getLogger(RemoteQueuePartition.class);
    +
    +    // The cluster node that FlowFiles in this partition are to be transferred to
    +    private final NodeIdentifier nodeIdentifier;
    +    // Holds this partition's FlowFiles; put, poll, drop and swap recovery all delegate to it
    +    private final SwappablePriorityQueue priorityQueue;
    +    // The owning load-balanced queue; supplies the FlowFile expiration and handles expired records
    +    private final LoadBalancedFlowFileQueue flowFileQueue;
    +    // Receives FlowFiles whose transfer to the remote node fails
    +    private final TransferFailureDestination failureDestination;
    +
    +    // Repositories; presumably written by updateRepositories(...) after a transaction
    +    // completes or a FlowFile's content is missing -- confirm against that method
    +    private final FlowFileRepository flowFileRepo;
    +    private final ProvenanceEventRepository provRepo;
    +    private final ContentRepository contentRepo;
    +    private final AsyncLoadBalanceClientRegistry clientRegistry;
    +
    +    // Checked at the top of start(); when already true, start() returns immediately
    +    private boolean running = false;
    +    // Label of the form "RemoteQueuePartition[queueId=..., nodeId=...]", built in the constructor
    +    private final String description;
    +
    +    /**
    +     * Creates a partition whose FlowFiles are to be transferred to the given node.
    +     *
    +     * @param nodeId the destination node; its id doubles as this partition's swap partition name
    +     * @param priorityQueue the queue that holds this partition's FlowFiles
    +     * @param failureDestination where FlowFiles go when a transfer to the node fails
    +     * @param flowFileRepo the FlowFile Repository
    +     * @param provRepo the Provenance Event Repository
    +     * @param contentRepository the Content Repository
    +     * @param clientRegistry the registry of asynchronous load-balance clients
    +     * @param flowFileQueue the owning load-balanced queue; must be non-null, because its
    +     *        identifier is read here to build the description
    +     */
    +    public RemoteQueuePartition(final NodeIdentifier nodeId, final 
SwappablePriorityQueue priorityQueue, final TransferFailureDestination 
failureDestination,
    +                                final FlowFileRepository flowFileRepo, 
final ProvenanceEventRepository provRepo, final ContentRepository 
contentRepository,
    +                                final AsyncLoadBalanceClientRegistry 
clientRegistry, final LoadBalancedFlowFileQueue flowFileQueue) {
    +
    +        this.nodeIdentifier = nodeId;
    +        this.priorityQueue = priorityQueue;
    +        this.flowFileQueue = flowFileQueue;
    +        this.failureDestination = failureDestination;
    +        this.flowFileRepo = flowFileRepo;
    +        this.provRepo = provRepo;
    +        this.contentRepo = contentRepository;
    +        this.clientRegistry = clientRegistry;
    +        this.description = "RemoteQueuePartition[queueId=" + 
flowFileQueue.getIdentifier() + ", nodeId=" + nodeIdentifier + "]";
    +    }
    +
    +    /**
    +     * Returns the size of this partition, as reported by the underlying priority queue.
    +     */
    +    @Override
    +    public QueueSize size() {
    +        return priorityQueue.size();
    +    }
    +
    +    /**
    +     * Returns the destination node's id, which serves as this partition's swap partition name.
    +     * NOTE(review): unlike getNodeIdentifier(), this does not tolerate a null nodeIdentifier;
    +     * it would throw a NullPointerException in that case.
    +     */
    +    @Override
    +    public String getSwapPartitionName() {
    +        return nodeIdentifier.getId();
    +    }
    +
    +    /**
    +     * Returns the node that this partition transfers FlowFiles to. Because Optional.ofNullable
    +     * is used, the result is empty if no node identifier was supplied.
    +     */
    +    @Override
    +    public Optional<NodeIdentifier> getNodeIdentifier() {
    +        return Optional.ofNullable(nodeIdentifier);
    +    }
    +
    +    /**
    +     * Adds a single FlowFile to this partition's priority queue.
    +     */
    +    @Override
    +    public void put(final FlowFileRecord flowFile) {
    +        priorityQueue.put(flowFile);
    +    }
    +
    +    /**
    +     * Adds all of the given FlowFiles to this partition's priority queue.
    +     */
    +    @Override
    +    public void putAll(final Collection<FlowFileRecord> flowFiles) {
    +        priorityQueue.putAll(flowFiles);
    +    }
    +
    +    /**
    +     * Delegates the drop request, made on behalf of the given requestor, to the priority queue.
    +     */
    +    @Override
    +    public void dropFlowFiles(final DropFlowFileRequest dropRequest, final 
String requestor) {
    +        priorityQueue.dropFlowFiles(dropRequest, requestor);
    +    }
    +
    +    /**
    +     * Delegates recovery of swapped-out FlowFiles to the priority queue and returns its SwapSummary.
    +     */
    +    @Override
    +    public SwapSummary recoverSwappedFlowFiles() {
    +        return priorityQueue.recoverSwappedFlowFiles();
    +    }
    +
    +    /**
    +     * Packages the priority queue's contents for the given new partition name, delegating to
    +     * SwappablePriorityQueue.packageForRebalance.
    +     * NOTE(review): newPartitionName is not declared final, unlike the parameters of the
    +     * surrounding methods; consider making it final for consistency.
    +     */
    +    @Override
    +    public FlowFileQueueContents packageForRebalance(String 
newPartitionName) {
    +        return priorityQueue.packageForRebalance(newPartitionName);
    +    }
    +
    +    /**
    +     * Passes the new list of prioritizers to the underlying priority queue.
    +     */
    +    @Override
    +    public void setPriorities(final List<FlowFilePrioritizer> 
newPriorities) {
    +        priorityQueue.setPriorities(newPriorities);
    +    }
    +
    +    /**
    +     * Polls the next FlowFile from the priority queue, using the owning queue's FlowFile
    +     * expiration (in milliseconds). Any FlowFiles the poll reports as expired are collected into
    +     * a set and handed to flowFileQueue.handleExpiredRecords(...), even when that set is empty.
    +     *
    +     * @return the polled FlowFile; presumably null when no FlowFile is available -- confirm
    +     *         against SwappablePriorityQueue.poll
    +     */
    +    private FlowFileRecord getFlowFile() {
    +        final Set<FlowFileRecord> expired = new HashSet<>();
    +        final FlowFileRecord flowFile = priorityQueue.poll(expired, 
flowFileQueue.getFlowFileExpiration(TimeUnit.MILLISECONDS));
    +        flowFileQueue.handleExpiredRecords(expired);
    +        return flowFile;
    +    }
    +
    +    @Override
    +    public synchronized void start(final FlowFilePartitioner partitioner) {
    +        if (running) {
    +            return;
    +        }
    +
    +        final TransactionFailureCallback failureCallback = new 
TransactionFailureCallback() {
    +            @Override
    +            public void onTransactionFailed(final List<FlowFileRecord> 
flowFiles, final Exception cause, final TransactionPhase phase) {
    +                if (cause instanceof ContentNotFoundException) {
    +                    // Handle ContentNotFound by creating a 
RepositoryRecord for the FlowFile and marking as aborted, then updating the
    +                    // FlowFiles and Provenance Repositories accordingly. 
This follows the same pattern as StandardProcessSession so that
    +                    // we have a consistent way of handling this case.
    +                    final Optional<FlowFileRecord> optionalFlowFile = 
((ContentNotFoundException) cause).getFlowFile();
    +                    if (optionalFlowFile.isPresent()) {
    +                        final List<FlowFileRecord> successfulFlowFiles = 
new ArrayList<>(flowFiles);
    +
    +                        final FlowFileRecord flowFile = 
optionalFlowFile.get();
    +                        successfulFlowFiles.remove(flowFile);
    +
    +                        final StandardRepositoryRecord repoRecord = new 
StandardRepositoryRecord(flowFileQueue, flowFile);
    +                        repoRecord.markForAbort();
    +
    +                        updateRepositories(Collections.emptyList(), 
Collections.singleton(repoRecord));
    +
    +                        // If unable to even connect to the node, go ahead 
and transfer all FlowFiles for this queue to the failure destination.
    +                        // Otherwise, transfer just those FlowFiles that 
we failed to send.
    +                        if (phase == TransactionPhase.CONNECTING) {
    +                            
failureDestination.putAll(priorityQueue::packageForRebalance, partitioner);
    +                        } else {
    +                            failureDestination.putAll(successfulFlowFiles, 
partitioner);
    +                        }
    +
    +                        return;
    +                    }
    +                }
    +
    +                // If unable to even connect to the node, go ahead and 
transfer all FlowFiles for this queue to the failure destination.
    +                // Otherwise, transfer just those FlowFiles that we failed 
to send.
    +                if (phase == TransactionPhase.CONNECTING) {
    +                    
failureDestination.putAll(priorityQueue::packageForRebalance, partitioner);
    +                } else {
    +                    failureDestination.putAll(flowFiles, partitioner);
    +                }
    +            }
    +        };
    +
    +        final TransactionCompleteCallback successCallback = new 
TransactionCompleteCallback() {
    +            @Override
    +            public void onTransactionComplete(final List<FlowFileRecord> 
flowFilesSent) {
    +                // We've now completed the transaction. We must now update 
the repositories and "keep the books", acknowledging the FlowFiles
    +                // with the queue so that its size remains accurate.
    +                updateRepositories(flowFilesSent, Collections.emptyList());
    +                priorityQueue.acknowledge(flowFilesSent);
    --- End diff --
    
    After looking at it more: yes, we should also acknowledge the FlowFiles (via priorityQueue.acknowledge) in the failure callback. Thanks!


> Allow data in a Connection to be Load-Balanced across cluster
> -------------------------------------------------------------
>
>                 Key: NIFI-5516
>                 URL: https://issues.apache.org/jira/browse/NIFI-5516
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Core Framework
>            Reporter: Mark Payne
>            Assignee: Mark Payne
>            Priority: Major
>
> Allow users to configure a Connection to be load-balanced across the cluster. 
> For more information, see the Feature Proposal at 
> https://cwiki.apache.org/confluence/display/NIFI/Load-Balanced+Connections



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to