Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2947#discussion_r220440614 --- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java --- @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.controller.queue.clustered.partition; + +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.controller.queue.DropFlowFileRequest; +import org.apache.nifi.controller.queue.FlowFileQueueContents; +import org.apache.nifi.controller.queue.LoadBalancedFlowFileQueue; +import org.apache.nifi.controller.queue.QueueSize; +import org.apache.nifi.controller.queue.RemoteQueuePartitionDiagnostics; +import org.apache.nifi.controller.queue.StandardRemoteQueuePartitionDiagnostics; +import org.apache.nifi.controller.queue.SwappablePriorityQueue; +import org.apache.nifi.controller.queue.clustered.TransferFailureDestination; +import org.apache.nifi.controller.queue.clustered.client.async.AsyncLoadBalanceClientRegistry; +import org.apache.nifi.controller.queue.clustered.client.async.TransactionCompleteCallback; +import org.apache.nifi.controller.queue.clustered.client.async.TransactionFailureCallback; +import org.apache.nifi.controller.repository.ContentNotFoundException; +import org.apache.nifi.controller.repository.ContentRepository; +import org.apache.nifi.controller.repository.FlowFileRecord; +import org.apache.nifi.controller.repository.FlowFileRepository; +import org.apache.nifi.controller.repository.RepositoryRecord; +import org.apache.nifi.controller.repository.StandardRepositoryRecord; +import org.apache.nifi.controller.repository.SwapSummary; +import org.apache.nifi.controller.repository.claim.ContentClaim; +import org.apache.nifi.controller.repository.claim.ResourceClaim; +import org.apache.nifi.flowfile.FlowFilePrioritizer; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.provenance.ProvenanceEventBuilder; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventRepository; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.provenance.StandardProvenanceEventRecord; +import org.slf4j.Logger; +import 
org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * A Queue Partition that is responsible for transferring FlowFiles to another node in the cluster + */ +public class RemoteQueuePartition implements QueuePartition { + private static final Logger logger = LoggerFactory.getLogger(RemoteQueuePartition.class); + + private final NodeIdentifier nodeIdentifier; + private final SwappablePriorityQueue priorityQueue; + private final LoadBalancedFlowFileQueue flowFileQueue; + private final TransferFailureDestination failureDestination; + + private final FlowFileRepository flowFileRepo; + private final ProvenanceEventRepository provRepo; + private final ContentRepository contentRepo; + private final AsyncLoadBalanceClientRegistry clientRegistry; + + private boolean running = false; + private final String description; + + public RemoteQueuePartition(final NodeIdentifier nodeId, final SwappablePriorityQueue priorityQueue, final TransferFailureDestination failureDestination, + final FlowFileRepository flowFileRepo, final ProvenanceEventRepository provRepo, final ContentRepository contentRepository, + final AsyncLoadBalanceClientRegistry clientRegistry, final LoadBalancedFlowFileQueue flowFileQueue) { + + this.nodeIdentifier = nodeId; + this.priorityQueue = priorityQueue; + this.flowFileQueue = flowFileQueue; + this.failureDestination = failureDestination; + this.flowFileRepo = flowFileRepo; + this.provRepo = provRepo; + this.contentRepo = contentRepository; + this.clientRegistry = clientRegistry; + this.description = "RemoteQueuePartition[queueId=" + flowFileQueue.getIdentifier() + ", nodeId=" + nodeIdentifier + "]"; + } + + @Override + public QueueSize size() { + return priorityQueue.size(); + } + + @Override + public String 
getSwapPartitionName() { + return nodeIdentifier.getId(); + } + + @Override + public Optional<NodeIdentifier> getNodeIdentifier() { + return Optional.ofNullable(nodeIdentifier); + } + + @Override + public void put(final FlowFileRecord flowFile) { + priorityQueue.put(flowFile); + } + + @Override + public void putAll(final Collection<FlowFileRecord> flowFiles) { + priorityQueue.putAll(flowFiles); + } + + @Override + public void dropFlowFiles(final DropFlowFileRequest dropRequest, final String requestor) { + priorityQueue.dropFlowFiles(dropRequest, requestor); + } + + @Override + public SwapSummary recoverSwappedFlowFiles() { + return priorityQueue.recoverSwappedFlowFiles(); + } + + @Override + public FlowFileQueueContents packageForRebalance(String newPartitionName) { + return priorityQueue.packageForRebalance(newPartitionName); + } + + @Override + public void setPriorities(final List<FlowFilePrioritizer> newPriorities) { + priorityQueue.setPriorities(newPriorities); + } + + private FlowFileRecord getFlowFile() { + final Set<FlowFileRecord> expired = new HashSet<>(); + final FlowFileRecord flowFile = priorityQueue.poll(expired, flowFileQueue.getFlowFileExpiration(TimeUnit.MILLISECONDS)); + flowFileQueue.handleExpiredRecords(expired); + return flowFile; + } + + @Override + public synchronized void start(final FlowFilePartitioner partitioner) { + if (running) { + return; + } + + final TransactionFailureCallback failureCallback = new TransactionFailureCallback() { + @Override + public void onTransactionFailed(final List<FlowFileRecord> flowFiles, final Exception cause, final TransactionPhase phase) { + if (cause instanceof ContentNotFoundException) { + // Handle ContentNotFound by creating a RepositoryRecord for the FlowFile and marking as aborted, then updating the + // FlowFiles and Provenance Repositories accordingly. This follows the same pattern as StandardProcessSession so that + // we have a consistent way of handling this case. 
+ final Optional<FlowFileRecord> optionalFlowFile = ((ContentNotFoundException) cause).getFlowFile(); + if (optionalFlowFile.isPresent()) { + final List<FlowFileRecord> successfulFlowFiles = new ArrayList<>(flowFiles); + + final FlowFileRecord flowFile = optionalFlowFile.get(); + successfulFlowFiles.remove(flowFile); + + final StandardRepositoryRecord repoRecord = new StandardRepositoryRecord(flowFileQueue, flowFile); + repoRecord.markForAbort(); + + updateRepositories(Collections.emptyList(), Collections.singleton(repoRecord)); + + // If unable to even connect to the node, go ahead and transfer all FlowFiles for this queue to the failure destination. + // Otherwise, transfer just those FlowFiles that we failed to send. + if (phase == TransactionPhase.CONNECTING) { + failureDestination.putAll(priorityQueue::packageForRebalance, partitioner); + } else { + failureDestination.putAll(successfulFlowFiles, partitioner); + } + + return; + } + } + + // If unable to even connect to the node, go ahead and transfer all FlowFiles for this queue to the failure destination. + // Otherwise, transfer just those FlowFiles that we failed to send. + if (phase == TransactionPhase.CONNECTING) { + failureDestination.putAll(priorityQueue::packageForRebalance, partitioner); + } else { + failureDestination.putAll(flowFiles, partitioner); + } + } + }; + + final TransactionCompleteCallback successCallback = new TransactionCompleteCallback() { + @Override + public void onTransactionComplete(final List<FlowFileRecord> flowFilesSent) { + // We've now completed the transaction. We must now update the repositories and "keep the books", acknowledging the FlowFiles + // with the queue so that its size remains accurate. + updateRepositories(flowFilesSent, Collections.emptyList()); + priorityQueue.acknowledge(flowFilesSent); --- End diff -- This `priorityQueue` emptiness is used to determine if there's no more FlowFiles at the `priorityQueue::isEmpty` below. 
After I encountered a transfer failure, this `priorityQueue` never became empty, because `acknowledge` is only called here in `onTransactionComplete`; when a transaction fails outside the CONNECTING phase, the `failureCallback` passes the FlowFiles to `failureDestination.putAll(flowFiles, partitioner)` without ever acknowledging them in `priorityQueue`. As a result, `priorityQueue::isEmpty` keeps returning false, so NiFi thinks there are more FlowFiles to load-balance and continuously (and unnecessarily) sends empty FlowFile lists, resulting in the following message being logged over and over: ``` o.a.n.c.q.c.SocketLoadBalancedFlowFileQueue Received the following FlowFiles from Peer: []. Will accept FlowFiles to the local partition ``` Shouldn't we call `acknowledge` from `failureCallback`, too?
---