Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2947#discussion_r220975068 --- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java --- @@ -0,0 +1,460 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.controller.queue; + +import org.apache.nifi.controller.ProcessScheduler; +import org.apache.nifi.controller.repository.FlowFileRecord; +import org.apache.nifi.controller.repository.FlowFileRepository; +import org.apache.nifi.controller.repository.RepositoryRecord; +import org.apache.nifi.controller.repository.claim.ContentClaim; +import org.apache.nifi.controller.repository.claim.ResourceClaim; +import org.apache.nifi.controller.repository.claim.ResourceClaimManager; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.provenance.ProvenanceEventBuilder; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventRepository; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.util.FormatUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +public abstract class AbstractFlowFileQueue implements FlowFileQueue { + private static final Logger logger = LoggerFactory.getLogger(AbstractFlowFileQueue.class); + private final String identifier; + private final FlowFileRepository flowFileRepository; + private final ProvenanceEventRepository provRepository; + private final ResourceClaimManager resourceClaimManager; + private final ProcessScheduler scheduler; + + private final AtomicReference<TimePeriod> expirationPeriod = new AtomicReference<>(new TimePeriod("0 mins", 0L)); + private final AtomicReference<MaxQueueSize> maxQueueSize = new AtomicReference<>(new MaxQueueSize("1 GB", 1024 * 1024 * 1024, 10000)); + + private final 
ConcurrentMap<String, ListFlowFileRequest> listRequestMap = new ConcurrentHashMap<>(); + private final ConcurrentMap<String, DropFlowFileRequest> dropRequestMap = new ConcurrentHashMap<>(); + + private LoadBalanceStrategy loadBalanceStrategy = LoadBalanceStrategy.DO_NOT_LOAD_BALANCE; + private String partitioningAttribute = null; + + private LoadBalanceCompression compression = LoadBalanceCompression.DO_NOT_COMPRESS; + + + public AbstractFlowFileQueue(final String identifier, final ProcessScheduler scheduler, + final FlowFileRepository flowFileRepo, final ProvenanceEventRepository provRepo, final ResourceClaimManager resourceClaimManager) { + this.identifier = identifier; + this.scheduler = scheduler; + this.flowFileRepository = flowFileRepo; + this.provRepository = provRepo; + this.resourceClaimManager = resourceClaimManager; + } + + @Override + public String getIdentifier() { + return identifier; + } + + protected ProcessScheduler getScheduler() { + return scheduler; + } + + @Override + public String getFlowFileExpiration() { + return expirationPeriod.get().getPeriod(); + } + + @Override + public int getFlowFileExpiration(final TimeUnit timeUnit) { + return (int) timeUnit.convert(expirationPeriod.get().getMillis(), TimeUnit.MILLISECONDS); + } + + @Override + public void setFlowFileExpiration(final String flowExpirationPeriod) { + final long millis = FormatUtils.getTimeDuration(flowExpirationPeriod, TimeUnit.MILLISECONDS); + if (millis < 0) { + throw new IllegalArgumentException("FlowFile Expiration Period must be positive"); + } + + expirationPeriod.set(new TimePeriod(flowExpirationPeriod, millis)); + } + + @Override + public void setBackPressureObjectThreshold(final long threshold) { + boolean updated = false; + while (!updated) { + MaxQueueSize maxSize = getMaxQueueSize(); + final MaxQueueSize updatedSize = new MaxQueueSize(maxSize.getMaxSize(), maxSize.getMaxBytes(), threshold); + updated = maxQueueSize.compareAndSet(maxSize, updatedSize); + } + } + + 
@Override + public long getBackPressureObjectThreshold() { + return getMaxQueueSize().getMaxCount(); + } + + @Override + public void setBackPressureDataSizeThreshold(final String maxDataSize) { + final long maxBytes = DataUnit.parseDataSize(maxDataSize, DataUnit.B).longValue(); + + boolean updated = false; + while (!updated) { + MaxQueueSize maxSize = getMaxQueueSize(); + final MaxQueueSize updatedSize = new MaxQueueSize(maxDataSize, maxBytes, maxSize.getMaxCount()); + updated = maxQueueSize.compareAndSet(maxSize, updatedSize); + } + } + + @Override + public String getBackPressureDataSizeThreshold() { + return getMaxQueueSize().getMaxSize(); + } + + private MaxQueueSize getMaxQueueSize() { + return maxQueueSize.get(); + } + + @Override + public boolean isFull() { + final MaxQueueSize maxSize = getMaxQueueSize(); + + // Check if max size is set + if (maxSize.getMaxBytes() <= 0 && maxSize.getMaxCount() <= 0) { + return false; + } + + final QueueSize queueSize = size(); + if (maxSize.getMaxCount() > 0 && queueSize.getObjectCount() >= maxSize.getMaxCount()) { + return true; + } + + if (maxSize.getMaxBytes() > 0 && queueSize.getByteCount() >= maxSize.getMaxBytes()) { + return true; + } + + return false; + } + + + @Override + public ListFlowFileStatus listFlowFiles(final String requestIdentifier, final int maxResults) { + // purge any old requests from the map just to keep it clean. 
But if there are very few requests, which is usually the case, then don't bother + if (listRequestMap.size() > 10) { + final List<String> toDrop = new ArrayList<>(); + for (final Map.Entry<String, ListFlowFileRequest> entry : listRequestMap.entrySet()) { + final ListFlowFileRequest request = entry.getValue(); + final boolean completed = request.getState() == ListFlowFileState.COMPLETE || request.getState() == ListFlowFileState.FAILURE; + + if (completed && System.currentTimeMillis() - request.getLastUpdated() > TimeUnit.MINUTES.toMillis(5L)) { + toDrop.add(entry.getKey()); + } + } + + for (final String requestId : toDrop) { + listRequestMap.remove(requestId); + } + } + + // numSteps = 1 for each swap location + 1 for active queue + 1 for swap queue. + final ListFlowFileRequest listRequest = new ListFlowFileRequest(requestIdentifier, maxResults, size()); + + final Thread t = new Thread(new Runnable() { + @Override + public void run() { + int position = 0; + int resultCount = 0; + final List<FlowFileSummary> summaries = new ArrayList<>(); + + // Create an ArrayList that contains all of the contents of the active queue. + // We do this so that we don't have to hold the lock any longer than absolutely necessary. + // We cannot simply pull the first 'maxResults' records from the queue, however, because the + // Iterator provided by PriorityQueue does not return records in order. So we would have to either + // use a writeLock and 'pop' the first 'maxResults' records off the queue or use a read lock and + // do a shallow copy of the queue. The shallow copy is generally quicker because it doesn't have to do + // the sorting to put the records back. So even though this has an expensive of Java Heap to create the + // extra collection, we are making this trade-off to avoid locking the queue any longer than required. 
+ final List<FlowFileRecord> allFlowFiles = getListableFlowFiles(); + final QueuePrioritizer prioritizer = new QueuePrioritizer(getPriorities()); + + listRequest.setState(ListFlowFileState.CALCULATING_LIST); + + // sort the FlowFileRecords so that we have the list in the same order as on the queue. + allFlowFiles.sort(prioritizer); + + for (final FlowFileRecord flowFile : allFlowFiles) { + summaries.add(summarize(flowFile, ++position)); + if (summaries.size() >= maxResults) { + break; + } + } + + logger.debug("{} Finished listing FlowFiles for active queue with a total of {} results", this, resultCount); + listRequest.setFlowFileSummaries(summaries); + listRequest.setState(ListFlowFileState.COMPLETE); + } + }, "List FlowFiles for Connection " + getIdentifier()); + t.setDaemon(true); + t.start(); + + listRequestMap.put(requestIdentifier, listRequest); + return listRequest; + } + + @Override + public ListFlowFileStatus getListFlowFileStatus(final String requestIdentifier) { + return listRequestMap.get(requestIdentifier); + } + + @Override + public ListFlowFileStatus cancelListFlowFileRequest(final String requestIdentifier) { + logger.info("Canceling ListFlowFile Request with ID {}", requestIdentifier); + final ListFlowFileRequest request = listRequestMap.remove(requestIdentifier); + if (request != null) { + request.cancel(); + } + + return request; + } + + /** + * @return all FlowFiles that should be listed in response to a List Queue request + */ + protected abstract List<FlowFileRecord> getListableFlowFiles(); + + + @Override + public DropFlowFileStatus dropFlowFiles(final String requestIdentifier, final String requestor) { + logger.info("Initiating drop of FlowFiles from {} on behalf of {} (request identifier={})", this, requestor, requestIdentifier); + + // purge any old requests from the map just to keep it clean. 
But if there are very requests, which is usually the case, then don't bother + if (dropRequestMap.size() > 10) { + final List<String> toDrop = new ArrayList<>(); + for (final Map.Entry<String, DropFlowFileRequest> entry : dropRequestMap.entrySet()) { + final DropFlowFileRequest request = entry.getValue(); + final boolean completed = request.getState() == DropFlowFileState.COMPLETE || request.getState() == DropFlowFileState.FAILURE; + + if (completed && System.currentTimeMillis() - request.getLastUpdated() > TimeUnit.MINUTES.toMillis(5L)) { + toDrop.add(entry.getKey()); + } + } + + for (final String requestId : toDrop) { + dropRequestMap.remove(requestId); + } + } + + final DropFlowFileRequest dropRequest = new DropFlowFileRequest(requestIdentifier); + final QueueSize originalSize = size(); + dropRequest.setCurrentSize(originalSize); + dropRequest.setOriginalSize(originalSize); + if (originalSize.getObjectCount() == 0) { + dropRequest.setDroppedSize(originalSize); + dropRequest.setState(DropFlowFileState.COMPLETE); + dropRequestMap.put(requestIdentifier, dropRequest); + return dropRequest; + } + + final Thread t = new Thread(new Runnable() { + @Override + public void run() { + dropFlowFiles(dropRequest, requestor); + } + }, "Drop FlowFiles for Connection " + getIdentifier()); + t.setDaemon(true); + t.start(); + + dropRequestMap.put(requestIdentifier, dropRequest); + + return dropRequest; + } + + + @Override + public DropFlowFileRequest cancelDropFlowFileRequest(final String requestIdentifier) { + final DropFlowFileRequest request = dropRequestMap.remove(requestIdentifier); + if (request == null) { + return null; + } + + request.cancel(); + return request; + } + + @Override + public DropFlowFileStatus getDropFlowFileStatus(final String requestIdentifier) { + return dropRequestMap.get(requestIdentifier); + } + + /** + * Synchronously drops all FlowFiles in the queue + * + * @param dropRequest the request + * @param requestor the identity of the user/agent who made the 
request + */ + protected abstract void dropFlowFiles(final DropFlowFileRequest dropRequest, final String requestor); + + @Override + public void verifyCanList() throws IllegalStateException { + } + + + protected FlowFileSummary summarize(final FlowFile flowFile, final int position) { + // extract all of the information that we care about into new variables rather than just + // wrapping the FlowFile object with a FlowFileSummary object. We do this because we want to + // be able to hold many FlowFileSummary objects in memory and if we just wrap the FlowFile object, + // we will end up holding the entire FlowFile (including all Attributes) in the Java heap as well, + // which can be problematic if we expect them to be swapped out. + final String uuid = flowFile.getAttribute(CoreAttributes.UUID.key()); + final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key()); + final long size = flowFile.getSize(); + final Long lastQueuedTime = flowFile.getLastQueueDate(); + final long lineageStart = flowFile.getLineageStartDate(); + final boolean penalized = flowFile.isPenalized(); + + return new FlowFileSummary() { + @Override + public String getUuid() { + return uuid; + } + + @Override + public String getFilename() { + return filename; + } + + @Override + public int getPosition() { + return position; + } + + @Override + public long getSize() { + return size; + } + + @Override + public long getLastQueuedTime() { + return lastQueuedTime == null ? 
0L : lastQueuedTime; + } + + @Override + public long getLineageStartDate() { + return lineageStart; + } + + @Override + public boolean isPenalized() { + return penalized; + } + }; + } + + protected QueueSize drop(final List<FlowFileRecord> flowFiles, final String requestor) throws IOException { + // Create a Provenance Event and a FlowFile Repository record for each FlowFile + final List<ProvenanceEventRecord> provenanceEvents = new ArrayList<>(flowFiles.size()); + final List<RepositoryRecord> flowFileRepoRecords = new ArrayList<>(flowFiles.size()); + for (final FlowFileRecord flowFile : flowFiles) { + provenanceEvents.add(createDropProvenanceEvent(flowFile, requestor)); + flowFileRepoRecords.add(createDeleteRepositoryRecord(flowFile)); + } + + long dropContentSize = 0L; + for (final FlowFileRecord flowFile : flowFiles) { + dropContentSize += flowFile.getSize(); + final ContentClaim contentClaim = flowFile.getContentClaim(); + if (contentClaim == null) { + continue; + } + + final ResourceClaim resourceClaim = contentClaim.getResourceClaim(); + if (resourceClaim == null) { + continue; + } + + resourceClaimManager.decrementClaimantCount(resourceClaim); + } + + provRepository.registerEvents(provenanceEvents); + flowFileRepository.updateRepository(flowFileRepoRecords); + return new QueueSize(flowFiles.size(), dropContentSize); + } + + private ProvenanceEventRecord createDropProvenanceEvent(final FlowFileRecord flowFile, final String requestor) { + final ProvenanceEventBuilder builder = provRepository.eventBuilder(); + builder.fromFlowFile(flowFile); + builder.setEventType(ProvenanceEventType.DROP); + builder.setLineageStartDate(flowFile.getLineageStartDate()); + builder.setComponentId(getIdentifier()); + builder.setComponentType("Connection"); + builder.setAttributes(flowFile.getAttributes(), Collections.<String, String> emptyMap()); + builder.setDetails("FlowFile Queue emptied by " + requestor); + builder.setSourceQueueIdentifier(getIdentifier()); + + final 
ContentClaim contentClaim = flowFile.getContentClaim(); + if (contentClaim != null) { + final ResourceClaim resourceClaim = contentClaim.getResourceClaim(); + builder.setPreviousContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), contentClaim.getOffset(), flowFile.getSize()); + } + + return builder.build(); + } + + private RepositoryRecord createDeleteRepositoryRecord(final FlowFileRecord flowFile) { + return new DropFlowFileRepositoryRecord(this, flowFile); + } + + @Override + public synchronized void setLoadBalanceStrategy(final LoadBalanceStrategy strategy, final String partitioningAttribute) { + if (strategy == LoadBalanceStrategy.PARTITION_BY_ATTRIBUTE && partitioningAttribute == null) { --- End diff -- I'm ok with the ` && !partitioningAttribute.isEmpty()` - but really, if the partitioning attribute is null or empty and strategy == LoadBalanceStrategy.PARTITION_BY_ATTRIBUTE, we should probably throw an Exception instead. Re: nullifying this.partitioningAttribute and resetting compression - I disagree. If a user is using Partition By Attribute, then changes it to Round Robin by accident, for instance, and wants to change it back, we should keep the Partitioning Attribute. Put another way, we should not assume that the user wants the field to be cleared. Similarly, if the user is load balancing and then decides to turn off load balancing (for example, to see the difference in performance), we should not reset their configuration. If they want to set it back, they would likely (at least I would) expect the previously configured compression value to remain.
---