Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2947#discussion_r219366959
  
    --- Diff: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java
 ---
    @@ -0,0 +1,460 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.controller.queue;
    +
    +import org.apache.nifi.controller.ProcessScheduler;
    +import org.apache.nifi.controller.repository.FlowFileRecord;
    +import org.apache.nifi.controller.repository.FlowFileRepository;
    +import org.apache.nifi.controller.repository.RepositoryRecord;
    +import org.apache.nifi.controller.repository.claim.ContentClaim;
    +import org.apache.nifi.controller.repository.claim.ResourceClaim;
    +import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.DataUnit;
    +import org.apache.nifi.provenance.ProvenanceEventBuilder;
    +import org.apache.nifi.provenance.ProvenanceEventRecord;
    +import org.apache.nifi.provenance.ProvenanceEventRepository;
    +import org.apache.nifi.provenance.ProvenanceEventType;
    +import org.apache.nifi.util.FormatUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.ConcurrentMap;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +public abstract class AbstractFlowFileQueue implements FlowFileQueue {
    +    private static final Logger logger = 
LoggerFactory.getLogger(AbstractFlowFileQueue.class);
    +    private final String identifier;
    +    private final FlowFileRepository flowFileRepository;
    +    private final ProvenanceEventRepository provRepository;
    +    private final ResourceClaimManager resourceClaimManager;
    +    private final ProcessScheduler scheduler;
    +
    +    private final AtomicReference<TimePeriod> expirationPeriod = new 
AtomicReference<>(new TimePeriod("0 mins", 0L));
    +    private final AtomicReference<MaxQueueSize> maxQueueSize = new 
AtomicReference<>(new MaxQueueSize("1 GB", 1024 * 1024 * 1024, 10000));
    +
    +    private final ConcurrentMap<String, ListFlowFileRequest> 
listRequestMap = new ConcurrentHashMap<>();
    +    private final ConcurrentMap<String, DropFlowFileRequest> 
dropRequestMap = new ConcurrentHashMap<>();
    +
    +    private LoadBalanceStrategy loadBalanceStrategy = 
LoadBalanceStrategy.DO_NOT_LOAD_BALANCE;
    +    private String partitioningAttribute = null;
    +
    +    private LoadBalanceCompression compression = 
LoadBalanceCompression.DO_NOT_COMPRESS;
    +
    +
    +    public AbstractFlowFileQueue(final String identifier, final 
ProcessScheduler scheduler,
    +            final FlowFileRepository flowFileRepo, final 
ProvenanceEventRepository provRepo, final ResourceClaimManager 
resourceClaimManager) {
    +        this.identifier = identifier;
    +        this.scheduler = scheduler;
    +        this.flowFileRepository = flowFileRepo;
    +        this.provRepository = provRepo;
    +        this.resourceClaimManager = resourceClaimManager;
    +    }
    +
    +    @Override
    +    public String getIdentifier() {
    +        return identifier;
    +    }
    +
    +    protected ProcessScheduler getScheduler() {
    +        return scheduler;
    +    }
    +
    +    @Override
    +    public String getFlowFileExpiration() {
    +        return expirationPeriod.get().getPeriod();
    +    }
    +
    +    @Override
    +    public int getFlowFileExpiration(final TimeUnit timeUnit) {
    +        return (int) timeUnit.convert(expirationPeriod.get().getMillis(), 
TimeUnit.MILLISECONDS);
    +    }
    +
    +    @Override
    +    public void setFlowFileExpiration(final String flowExpirationPeriod) {
    +        final long millis = 
FormatUtils.getTimeDuration(flowExpirationPeriod, TimeUnit.MILLISECONDS);
    +        if (millis < 0) {
    +            throw new IllegalArgumentException("FlowFile Expiration Period 
must be positive");
    +        }
    +
    +        expirationPeriod.set(new TimePeriod(flowExpirationPeriod, millis));
    +    }
    +
    +    @Override
    +    public void setBackPressureObjectThreshold(final long threshold) {
    +        boolean updated = false;
    +        while (!updated) {
    +            MaxQueueSize maxSize = getMaxQueueSize();
    +            final MaxQueueSize updatedSize = new 
MaxQueueSize(maxSize.getMaxSize(), maxSize.getMaxBytes(), threshold);
    +            updated = maxQueueSize.compareAndSet(maxSize, updatedSize);
    +        }
    +    }
    +
    +    @Override
    +    public long getBackPressureObjectThreshold() {
    +        return getMaxQueueSize().getMaxCount();
    +    }
    +
    +    @Override
    +    public void setBackPressureDataSizeThreshold(final String maxDataSize) 
{
    +        final long maxBytes = DataUnit.parseDataSize(maxDataSize, 
DataUnit.B).longValue();
    +
    +        boolean updated = false;
    +        while (!updated) {
    +            MaxQueueSize maxSize = getMaxQueueSize();
    +            final MaxQueueSize updatedSize = new MaxQueueSize(maxDataSize, 
maxBytes, maxSize.getMaxCount());
    +            updated = maxQueueSize.compareAndSet(maxSize, updatedSize);
    +        }
    +    }
    +
    +    @Override
    +    public String getBackPressureDataSizeThreshold() {
    +        return getMaxQueueSize().getMaxSize();
    +    }
    +
    +    private MaxQueueSize getMaxQueueSize() {
    +        return maxQueueSize.get();
    +    }
    +
    +    @Override
    +    public boolean isFull() {
    +        final MaxQueueSize maxSize = getMaxQueueSize();
    +
    +        // Check if max size is set
    +        if (maxSize.getMaxBytes() <= 0 && maxSize.getMaxCount() <= 0) {
    +            return false;
    +        }
    +
    +        final QueueSize queueSize = size();
    +        if (maxSize.getMaxCount() > 0 && queueSize.getObjectCount() >= 
maxSize.getMaxCount()) {
    +            return true;
    +        }
    +
    +        if (maxSize.getMaxBytes() > 0 && queueSize.getByteCount() >= 
maxSize.getMaxBytes()) {
    +            return true;
    +        }
    +
    +        return false;
    +    }
    +
    +
    +    @Override
    +    public ListFlowFileStatus listFlowFiles(final String 
requestIdentifier, final int maxResults) {
    +        // purge any old requests from the map just to keep it clean. But 
if there are very few requests, which is usually the case, then don't bother
    +        if (listRequestMap.size() > 10) {
    +            final List<String> toDrop = new ArrayList<>();
    +            for (final Map.Entry<String, ListFlowFileRequest> entry : 
listRequestMap.entrySet()) {
    +                final ListFlowFileRequest request = entry.getValue();
    +                final boolean completed = request.getState() == 
ListFlowFileState.COMPLETE || request.getState() == ListFlowFileState.FAILURE;
    +
    +                if (completed && System.currentTimeMillis() - 
request.getLastUpdated() > TimeUnit.MINUTES.toMillis(5L)) {
    +                    toDrop.add(entry.getKey());
    +                }
    +            }
    +
    +            for (final String requestId : toDrop) {
    +                listRequestMap.remove(requestId);
    +            }
    +        }
    +
    +        // numSteps = 1 for each swap location + 1 for active queue + 1 
for swap queue.
    +        final ListFlowFileRequest listRequest = new 
ListFlowFileRequest(requestIdentifier, maxResults, size());
    +
    +        final Thread t = new Thread(new Runnable() {
    +            @Override
    +            public void run() {
    +                int position = 0;
    +                int resultCount = 0;
    +                final List<FlowFileSummary> summaries = new ArrayList<>();
    +
    +                // Create an ArrayList that contains all of the contents 
of the active queue.
    +                // We do this so that we don't have to hold the lock any 
longer than absolutely necessary.
    +                // We cannot simply pull the first 'maxResults' records 
from the queue, however, because the
    +                // Iterator provided by PriorityQueue does not return 
records in order. So we would have to either
    +                // use a writeLock and 'pop' the first 'maxResults' 
records off the queue or use a read lock and
    +                // do a shallow copy of the queue. The shallow copy is 
generally quicker because it doesn't have to do
    +                // the sorting to put the records back. So even though 
this has an expensive of Java Heap to create the
    +                // extra collection, we are making this trade-off to avoid 
locking the queue any longer than required.
    +                final List<FlowFileRecord> allFlowFiles = 
getListableFlowFiles();
    +                final QueuePrioritizer prioritizer = new 
QueuePrioritizer(getPriorities());
    +
    +                listRequest.setState(ListFlowFileState.CALCULATING_LIST);
    +
    +                // sort the FlowFileRecords so that we have the list in 
the same order as on the queue.
    +                allFlowFiles.sort(prioritizer);
    +
    +                for (final FlowFileRecord flowFile : allFlowFiles) {
    +                    summaries.add(summarize(flowFile, ++position));
    +                    if (summaries.size() >= maxResults) {
    +                        break;
    +                    }
    +                }
    +
    +                logger.debug("{} Finished listing FlowFiles for active 
queue with a total of {} results", this, resultCount);
    +                listRequest.setFlowFileSummaries(summaries);
    +                listRequest.setState(ListFlowFileState.COMPLETE);
    +            }
    +        }, "List FlowFiles for Connection " + getIdentifier());
    +        t.setDaemon(true);
    +        t.start();
    +
    +        listRequestMap.put(requestIdentifier, listRequest);
    +        return listRequest;
    +    }
    +
    +    @Override
    +    public ListFlowFileStatus getListFlowFileStatus(final String 
requestIdentifier) {
    +        return listRequestMap.get(requestIdentifier);
    +    }
    +
    +    @Override
    +    public ListFlowFileStatus cancelListFlowFileRequest(final String 
requestIdentifier) {
    +        logger.info("Canceling ListFlowFile Request with ID {}", 
requestIdentifier);
    +        final ListFlowFileRequest request = 
listRequestMap.remove(requestIdentifier);
    +        if (request != null) {
    +            request.cancel();
    +        }
    +
    +        return request;
    +    }
    +
    +    /**
    +     * @return all FlowFiles that should be listed in response to a List 
Queue request
    +     */
    +    protected abstract List<FlowFileRecord> getListableFlowFiles();
    +
    +
    +    @Override
    +    public DropFlowFileStatus dropFlowFiles(final String 
requestIdentifier, final String requestor) {
    +        logger.info("Initiating drop of FlowFiles from {} on behalf of {} 
(request identifier={})", this, requestor, requestIdentifier);
    +
    +        // purge any old requests from the map just to keep it clean. But 
if there are very requests, which is usually the case, then don't bother
    +        if (dropRequestMap.size() > 10) {
    +            final List<String> toDrop = new ArrayList<>();
    +            for (final Map.Entry<String, DropFlowFileRequest> entry : 
dropRequestMap.entrySet()) {
    +                final DropFlowFileRequest request = entry.getValue();
    +                final boolean completed = request.getState() == 
DropFlowFileState.COMPLETE || request.getState() == DropFlowFileState.FAILURE;
    +
    +                if (completed && System.currentTimeMillis() - 
request.getLastUpdated() > TimeUnit.MINUTES.toMillis(5L)) {
    +                    toDrop.add(entry.getKey());
    +                }
    +            }
    +
    +            for (final String requestId : toDrop) {
    +                dropRequestMap.remove(requestId);
    +            }
    +        }
    +
    +        final DropFlowFileRequest dropRequest = new 
DropFlowFileRequest(requestIdentifier);
    +        final QueueSize originalSize = size();
    +        dropRequest.setCurrentSize(originalSize);
    +        dropRequest.setOriginalSize(originalSize);
    +        if (originalSize.getObjectCount() == 0) {
    +            dropRequest.setDroppedSize(originalSize);
    +            dropRequest.setState(DropFlowFileState.COMPLETE);
    +            dropRequestMap.put(requestIdentifier, dropRequest);
    +            return dropRequest;
    +        }
    +
    +        final Thread t = new Thread(new Runnable() {
    +            @Override
    +            public void run() {
    +                dropFlowFiles(dropRequest, requestor);
    +            }
    +        }, "Drop FlowFiles for Connection " + getIdentifier());
    +        t.setDaemon(true);
    +        t.start();
    +
    +        dropRequestMap.put(requestIdentifier, dropRequest);
    +
    +        return dropRequest;
    +    }
    +
    +
    +    @Override
    +    public DropFlowFileRequest cancelDropFlowFileRequest(final String 
requestIdentifier) {
    +        final DropFlowFileRequest request = 
dropRequestMap.remove(requestIdentifier);
    +        if (request == null) {
    +            return null;
    +        }
    +
    +        request.cancel();
    +        return request;
    +    }
    +
    +    @Override
    +    public DropFlowFileStatus getDropFlowFileStatus(final String 
requestIdentifier) {
    +        return dropRequestMap.get(requestIdentifier);
    +    }
    +
    +    /**
    +     * Synchronously drops all FlowFiles in the queue
    +     *
    +     * @param dropRequest the request
    +     * @param requestor the identity of the user/agent who made the request
    +     */
    +    protected abstract void dropFlowFiles(final DropFlowFileRequest 
dropRequest, final String requestor);
    +
    +    @Override
    +    public void verifyCanList() throws IllegalStateException {
    +    }
    +
    +
    +    protected FlowFileSummary summarize(final FlowFile flowFile, final int 
position) {
    +        // extract all of the information that we care about into new 
variables rather than just
    +        // wrapping the FlowFile object with a FlowFileSummary object. We 
do this because we want to
    +        // be able to hold many FlowFileSummary objects in memory and if 
we just wrap the FlowFile object,
    +        // we will end up holding the entire FlowFile (including all 
Attributes) in the Java heap as well,
    +        // which can be problematic if we expect them to be swapped out.
    +        final String uuid = 
flowFile.getAttribute(CoreAttributes.UUID.key());
    +        final String filename = 
flowFile.getAttribute(CoreAttributes.FILENAME.key());
    +        final long size = flowFile.getSize();
    +        final Long lastQueuedTime = flowFile.getLastQueueDate();
    +        final long lineageStart = flowFile.getLineageStartDate();
    +        final boolean penalized = flowFile.isPenalized();
    +
    +        return new FlowFileSummary() {
    +            @Override
    +            public String getUuid() {
    +                return uuid;
    +            }
    +
    +            @Override
    +            public String getFilename() {
    +                return filename;
    +            }
    +
    +            @Override
    +            public int getPosition() {
    +                return position;
    +            }
    +
    +            @Override
    +            public long getSize() {
    +                return size;
    +            }
    +
    +            @Override
    +            public long getLastQueuedTime() {
    +                return lastQueuedTime == null ? 0L : lastQueuedTime;
    +            }
    +
    +            @Override
    +            public long getLineageStartDate() {
    +                return lineageStart;
    +            }
    +
    +            @Override
    +            public boolean isPenalized() {
    +                return penalized;
    +            }
    +        };
    +    }
    +
    +    protected QueueSize drop(final List<FlowFileRecord> flowFiles, final 
String requestor) throws IOException {
    +        // Create a Provenance Event and a FlowFile Repository record for 
each FlowFile
    +        final List<ProvenanceEventRecord> provenanceEvents = new 
ArrayList<>(flowFiles.size());
    +        final List<RepositoryRecord> flowFileRepoRecords = new 
ArrayList<>(flowFiles.size());
    +        for (final FlowFileRecord flowFile : flowFiles) {
    +            provenanceEvents.add(createDropProvenanceEvent(flowFile, 
requestor));
    +            
flowFileRepoRecords.add(createDeleteRepositoryRecord(flowFile));
    +        }
    +
    +        long dropContentSize = 0L;
    +        for (final FlowFileRecord flowFile : flowFiles) {
    +            dropContentSize += flowFile.getSize();
    +            final ContentClaim contentClaim = flowFile.getContentClaim();
    +            if (contentClaim == null) {
    +                continue;
    +            }
    +
    +            final ResourceClaim resourceClaim = 
contentClaim.getResourceClaim();
    +            if (resourceClaim == null) {
    +                continue;
    +            }
    +
    +            resourceClaimManager.decrementClaimantCount(resourceClaim);
    +        }
    +
    +        provRepository.registerEvents(provenanceEvents);
    +        flowFileRepository.updateRepository(flowFileRepoRecords);
    +        return new QueueSize(flowFiles.size(), dropContentSize);
    +    }
    +
    +    private ProvenanceEventRecord createDropProvenanceEvent(final 
FlowFileRecord flowFile, final String requestor) {
    +        final ProvenanceEventBuilder builder = 
provRepository.eventBuilder();
    +        builder.fromFlowFile(flowFile);
    +        builder.setEventType(ProvenanceEventType.DROP);
    +        builder.setLineageStartDate(flowFile.getLineageStartDate());
    +        builder.setComponentId(getIdentifier());
    +        builder.setComponentType("Connection");
    +        builder.setAttributes(flowFile.getAttributes(), 
Collections.<String, String> emptyMap());
    +        builder.setDetails("FlowFile Queue emptied by " + requestor);
    +        builder.setSourceQueueIdentifier(getIdentifier());
    +
    +        final ContentClaim contentClaim = flowFile.getContentClaim();
    +        if (contentClaim != null) {
    +            final ResourceClaim resourceClaim = 
contentClaim.getResourceClaim();
    +            builder.setPreviousContentClaim(resourceClaim.getContainer(), 
resourceClaim.getSection(), resourceClaim.getId(), contentClaim.getOffset(), 
flowFile.getSize());
    +        }
    +
    +        return builder.build();
    +    }
    +
    +    private RepositoryRecord createDeleteRepositoryRecord(final 
FlowFileRecord flowFile) {
    +        return new DropFlowFileRepositoryRecord(this, flowFile);
    +    }
    +
    +    @Override
    +    public synchronized void setLoadBalanceStrategy(final 
LoadBalanceStrategy strategy, final String partitioningAttribute) {
    +        if (strategy == LoadBalanceStrategy.PARTITION_BY_ATTRIBUTE && 
partitioningAttribute == null) {
    --- End diff --
    
    To make the instance variables consistent:
    - We should add `&& !partitioningAttribute.isEmpty()` as well
    - Also we should nullify this.partitioningAttribute if strategy is not 
PARTITION_BY_ATTRIBUTE
    - Also we should reset this.compression to DO_NOT_COMPRESS if strategy is 
DO_NOT_LOAD_BALANCE


---

Reply via email to