// Source: http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.controller;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult;
import org.apache.nifi.processor.QueueSize;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.concurrency.TimedLock;
import org.apache.nifi.util.timebuffer.LongEntityAccess;
import org.apache.nifi.util.timebuffer.TimedBuffer;
import org.apache.nifi.util.timebuffer.TimestampedLong;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A FlowFileQueue is used to queue FlowFile objects that are awaiting further
 * processing. Must be thread safe.
 *
 * <p>
 * Records live in one of four places: the prioritized active queue, the
 * in-memory swap queue (records waiting to be swapped out), an optional
 * pre-fetched batch that can be polled without taking the write lock, and the
 * "unacknowledged" tally of records that have been polled but not yet
 * acknowledged. The mutable queue state is guarded by a fair read/write lock;
 * the pre-fetch batch and the unacknowledged tally are kept in atomics.
 * </p>
 */
public final class StandardFlowFileQueue implements FlowFileQueue {

    public static final int MAX_EXPIRED_RECORDS_PER_ITERATION = 100000;
    public static final int SWAP_RECORD_POLL_SIZE = 10000;

    // When we have very high contention on a FlowFile Queue, the writeLock quickly becomes the bottleneck.
    // In order to avoid this, we keep track of how often we are obtaining the write lock. If we exceed some
    // threshold, we start performing a Pre-fetch so that we can then poll many times without having to
    // obtain the lock.
    // If the lock is obtained an average of more than PREFETCH_POLL_THRESHOLD times per second in order to
    // poll from the queue over the last 5 seconds, do a pre-fetch.
    public static final int PREFETCH_POLL_THRESHOLD = 1000;
    public static final int PRIORITIZED_PREFETCH_SIZE = 10;
    public static final int UNPRIORITIZED_PREFETCH_SIZE = 1000;
    private volatile int prefetchSize = UNPRIORITIZED_PREFETCH_SIZE; // when we pre-fetch, how many should we pre-fetch?

    private static final Logger logger = LoggerFactory.getLogger(StandardFlowFileQueue.class);

    private PriorityQueue<FlowFileRecord> activeQueue = null;
    private long activeQueueContentSize = 0L;
    // declared as ArrayList (not List) because trimToSize() is used when records are polled for swapping
    private ArrayList<FlowFileRecord> swapQueue = null;

    private int swappedRecordCount = 0;
    private long swappedContentSize = 0L;
    private String maximumQueueDataSize;
    private long maximumQueueByteCount;
    private boolean swapMode = false;
    private long maximumQueueObjectCount;

    private final AtomicLong flowFileExpirationMillis;
    private final Connection connection;
    private final AtomicReference<String> flowFileExpirationPeriod;
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
    private final List<FlowFilePrioritizer> priorities;
    private final int swapThreshold;
    private final TimedLock readLock;
    private final TimedLock writeLock;
    private final String identifier;

    private final AtomicBoolean queueFullRef = new AtomicBoolean(false);
    private final AtomicInteger activeQueueSizeRef = new AtomicInteger(0);
    private final AtomicReference<QueueSize> unacknowledgedSizeRef = new AtomicReference<>(new QueueSize(0, 0L));

    // The current pre-fetched batch, or null if there is none.
    private final AtomicReference<PreFetch> preFetchRef = new AtomicReference<>();

    // Rolling 5-second record of how often the queue was polled under the write lock.
    private final TimedBuffer<TimestampedLong> pollCounts = new TimedBuffer<>(TimeUnit.SECONDS, 5, new LongEntityAccess());

    // SCHEDULER CANNOT BE NOTIFIED OF EVENTS WITH THE WRITE LOCK HELD! DOING SO WILL RESULT IN A DEADLOCK!
    private final ProcessScheduler scheduler;

    /**
     * Creates an empty queue with no back pressure thresholds, no expiration
     * and no prioritizers.
     *
     * @param identifier the identifier of this queue; also used to label its locks
     * @param connection the connection whose source and destination are notified of queue events
     * @param scheduler the scheduler with which events are registered
     * @param swapThreshold the active queue size at which newly added records go to the swap queue instead
     */
    public StandardFlowFileQueue(final String identifier, final Connection connection, final ProcessScheduler scheduler, final int swapThreshold) {
        activeQueue = new PriorityQueue<>(20, new Prioritizer(new ArrayList<FlowFilePrioritizer>()));
        priorities = new ArrayList<>();
        maximumQueueObjectCount = 0L;
        maximumQueueDataSize = "0 MB";
        maximumQueueByteCount = 0L;
        flowFileExpirationMillis = new AtomicLong(0);
        flowFileExpirationPeriod = new AtomicReference<>("0 mins");
        swapQueue = new ArrayList<>();

        this.identifier = identifier;
        this.swapThreshold = swapThreshold;
        this.scheduler = scheduler;
        this.connection = connection;

        readLock = new TimedLock(this.lock.readLock(), identifier + " Read Lock", 100);
        writeLock = new TimedLock(this.lock.writeLock(), identifier + " Write Lock", 100);
    }

    @Override
    public String getIdentifier() {
        return identifier;
    }

    /**
     * @return an unmodifiable view of the prioritizers currently in effect
     */
    @Override
    public List<FlowFilePrioritizer> getPriorities() {
        return Collections.unmodifiableList(priorities);
    }

    @Override
    public int getSwapThreshold() {
        return swapThreshold;
    }

    /**
     * Replaces the prioritizers, re-sorting the active queue with the new
     * ordering and selecting the matching pre-fetch size.
     *
     * @param newPriorities the prioritizers to apply, in order of precedence
     */
    @Override
    public void setPriorities(final List<FlowFilePrioritizer> newPriorities) {
        writeLock.lock();
        try {
            final PriorityQueue<FlowFileRecord> newQueue = new PriorityQueue<>(Math.max(20, activeQueue.size()), new Prioritizer(newPriorities));
            newQueue.addAll(activeQueue);
            activeQueue = newQueue;
            priorities.clear();
            priorities.addAll(newPriorities);

            if (newPriorities.isEmpty()) {
                prefetchSize = UNPRIORITIZED_PREFETCH_SIZE;
            } else {
                prefetchSize = PRIORITIZED_PREFETCH_SIZE;
            }
        } finally {
            writeLock.unlock("setPriorities");
        }
    }

    /**
     * Sets the object-count back pressure threshold and re-evaluates whether
     * the queue is full.
     *
     * @param maxQueueSize the maximum number of objects; a value of 0 or less disables this threshold
     */
    @Override
    public void setBackPressureObjectThreshold(final long maxQueueSize) {
        writeLock.lock();
        try {
            maximumQueueObjectCount = maxQueueSize;
            this.queueFullRef.set(determineIfFull());
        } finally {
            writeLock.unlock("setBackPressureObjectThreshold");
        }
    }

    @Override
    public long getBackPressureObjectThreshold() {
        readLock.lock();
        try {
            return maximumQueueObjectCount;
        } finally {
            readLock.unlock("getBackPressureObjectThreshold");
        }
    }

    /**
     * Sets the data-size back pressure threshold and re-evaluates whether the
     * queue is full.
     *
     * @param maxDataSize the threshold as a data size string, parsed with {@link DataUnit#parseDataSize}
     */
    @Override
    public void setBackPressureDataSizeThreshold(final String maxDataSize) {
        writeLock.lock();
        try {
            maximumQueueByteCount = DataUnit.parseDataSize(maxDataSize, DataUnit.B).longValue();
            maximumQueueDataSize = maxDataSize;
            this.queueFullRef.set(determineIfFull());
        } finally {
            writeLock.unlock("setBackPressureDataSizeThreshold");
        }
    }

    @Override
    public String getBackPressureDataSizeThreshold() {
        readLock.lock();
        try {
            return maximumQueueDataSize;
        } finally {
            readLock.unlock("getBackPressureDataSizeThreshold");
        }
    }

    /**
     * @return the total size of the queue: active, swapped, unacknowledged and pre-fetched records
     */
    @Override
    public QueueSize size() {
        readLock.lock();
        try {
            return getQueueSize();
        } finally {
            readLock.unlock("getSize");
        }
    }

    /**
     * MUST be called with lock held
     *
     * @return the combined count and byte size of the active, swapped,
     * unacknowledged and pre-fetched records
     */
    private QueueSize getQueueSize() {
        final QueueSize unacknowledged = unacknowledgedSizeRef.get();
        final PreFetch preFetch = preFetchRef.get();

        final int preFetchCount;
        final long preFetchSize;
        if (preFetch == null) {
            preFetchCount = 0;
            preFetchSize = 0L;
        } else {
            final QueueSize preFetchQueueSize = preFetch.size();
            preFetchCount = preFetchQueueSize.getObjectCount();
            preFetchSize = preFetchQueueSize.getByteCount();
        }

        return new QueueSize(activeQueue.size() + swappedRecordCount + unacknowledged.getObjectCount() + preFetchCount,
                activeQueueContentSize + swappedContentSize + unacknowledged.getByteCount() + preFetchSize);
    }

    /**
     * @return the number of bytes held by the active, swapped, unacknowledged
     * and pre-fetched records
     */
    @Override
    public long contentSize() {
        readLock.lock();
        try {
            // Use the unacknowledged BYTE count here; adding the object count to a byte total is meaningless.
            long totalBytes = activeQueueContentSize + swappedContentSize + unacknowledgedSizeRef.get().getByteCount();

            final PreFetch prefetch = preFetchRef.get();
            if (prefetch != null) {
                totalBytes += prefetch.size().getByteCount();
            }
            return totalBytes;
        } finally {
            readLock.unlock("getContentSize");
        }
    }

    /**
     * @return <code>true</code> if there are no active, swapped,
     * unacknowledged or pre-fetched records
     */
    @Override
    public boolean isEmpty() {
        readLock.lock();
        try {
            final PreFetch prefetch = preFetchRef.get();
            final boolean prefetchEmpty = prefetch == null || prefetch.size().getObjectCount() == 0;

            return activeQueue.isEmpty()
                    && swappedRecordCount == 0
                    && unacknowledgedSizeRef.get().getObjectCount() == 0
                    && prefetchEmpty;
        } finally {
            readLock.unlock("isEmpty");
        }
    }

    /**
     * Lock-free check based on the last published active queue size and the
     * current pre-fetch batch.
     *
     * @return <code>true</code> if neither the active queue nor the pre-fetch batch holds any record
     */
    @Override
    public boolean isActiveQueueEmpty() {
        if (activeQueueSizeRef.get() != 0) {
            return false;
        }

        final PreFetch preFetch = preFetchRef.get();
        if (preFetch == null) {
            return true;
        }

        return preFetch.size().getObjectCount() == 0;
    }

    /**
     * @return the size of the active queue plus any pre-fetched records
     */
    @Override
    public QueueSize getActiveQueueSize() {
        readLock.lock();
        try {
            final PreFetch preFetch = preFetchRef.get();
            if (preFetch == null) {
                return new QueueSize(activeQueue.size(), activeQueueContentSize);
            }

            final QueueSize preFetchSize = preFetch.size();
            return new QueueSize(activeQueue.size() + preFetchSize.getObjectCount(), activeQueueContentSize + preFetchSize.getByteCount());
        } finally {
            readLock.unlock("getActiveQueueSize");
        }
    }

    /**
     * Removes a single polled record from the unacknowledged tally and, if the
     * queue was marked full, re-evaluates that state.
     *
     * @param flowFile the record being acknowledged
     */
    @Override
    public void acknowledge(final FlowFileRecord flowFile) {
        if (queueFullRef.get()) {
            writeLock.lock();
            try {
                updateUnacknowledgedSize(-1, -flowFile.getSize());
                queueFullRef.set(determineIfFull());
            } finally {
                writeLock.unlock("acknowledge(FlowFileRecord)");
            }
        } else {
            updateUnacknowledgedSize(-1, -flowFile.getSize());
        }

        if (connection.getSource().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) {
            // it's possible that queue was full but no longer is. Notify that the source may now be available
            // to run, because of back pressure caused by this queue.
            scheduler.registerEvent(connection.getSource());
        }
    }

    /**
     * Removes a batch of polled records from the unacknowledged tally and, if
     * the queue was marked full, re-evaluates that state.
     *
     * @param flowFiles the records being acknowledged
     */
    @Override
    public void acknowledge(final Collection<FlowFileRecord> flowFiles) {
        long totalSize = 0L;
        for (final FlowFileRecord flowFile : flowFiles) {
            totalSize += flowFile.getSize();
        }

        if (queueFullRef.get()) {
            writeLock.lock();
            try {
                updateUnacknowledgedSize(-flowFiles.size(), -totalSize);
                queueFullRef.set(determineIfFull());
            } finally {
                writeLock.unlock("acknowledge(FlowFileRecord)");
            }
        } else {
            updateUnacknowledgedSize(-flowFiles.size(), -totalSize);
        }

        if (connection.getSource().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) {
            // it's possible that queue was full but no longer is. Notify that the source may now be available
            // to run, because of back pressure caused by this queue.
            scheduler.registerEvent(connection.getSource());
        }
    }

    @Override
    public boolean isFull() {
        return queueFullRef.get();
    }

    /**
     * MUST be called with either the read or write lock held
     *
     * @return <code>true</code> if an enabled (positive) object-count or
     * byte-count threshold has been reached
     */
    private boolean determineIfFull() {
        final long maxSize = maximumQueueObjectCount;
        final long maxBytes = maximumQueueByteCount;
        if (maxSize <= 0 && maxBytes <= 0) {
            return false;
        }

        final QueueSize queueSize = getQueueSize();
        if (maxSize > 0 && queueSize.getObjectCount() >= maxSize) {
            return true;
        }

        if (maxBytes > 0 && (queueSize.getByteCount() >= maxBytes)) {
            return true;
        }

        return false;
    }

    /**
     * Adds a record to the active queue, or to the swap queue if the queue is
     * in swap mode or the active queue has reached the swap threshold.
     *
     * @param file the record to enqueue
     */
    @Override
    public void put(final FlowFileRecord file) {
        writeLock.lock();
        try {
            if (swapMode || activeQueue.size() >= swapThreshold) {
                swapQueue.add(file);
                swappedContentSize += file.getSize();
                swappedRecordCount++;
                swapMode = true;
            } else {
                activeQueueContentSize += file.getSize();
                activeQueue.add(file);
            }

            queueFullRef.set(determineIfFull());
        } finally {
            activeQueueSizeRef.set(activeQueue.size());
            writeLock.unlock("put(FlowFileRecord)");
        }

        // registered after the write lock is released; see the note on the scheduler field
        if (connection.getDestination().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) {
            scheduler.registerEvent(connection.getDestination());
        }
    }

    /**
     * Adds a batch of records, either all to the active queue or all to the
     * swap queue.
     *
     * @param files the records to enqueue
     */
    @Override
    public void putAll(final Collection<FlowFileRecord> files) {
        final int numFiles = files.size();
        long bytes = 0L;
        for (final FlowFile flowFile : files) {
            bytes += flowFile.getSize();
        }

        writeLock.lock();
        try {
            if (swapMode || activeQueue.size() >= swapThreshold - numFiles) {
                swapQueue.addAll(files);
                swappedContentSize += bytes;
                swappedRecordCount += numFiles;
                swapMode = true;
            } else {
                activeQueueContentSize += bytes;
                activeQueue.addAll(files);
            }

            queueFullRef.set(determineIfFull());
        } finally {
            activeQueueSizeRef.set(activeQueue.size());
            writeLock.unlock("putAll");
        }

        // registered after the write lock is released; see the note on the scheduler field
        if (connection.getDestination().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) {
            scheduler.registerEvent(connection.getDestination());
        }
    }

    /**
     * Removes up to {@link #SWAP_RECORD_POLL_SIZE} records from the front of
     * the swap queue. The swapped count and size are left untouched.
     *
     * @return the removed records, or <code>null</code> if the swap queue
     * holds fewer than {@link #SWAP_RECORD_POLL_SIZE} records
     */
    @Override
    public List<FlowFileRecord> pollSwappableRecords() {
        writeLock.lock();
        try {
            if (swapQueue.size() < SWAP_RECORD_POLL_SIZE) {
                return null;
            }

            final List<FlowFileRecord> swapRecords = new ArrayList<>(Math.min(SWAP_RECORD_POLL_SIZE, swapQueue.size()));
            final Iterator<FlowFileRecord> itr = swapQueue.iterator();
            while (itr.hasNext() && swapRecords.size() < SWAP_RECORD_POLL_SIZE) {
                final FlowFileRecord record = itr.next();
                swapRecords.add(record);
                itr.remove();
            }

            swapQueue.trimToSize();
            return swapRecords;
        } finally {
            writeLock.unlock("pollSwappableRecords");
        }
    }

    /**
     * Moves the given swapped records into the active queue. If the swap
     * counters then show nothing outstanding beyond the in-memory swap queue,
     * and that queue is too small to be polled for swapping, its records are
     * rolled into the active queue as well and swap mode ends.
     *
     * @param records the records that have been swapped back in
     */
    @Override
    public void putSwappedRecords(final Collection<FlowFileRecord> records) {
        writeLock.lock();
        try {
            try {
                for (final FlowFileRecord record : records) {
                    swappedContentSize -= record.getSize();
                    swappedRecordCount--;
                    activeQueueContentSize += record.getSize();
                    activeQueue.add(record);
                }

                if (swappedRecordCount > swapQueue.size()) {
                    // we have more swap files to be swapped in.
                    return;
                }

                // If a call to #pollSwappableRecords will not produce any, go ahead and roll those FlowFiles back into the mix
                if (swapQueue.size() < SWAP_RECORD_POLL_SIZE) {
                    for (final FlowFileRecord record : swapQueue) {
                        activeQueue.add(record);
                        activeQueueContentSize += record.getSize();
                    }
                    swapQueue.clear();
                    swappedContentSize = 0L;
                    swappedRecordCount = 0;
                    swapMode = false;
                }
            } finally {
                activeQueueSizeRef.set(activeQueue.size());
            }
        } finally {
            // unlock first: the scheduler must not be notified while the write lock is held
            writeLock.unlock("putSwappedRecords");
            scheduler.registerEvent(connection.getDestination());
        }
    }

    /**
     * Adds to the swapped record count and swapped content size.
     *
     * @param numRecords the number of records to add to the swapped count
     * @param contentSize the number of bytes to add to the swapped content size
     */
    @Override
    public void incrementSwapCount(final int numRecords, final long contentSize) {
        writeLock.lock();
        try {
            swappedContentSize += contentSize;
            swappedRecordCount += numRecords;
        } finally {
            writeLock.unlock("incrementSwapCount");
        }
    }

    /**
     * @return the number of records in the active queue plus the number of unacknowledged records
     */
    @Override
    public int unswappedSize() {
        readLock.lock();
        try {
            return activeQueue.size() + unacknowledgedSizeRef.get().getObjectCount();
        } finally {
            readLock.unlock("unswappedSize");
        }
    }

    @Override
    public int getSwapRecordCount() {
        readLock.lock();
        try {
            return swappedRecordCount;
        } finally {
            readLock.unlock("getSwapRecordCount");
        }
    }

    /**
     * @return the number of records currently held in the in-memory swap queue
     */
    @Override
    public int getSwapQueueSize() {
        readLock.lock();
        try {
            if (logger.isDebugEnabled()) {
                final long byteToMbDivisor = 1024L * 1024L;
                final QueueSize unacknowledged = unacknowledgedSizeRef.get();

                logger.debug("Total Queue Size: ActiveQueue={}/{} MB, Swap Queue={}/{} MB, Unacknowledged={}/{} MB",
                        activeQueue.size(), activeQueueContentSize / byteToMbDivisor,
                        swappedRecordCount, swappedContentSize / byteToMbDivisor,
                        unacknowledged.getObjectCount(), unacknowledged.getByteCount() / byteToMbDivisor);
            }

            return swapQueue.size();
        } finally {
            readLock.unlock("getSwapQueueSize");
        }
    }

    /**
     * @param maxAge a timestamp in epoch milliseconds, or <code>null</code>
     * @return <code>true</code> if <code>maxAge</code> is non-null and earlier than the current time
     */
    private boolean isLaterThan(final Long maxAge) {
        if (maxAge == null) {
            return false;
        }
        return maxAge < System.currentTimeMillis();
    }

    /**
     * @param flowFile the FlowFile to evaluate; may be <code>null</code>
     * @param expirationMillis the expiration period in milliseconds
     * @return the FlowFile's entry date plus the expiration period, or
     * <code>null</code> if the FlowFile is null or the period is not positive
     */
    private Long getExpirationDate(final FlowFile flowFile, final long expirationMillis) {
        if (flowFile == null || expirationMillis <= 0) {
            return null;
        }

        return flowFile.getEntryDate() + expirationMillis;
    }

    /**
     * Polls a single record, preferring the pre-fetched batch (no write lock
     * needed) and falling back to the active queue.
     *
     * @param expiredRecords receives any expired records encountered
     * @return the next available record, or <code>null</code> if none is available
     */
    @Override
    public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords) {
        FlowFileRecord flowFile = null;

        // First check if we have any records Pre-Fetched.
        final long expirationMillis = flowFileExpirationMillis.get();
        final PreFetch preFetch = preFetchRef.get();
        if (preFetch != null) {
            if (preFetch.isExpired()) {
                requeueExpiredPrefetch(preFetch);
            } else {
                while (true) {
                    final FlowFileRecord next = preFetch.nextRecord();
                    if (next == null) {
                        break;
                    }

                    if (isLaterThan(getExpirationDate(next, expirationMillis))) {
                        expiredRecords.add(next);
                        continue;
                    }

                    updateUnacknowledgedSize(1, next.getSize());
                    return next;
                }

                // the batch is exhausted; discard it
                preFetchRef.compareAndSet(preFetch, null);
            }
        }

        writeLock.lock();
        try {
            flowFile = doPoll(expiredRecords, expirationMillis);
            return flowFile;
        } finally {
            activeQueueSizeRef.set(activeQueue.size());
            writeLock.unlock("poll(Set)");

            if (flowFile != null) {
                updateUnacknowledgedSize(1, flowFile.getSize());
            }
        }
    }

    /**
     * Polls a single record from the active queue. MUST be called with the
     * write lock held.
     *
     * @param expiredRecords receives any expired records encountered
     * @param expirationMillis the expiration period in milliseconds
     * @return the polled record, or <code>null</code> if the queue is empty,
     * its head is penalized, or only expired records were found
     */
    private FlowFileRecord doPoll(final Set<FlowFileRecord> expiredRecords, final long expirationMillis) {
        FlowFileRecord flowFile;
        boolean isExpired;

        migrateSwapToActive();
        final boolean queueFullAtStart = queueFullRef.get();

        do {
            flowFile = this.activeQueue.poll();

            isExpired = isLaterThan(getExpirationDate(flowFile, expirationMillis));
            if (isExpired) {
                expiredRecords.add(flowFile);
                if (expiredRecords.size() >= MAX_EXPIRED_RECORDS_PER_ITERATION) {
                    activeQueueContentSize -= flowFile.getSize();
                    break;
                }
            } else if (flowFile != null && flowFile.isPenalized()) {
                // penalized records sort last, so nothing else is available either; put it back
                this.activeQueue.add(flowFile);
                flowFile = null;
                break;
            }

            if (flowFile != null) {
                activeQueueContentSize -= flowFile.getSize();
            }
        } while (isExpired);

        // if at least 1 FlowFile was expired & the queue was full before we started, then
        // we need to determine whether or not the queue is full again. If no FlowFile was expired,
        // then the queue will still be full until the appropriate #acknowledge method is called.
        if (queueFullAtStart && !expiredRecords.isEmpty()) {
            queueFullRef.set(determineIfFull());
        }

        if (incrementPollCount()) {
            prefetch();
        }
        return isExpired ? null : flowFile;
    }

    /**
     * Polls up to <code>maxResults</code> records, preferring the pre-fetched
     * batch (no write lock needed) and falling back to the active queue.
     *
     * @param maxResults the maximum number of records to return
     * @param expiredRecords receives any expired records encountered
     * @return the polled records; possibly empty
     */
    @Override
    public List<FlowFileRecord> poll(int maxResults, final Set<FlowFileRecord> expiredRecords) {
        final List<FlowFileRecord> records = new ArrayList<>(Math.min(1024, maxResults));

        // First check if we have any records Pre-Fetched.
        final long expirationMillis = flowFileExpirationMillis.get();
        final PreFetch preFetch = preFetchRef.get();
        if (preFetch != null) {
            if (preFetch.isExpired()) {
                requeueExpiredPrefetch(preFetch);
            } else {
                long totalSize = 0L;
                for (int i = 0; i < maxResults; i++) {
                    final FlowFileRecord next = preFetch.nextRecord();
                    if (next == null) {
                        break;
                    }

                    if (isLaterThan(getExpirationDate(next, expirationMillis))) {
                        expiredRecords.add(next);
                        continue;
                    }

                    records.add(next);
                    totalSize += next.getSize();
                }

                // If anything was prefetched, use what we have.
                if (!records.isEmpty()) {
                    updateUnacknowledgedSize(records.size(), totalSize);
                    return records;
                }

                preFetchRef.compareAndSet(preFetch, null);
            }
        }

        writeLock.lock();
        try {
            doPoll(records, maxResults, expiredRecords);
        } finally {
            activeQueueSizeRef.set(activeQueue.size());
            writeLock.unlock("poll(int, Set)");
        }
        return records;
    }

    /**
     * Drains up to <code>maxResults</code> records from the active queue into
     * <code>records</code> and updates the size bookkeeping. MUST be called
     * with the write lock held.
     *
     * @param records receives the polled records
     * @param maxResults the maximum size <code>records</code> may reach
     * @param expiredRecords receives any expired records encountered
     */
    private void doPoll(final List<FlowFileRecord> records, int maxResults, final Set<FlowFileRecord> expiredRecords) {
        migrateSwapToActive();

        final boolean queueFullAtStart = queueFullRef.get();
        final int alreadyPolled = records.size();

        final long bytesDrained = drainQueue(activeQueue, records, maxResults, expiredRecords);

        // Tally only what this call actually handed out. Deriving it as (bytesDrained - size of every expired
        // record) is wrong whenever expiredRecords already held entries, e.g. records that expired on the
        // pre-fetch path of the same poll.
        long polledBytes = 0L;
        for (int i = alreadyPolled; i < records.size(); i++) {
            polledBytes += records.get(i).getSize();
        }

        activeQueueContentSize -= bytesDrained;
        updateUnacknowledgedSize(records.size() - alreadyPolled, polledBytes);

        // if at least 1 FlowFile was expired & the queue was full before we started, then
        // we need to determine whether or not the queue is full again. If no FlowFile was expired,
        // then the queue will still be full until the appropriate #acknowledge method is called.
        if (queueFullAtStart && !expiredRecords.isEmpty()) {
            queueFullRef.set(determineIfFull());
        }

        if (incrementPollCount()) {
            prefetch();
        }
    }

    /**
     * If there are FlowFiles waiting on the swap queue, move them to the active
     * queue until we meet our threshold. This prevents us from having to swap
     * them to disk & then back out.
     *
     * This method MUST be called with the writeLock held.
     */
    private void migrateSwapToActive() {
        // Migrate as many FlowFiles as we can from the Swap Queue to the Active Queue, so that we don't
        // have to swap them out & then swap them back in.
        // If we don't do this, we could get into a situation where we have potentially thousands of FlowFiles
        // sitting on the Swap Queue but not getting processed because there aren't enough to be swapped out.
        // In particular, this can happen if the queue is typically filled with surges.
        // For example, if the queue has 25,000 FlowFiles come in, it may process 20,000 of them and leave
        // 5,000 sitting on the Swap Queue. If it then takes an hour for an additional 5,000 FlowFiles to come in,
        // those FlowFiles sitting on the Swap Queue will sit there for an hour, waiting to be swapped out and
        // swapped back in again.
        // Calling this method when records are polled prevents this condition by migrating FlowFiles from the
        // Swap Queue to the Active Queue. However, we don't do this if there are FlowFiles already swapped out
        // to disk, because we want them to be swapped back in in the same order that they were swapped out.

        // this is the most common condition (nothing is swapped out), so do the check first and avoid the expense
        // of other checks for 99.999% of the cases.
        if (swappedRecordCount == 0 && swapQueue.isEmpty()) {
            return;
        }

        if (swappedRecordCount > swapQueue.size()) {
            // we already have FlowFiles swapped out, so we won't migrate the queue; we will wait for
            // an external process to swap FlowFiles back in.
            return;
        }

        final Iterator<FlowFileRecord> swapItr = swapQueue.iterator();
        while (activeQueue.size() < swapThreshold && swapItr.hasNext()) {
            final FlowFileRecord toMigrate = swapItr.next();
            activeQueue.add(toMigrate);
            activeQueueContentSize += toMigrate.getSize();
            swappedContentSize -= toMigrate.getSize();
            swappedRecordCount--;

            swapItr.remove();
        }

        if (swappedRecordCount == 0) {
            swapMode = false;
        }
    }

    /**
     * Moves records from <code>sourceQueue</code> into
     * <code>destination</code> until the destination holds
     * <code>maxResults</code> records, the source is exhausted, a penalized
     * record is reached (it is put back), or
     * {@link #MAX_EXPIRED_RECORDS_PER_ITERATION} expired records have been
     * collected.
     *
     * @param sourceQueue the queue to drain
     * @param destination receives the non-expired records
     * @param maxResults the maximum size <code>destination</code> may reach
     * @param expiredRecords receives the expired records
     * @return the total size in bytes of every record permanently removed
     * from <code>sourceQueue</code>, expired or not
     */
    @Override
    public long drainQueue(final Queue<FlowFileRecord> sourceQueue, final List<FlowFileRecord> destination, int maxResults, final Set<FlowFileRecord> expiredRecords) {
        long drainedSize = 0L;
        FlowFileRecord pulled = null;

        final long expirationMillis = this.flowFileExpirationMillis.get();
        while (destination.size() < maxResults && (pulled = sourceQueue.poll()) != null) {
            if (isLaterThan(getExpirationDate(pulled, expirationMillis))) {
                expiredRecords.add(pulled);
                if (expiredRecords.size() >= MAX_EXPIRED_RECORDS_PER_ITERATION) {
                    // this record has left the source queue, so it must be counted before we stop
                    drainedSize += pulled.getSize();
                    break;
                }
            } else {
                if (pulled.isPenalized()) {
                    sourceQueue.add(pulled);
                    break;
                }
                destination.add(pulled);
            }
            drainedSize += pulled.getSize();
        }
        return drainedSize;
    }

    /**
     * Polls the records from the active queue that the given filter accepts.
     * Records the filter rejects are returned to the active queue.
     *
     * @param filter decides, per record, whether to accept it and whether to continue
     * @param expiredRecords receives any expired records encountered
     * @return the accepted records; possibly empty
     */
    @Override
    public List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords) {
        writeLock.lock();
        try {
            migrateSwapToActive();
            if (activeQueue.isEmpty()) {
                return Collections.emptyList();
            }

            final long expirationMillis = this.flowFileExpirationMillis.get();
            final boolean queueFullAtStart = queueFullRef.get();

            final List<FlowFileRecord> selectedFlowFiles = new ArrayList<>();
            final List<FlowFileRecord> unselected = new ArrayList<>();

            while (true) {
                final FlowFileRecord flowFile = this.activeQueue.poll();
                if (flowFile == null) {
                    break;
                }

                final boolean isExpired = isLaterThan(getExpirationDate(flowFile, expirationMillis));
                if (isExpired) {
                    expiredRecords.add(flowFile);
                    activeQueueContentSize -= flowFile.getSize();

                    if (expiredRecords.size() >= MAX_EXPIRED_RECORDS_PER_ITERATION) {
                        break;
                    } else {
                        continue;
                    }
                } else if (flowFile.isPenalized()) {
                    this.activeQueue.add(flowFile);
                    break; // just stop searching because the rest are all penalized.
                }

                final FlowFileFilterResult result = filter.filter(flowFile);
                if (result.isAccept()) {
                    activeQueueContentSize -= flowFile.getSize();

                    updateUnacknowledgedSize(1, flowFile.getSize());
                    selectedFlowFiles.add(flowFile);
                } else {
                    unselected.add(flowFile);
                }

                if (!result.isContinue()) {
                    break;
                }
            }

            this.activeQueue.addAll(unselected);

            // if at least 1 FlowFile was expired & the queue was full before we started, then
            // we need to determine whether or not the queue is full again. If no FlowFile was expired,
            // then the queue will still be full until the appropriate #acknowledge method is called.
            if (queueFullAtStart && !expiredRecords.isEmpty()) {
                queueFullRef.set(determineIfFull());
            }

            return selectedFlowFiles;
        } finally {
            activeQueueSizeRef.set(activeQueue.size());
            writeLock.unlock("poll(Filter, Set)");
        }
    }

    /**
     * Orders records for the active queue: unpenalized before penalized,
     * penalized records by earliest penalty expiration, then by the configured
     * prioritizers in order, then by content claim (records without a claim
     * first) and claim offset, and finally by id.
     */
    private static final class Prioritizer implements Comparator<FlowFileRecord>, Serializable {

        private static final long serialVersionUID = 1L;
        private final transient List<FlowFilePrioritizer> prioritizers = new ArrayList<>();

        private Prioritizer(final List<FlowFilePrioritizer> priorities) {
            if (null != priorities) {
                prioritizers.addAll(priorities);
            }
        }

        @Override
        public int compare(final FlowFileRecord f1, final FlowFileRecord f2) {
            final boolean f1Penalized = f1.isPenalized();
            final boolean f2Penalized = f2.isPenalized();

            if (f1Penalized && !f2Penalized) {
                return 1;
            } else if (!f1Penalized && f2Penalized) {
                return -1;
            }

            if (f1Penalized && f2Penalized) {
                if (f1.getPenaltyExpirationMillis() < f2.getPenaltyExpirationMillis()) {
                    return -1;
                } else if (f1.getPenaltyExpirationMillis() > f2.getPenaltyExpirationMillis()) {
                    return 1;
                }
            }

            for (final FlowFilePrioritizer prioritizer : prioritizers) {
                final int prioritizerResult = prioritizer.compare(f1, f2);
                if (prioritizerResult != 0) {
                    return prioritizerResult;
                }
            }

            final ContentClaim claim1 = f1.getContentClaim();
            final ContentClaim claim2 = f2.getContentClaim();

            // put the one without a claim first
            if (claim1 == null && claim2 != null) {
                return -1;
            } else if (claim1 != null && claim2 == null) {
                return 1;
            } else if (claim1 != null && claim2 != null) {
                final int claimComparison = claim1.compareTo(claim2);
                if (claimComparison != 0) {
                    return claimComparison;
                }

                final int claimOffsetComparison = Long.compare(f1.getContentClaimOffset(), f2.getContentClaimOffset());
                if (claimOffsetComparison != 0) {
                    return claimOffsetComparison;
                }
            }

            return Long.compare(f1.getId(), f2.getId());
        }
    }

    @Override
    public String getFlowFileExpiration() {
        return flowFileExpirationPeriod.get();
    }

    /**
     * @param timeUnit the unit in which to express the expiration period
     * @return the expiration period converted to <code>timeUnit</code> and narrowed to an int
     */
    @Override
    public int getFlowFileExpiration(final TimeUnit timeUnit) {
        return (int) timeUnit.convert(flowFileExpirationMillis.get(), TimeUnit.MILLISECONDS);
    }

    /**
     * @param flowExpirationPeriod the expiration period as a time duration string
     * @throws IllegalArgumentException if the period parses to a negative number of milliseconds
     */
    @Override
    public void setFlowFileExpiration(final String flowExpirationPeriod) {
        final long millis = FormatUtils.getTimeDuration(flowExpirationPeriod, TimeUnit.MILLISECONDS);
        if (millis < 0) {
            throw new IllegalArgumentException("FlowFile Expiration Period must be positive");
        }
        this.flowFileExpirationPeriod.set(flowExpirationPeriod);
        this.flowFileExpirationMillis.set(millis);
    }

    @Override
    public String toString() {
        return "FlowFileQueue[id=" + identifier + "]";
    }

    /**
     * Lock the queue so that other threads are unable to interact with the
     * queue
     */
    public void lock() {
        writeLock.lock();
    }

    /**
     * Unlock the queue
     */
    public void unlock() {
        writeLock.unlock("external unlock");
    }

    /**
     * Atomically adjusts the unacknowledged tally, retrying the
     * compare-and-set until it succeeds.
     *
     * @param addToCount the change in object count; may be negative
     * @param addToSize the change in byte count; may be negative
     */
    private void updateUnacknowledgedSize(final int addToCount, final long addToSize) {
        boolean updated = false;

        do {
            final QueueSize queueSize = unacknowledgedSizeRef.get();
            final QueueSize newSize = new QueueSize(queueSize.getObjectCount() + addToCount, queueSize.getByteCount() + addToSize);
            updated = unacknowledgedSizeRef.compareAndSet(queueSize, newSize);
        } while (!updated);
    }

    /**
     * Returns the not-yet-polled records of an expired pre-fetch batch to the
     * active queue and discards the batch. Acquires the write lock itself.
     *
     * @param prefetch the batch to requeue; ignored if <code>null</code>
     */
    private void requeueExpiredPrefetch(final PreFetch prefetch) {
        if (prefetch == null) {
            return;
        }

        writeLock.lock();
        try {
            final long contentSizeRequeued = prefetch.requeue(activeQueue);
            this.activeQueueContentSize += contentSizeRequeued;
            this.preFetchRef.compareAndSet(prefetch, null);
        } finally {
            writeLock.unlock("requeueExpiredPrefetch");
        }
    }

    /**
     * Moves up to <code>prefetchSize</code> unpenalized records from the
     * active queue into a new pre-fetch batch, unless the current batch still
     * holds records. If fewer unpenalized records are available than
     * requested, the active queue is left unchanged.
     *
     * MUST be called with write lock held.
     */
    private void prefetch() {
        if (activeQueue.isEmpty()) {
            return;
        }

        final int numToFetch = Math.min(prefetchSize, activeQueue.size());

        final PreFetch curPreFetch = preFetchRef.get();
        if (curPreFetch != null && curPreFetch.size().getObjectCount() > 0) {
            return;
        }

        final List<FlowFileRecord> buffer = new ArrayList<>(numToFetch);
        long contentSize = 0L;
        for (int i = 0; i < numToFetch; i++) {
            final FlowFileRecord record = activeQueue.poll();
            if (record == null || record.isPenalized()) {
                // not enough unpenalized records to pull. Put all records back and return
                activeQueue.addAll(buffer);
                if (record != null) {
                    // the penalized record was polled too; without this it would be lost
                    activeQueue.add(record);
                }
                return;
            } else {
                buffer.add(record);
                contentSize += record.getSize();
            }
        }

        activeQueueContentSize -= contentSize;
        preFetchRef.set(new PreFetch(buffer));
    }

    /**
     * Records one poll and reports whether polling has been frequent enough to
     * warrant a pre-fetch.
     *
     * @return <code>true</code> if more than {@link #PREFETCH_POLL_THRESHOLD}
     * * 5 polls were recorded over the last 5 seconds
     */
    private boolean incrementPollCount() {
        pollCounts.add(new TimestampedLong(1L));
        final long totalCount = pollCounts.getAggregateValue(System.currentTimeMillis() - 5000L).getValue();
        return totalCount > PREFETCH_POLL_THRESHOLD * 5;
    }

    /**
     * A fixed batch of records removed from the active queue that can be
     * handed out one at a time through an atomic cursor. A batch is considered
     * expired one second after it is created.
     */
    private static class PreFetch {

        private final List<FlowFileRecord> records;
        private final AtomicInteger pointer = new AtomicInteger(0);
        private final long expirationTime = System.nanoTime() + TimeUnit.SECONDS.toNanos(1L);
        private final AtomicLong contentSize = new AtomicLong(0L);

        public PreFetch(final List<FlowFileRecord> records) {
            this.records = records;

            long totalSize = 0L;
            for (final FlowFileRecord record : records) {
                totalSize += record.getSize();
            }
            contentSize.set(totalSize);
        }

        /**
         * @return the next record in the batch, or <code>null</code> if every
         * record has been handed out or requeued
         */
        public FlowFileRecord nextRecord() {
            final int nextValue = pointer.getAndIncrement();
            if (nextValue >= records.size()) {
                return null;
            }

            final FlowFileRecord flowFile = records.get(nextValue);
            contentSize.addAndGet(-flowFile.getSize());
            return flowFile;
        }

        /**
         * @return the count and byte size of the records not yet handed out;
         * an empty size once none remain
         */
        public QueueSize size() {
            final int remaining = records.size() - pointer.get();
            if (remaining <= 0) {
                // nothing left to hand out (exhausted or requeued); never report bytes without records
                return new QueueSize(0, 0L);
            }

            return new QueueSize(remaining, contentSize.get());
        }

        public boolean isExpired() {
            return System.nanoTime() > expirationTime;
        }

        /**
         * Adds every record that has not been handed out to the given queue.
         *
         * @param queue the queue that receives the remaining records
         * @return the total size in bytes of the requeued records
         */
        private long requeue(final Queue<FlowFileRecord> queue) {
            // get the current pointer and prevent any other thread from accessing the rest of the elements
            final int curPointer = pointer.getAndAdd(records.size());
            if (curPointer >= records.size()) {
                return 0L;
            }

            // everything from curPointer onward is un-handed, including the final element
            final List<FlowFileRecord> remaining = records.subList(curPointer, records.size());
            long requeuedBytes = 0L;
            for (final FlowFileRecord record : remaining) {
                requeuedBytes += record.getSize();
            }

            queue.addAll(remaining);
            return requeuedBytes;
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java new file mode 100644 index 0000000..52a4e40 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java @@ -0,0 +1,541 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.controller; + +import static java.util.Objects.requireNonNull; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.connectable.Connectable; +import org.apache.nifi.connectable.ConnectableType; +import org.apache.nifi.connectable.Connection; +import org.apache.nifi.connectable.Funnel; +import org.apache.nifi.connectable.Position; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.scheduling.SchedulingStrategy; +import org.apache.nifi.util.FormatUtils; + +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; + +public class StandardFunnel implements Funnel { + + public static final long MINIMUM_PENALIZATION_MILLIS = 0L; + public static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS; + public static final long MINIMUM_YIELD_MILLIS = 0L; + public static final long DEFAULT_YIELD_PERIOD = 1000L; + public static final TimeUnit DEFAULT_YIELD_TIME_UNIT = TimeUnit.MILLISECONDS; + + private final String identifier; + private final Set<Connection> outgoingConnections; + private final List<Connection> incomingConnections; + private 
final List<Relationship> relationships; + + private final AtomicReference<ProcessGroup> processGroupRef; + private final AtomicReference<Position> position; + private final AtomicReference<String> penalizationPeriod; + private final AtomicReference<String> yieldPeriod; + private final AtomicReference<String> schedulingPeriod; + private final AtomicReference<String> name; + private final AtomicLong schedulingNanos; + private final AtomicBoolean lossTolerant; + private final AtomicReference<ScheduledState> scheduledState; + private final AtomicLong yieldExpiration; + + private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); + private final Lock readLock = rwLock.readLock(); + private final Lock writeLock = rwLock.writeLock(); + + public StandardFunnel(final String identifier, final ProcessGroup processGroup, final ProcessScheduler scheduler) { + this.identifier = identifier; + this.processGroupRef = new AtomicReference<>(processGroup); + + outgoingConnections = new HashSet<>(); + incomingConnections = new ArrayList<>(); + + final List<Relationship> relationships = new ArrayList<>(); + relationships.add(Relationship.ANONYMOUS); + this.relationships = Collections.unmodifiableList(relationships); + + lossTolerant = new AtomicBoolean(false); + position = new AtomicReference<>(new Position(0D, 0D)); + scheduledState = new AtomicReference<>(ScheduledState.STOPPED); + penalizationPeriod = new AtomicReference<>("30 sec"); + yieldPeriod = new AtomicReference<>("1 sec"); + yieldExpiration = new AtomicLong(0L); + schedulingPeriod = new AtomicReference<>("0 millis"); + schedulingNanos = new AtomicLong(30000); + name = new AtomicReference<>("Funnel"); + } + + @Override + public String getIdentifier() { + return identifier; + } + + @Override + public Collection<Relationship> getRelationships() { + return relationships; + } + + @Override + public Relationship getRelationship(final String relationshipName) { + return 
(Relationship.ANONYMOUS.getName().equals(relationshipName)) ? Relationship.ANONYMOUS : null; + } + + @Override + public void addConnection(final Connection connection) throws IllegalArgumentException { + writeLock.lock(); + try { + if (!requireNonNull(connection).getSource().equals(this) && !connection.getDestination().equals(this)) { + throw new IllegalArgumentException("Cannot add a connection to a Funnel for which the Funnel is neither the Source nor the Destination"); + } + if (connection.getSource().equals(this) && connection.getDestination().equals(this)) { + throw new IllegalArgumentException("Cannot add a connection from a Funnel back to itself"); + } + + if (connection.getDestination().equals(this)) { + // don't add the connection twice. This may occur if we have a self-loop because we will be told + // to add the connection once because we are the source and again because we are the destination. + if (!incomingConnections.contains(connection)) { + incomingConnections.add(connection); + } + } + + if (connection.getSource().equals(this)) { + // don't add the connection twice. This may occur if we have a self-loop because we will be told + // to add the connection once because we are the source and again because we are the destination. 
+ if (!outgoingConnections.contains(connection)) { + for (final Relationship relationship : connection.getRelationships()) { + if (!relationship.equals(Relationship.ANONYMOUS)) { + throw new IllegalArgumentException("No relationship with name " + relationship + " exists for Funnels"); + } + } + + outgoingConnections.add(connection); + } + } + } finally { + writeLock.unlock(); + } + } + + @Override + public boolean hasIncomingConnection() { + readLock.lock(); + try { + return !incomingConnections.isEmpty(); + } finally { + readLock.unlock(); + } + } + + @Override + public void updateConnection(final Connection connection) throws IllegalStateException { + if (requireNonNull(connection).getSource().equals(this)) { + writeLock.lock(); + try { + if (!outgoingConnections.remove(connection)) { + throw new IllegalStateException("No Connection with ID " + connection.getIdentifier() + " is currently registered with this Port"); + } + outgoingConnections.add(connection); + } finally { + writeLock.unlock(); + } + } + + if (connection.getDestination().equals(this)) { + writeLock.lock(); + try { + if (!incomingConnections.remove(connection)) { + throw new IllegalStateException("No Connection with ID " + connection.getIdentifier() + " is currently registered with this Port"); + } + incomingConnections.add(connection); + } finally { + writeLock.unlock(); + } + } + } + + @Override + public void removeConnection(final Connection connection) throws IllegalArgumentException, IllegalStateException { + writeLock.lock(); + try { + if (!requireNonNull(connection).getSource().equals(this)) { + final boolean existed = incomingConnections.remove(connection); + if (!existed) { + throw new IllegalStateException("The given connection is not currently registered for this ProcessorNode"); + } + return; + } + + final boolean removed = outgoingConnections.remove(connection); + if (!removed) { + throw new IllegalStateException(connection + " is not registered with " + this); + } + } finally { + 
writeLock.unlock(); + } + } + + @Override + public Set<Connection> getConnections() { + readLock.lock(); + try { + return Collections.unmodifiableSet(outgoingConnections); + } finally { + readLock.unlock(); + } + } + + @Override + public Set<Connection> getConnections(final Relationship relationship) { + readLock.lock(); + try { + if (relationship.equals(Relationship.ANONYMOUS)) { + return Collections.unmodifiableSet(outgoingConnections); + } + + throw new IllegalArgumentException("No relationship with name " + relationship.getName() + " exists for Funnels"); + } finally { + readLock.unlock(); + } + } + + @Override + public List<Connection> getIncomingConnections() { + readLock.lock(); + try { + return new ArrayList<>(incomingConnections); + } finally { + readLock.unlock(); + } + } + + @Override + public Position getPosition() { + return position.get(); + } + + @Override + public void setPosition(Position position) { + this.position.set(position); + } + + @Override + public String getName() { + return name.get(); + } + + /** + * Throws {@link UnsupportedOperationException} + * + * @param name + */ + @Override + public void setName(final String name) { + throw new UnsupportedOperationException(); + } + + @Override + public String getComments() { + return ""; + } + + @Override + public void setComments(final String comments) { + throw new UnsupportedOperationException(); + } + + @Override + public ProcessGroup getProcessGroup() { + return processGroupRef.get(); + } + + @Override + public void setProcessGroup(final ProcessGroup group) { + processGroupRef.set(group); + } + + @Override + public boolean isAutoTerminated(Relationship relationship) { + return false; + } + + @Override + public boolean isRunning() { + return isRunning(this); + } + + private boolean isRunning(final Connectable source) { + return getScheduledState() == ScheduledState.RUNNING; + } + + @Override + public boolean isTriggerWhenEmpty() { + return false; + } + + @Override + public ScheduledState 
getScheduledState() { + return scheduledState.get(); + } + + @Override + public boolean isLossTolerant() { + return lossTolerant.get(); + } + + @Override + public void setLossTolerant(final boolean lossTolerant) { + this.lossTolerant.set(lossTolerant); + } + + @Override + public String toString() { + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("id", getIdentifier()).toString(); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException { + final ProcessSession session = sessionFactory.createSession(); + + try { + onTrigger(context, session); + session.commit(); + } catch (final ProcessException e) { + session.rollback(); + throw e; + } catch (final Throwable t) { + session.rollback(); + throw new RuntimeException(t); + } + } + + private void onTrigger(final ProcessContext context, final ProcessSession session) { + readLock.lock(); + try { + Set<Relationship> available = session.getAvailableRelationships(); + int transferred = 0; + while (!available.isEmpty()) { + final List<FlowFile> flowFiles = session.get(10); + if (flowFiles.isEmpty()) { + break; + } + + transferred += flowFiles.size(); + session.transfer(flowFiles, Relationship.ANONYMOUS); + session.commit(); + available = session.getAvailableRelationships(); + } + + if (transferred == 0) { + context.yield(); + } + } finally { + readLock.unlock(); + } + } + + /** + * Has no effect + */ + @Override + public void setMaxConcurrentTasks(int taskCount) { + } + + @Override + public int getMaxConcurrentTasks() { + return 1; + } + + @Override + public void setScheduledState(final ScheduledState scheduledState) { + this.scheduledState.set(scheduledState); + } + + @Override + public ConnectableType getConnectableType() { + return ConnectableType.FUNNEL; + } + + @Override + @SuppressWarnings("unchecked") + public Collection<ValidationResult> getValidationErrors() { + return Collections.EMPTY_LIST; + } + + /** + * 
Updates the amount of time that this processor should avoid being + * scheduled when the processor calls + * {@link nifi.processor.ProcessContext#yield() ProcessContext.yield()} + * + * @param yieldPeriod + */ + @Override + public void setYieldPeriod(final String yieldPeriod) { + final long yieldMillis = FormatUtils.getTimeDuration(requireNonNull(yieldPeriod), TimeUnit.MILLISECONDS); + if (yieldMillis < 0) { + throw new IllegalArgumentException("Yield duration must be positive"); + } + this.yieldPeriod.set(yieldPeriod); + } + + /** + * @param schedulingPeriod + */ + @Override + public void setScheduldingPeriod(final String schedulingPeriod) { + final long schedulingNanos = FormatUtils.getTimeDuration(requireNonNull(schedulingPeriod), TimeUnit.NANOSECONDS); + if (schedulingNanos < 0) { + throw new IllegalArgumentException("Scheduling Period must be positive"); + } + + this.schedulingPeriod.set(schedulingPeriod); + this.schedulingNanos.set(Math.max(MINIMUM_SCHEDULING_NANOS, schedulingNanos)); + } + + @Override + public long getPenalizationPeriod(final TimeUnit timeUnit) { + return FormatUtils.getTimeDuration(getPenalizationPeriod(), timeUnit == null ? DEFAULT_TIME_UNIT : timeUnit); + } + + @Override + public String getPenalizationPeriod() { + return penalizationPeriod.get(); + } + + /** + * Causes the processor not to be scheduled for some period of time. This + * duration can be obtained and set via the + * {@link #getYieldPeriod(TimeUnit)} and + * {@link #setYieldPeriod(long, TimeUnit)} methods. 
+ */ + @Override + public void yield() { + final long yieldMillis = getYieldPeriod(TimeUnit.MILLISECONDS); + yieldExpiration.set(Math.max(yieldExpiration.get(), System.currentTimeMillis() + yieldMillis)); + } + + @Override + public long getYieldExpiration() { + return yieldExpiration.get(); + } + + @Override + public String getSchedulingPeriod() { + return schedulingPeriod.get(); + } + + @Override + public void setPenalizationPeriod(final String penalizationPeriod) { + this.penalizationPeriod.set(penalizationPeriod); + } + + @Override + public String getYieldPeriod() { + return yieldPeriod.get(); + } + + @Override + public long getYieldPeriod(final TimeUnit timeUnit) { + return FormatUtils.getTimeDuration(getYieldPeriod(), timeUnit == null ? DEFAULT_TIME_UNIT : timeUnit); + } + + @Override + public long getSchedulingPeriod(final TimeUnit timeUnit) { + return timeUnit.convert(schedulingNanos.get(), TimeUnit.NANOSECONDS); + } + + @Override + public boolean isSideEffectFree() { + return true; + } + + @Override + public void verifyCanDelete(boolean ignoreConnections) throws IllegalStateException { + if (ignoreConnections) { + return; + } + + readLock.lock(); + try { + for (final Connection connection : outgoingConnections) { + connection.verifyCanDelete(); + } + + for (final Connection connection : incomingConnections) { + if (connection.getSource().equals(this)) { + connection.verifyCanDelete(); + } else { + throw new IllegalStateException(this + " is the destination of another component"); + } + } + } finally { + readLock.unlock(); + } + } + + @Override + public void verifyCanDelete() { + verifyCanDelete(false); + } + + @Override + public void verifyCanStart() { + } + + @Override + public void verifyCanStop() { + } + + @Override + public void verifyCanUpdate() { + } + + @Override + public void verifyCanEnable() { + } + + @Override + public void verifyCanDisable() { + } + + @Override + public SchedulingStrategy getSchedulingStrategy() { + return 
SchedulingStrategy.EVENT_DRIVEN; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ValidationContextFactory.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ValidationContextFactory.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ValidationContextFactory.java new file mode 100644 index 0000000..df3c251 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ValidationContextFactory.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.nifi.controller;
+
+import java.util.Map;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+/**
+ * Factory that produces {@link ValidationContext} instances.
+ */
+public interface ValidationContextFactory {
+    /**
+     * Creates a {@link ValidationContext} from the given inputs.
+     *
+     * @param properties property descriptors mapped to String values
+     * (presumably the configured values to validate; confirm against the
+     * implementations)
+     * @param annotationData annotation data passed along with the properties
+     * (its meaning is not visible here; confirm against the implementations)
+     * @return the validation context
+     */
+    ValidationContext newValidationContext(Map<PropertyDescriptor, String> properties, String annotationData);
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/WorkerQueue.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/WorkerQueue.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/WorkerQueue.java new file mode 100644 index 0000000..2f43600 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/WorkerQueue.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.connectable.Connectable;
+/**
+ * Queue that accepts {@link Connectable} workers and hands out
+ * {@link EventBasedWorker} instances.
+ *
+ * NOTE(review): no implementation is visible here, so the per-method notes
+ * below are derived from the signatures only and should be confirmed.
+ */
+public interface WorkerQueue {
+    /**
+     * Retrieves a worker from the queue.
+     *
+     * @param timeout the timeout value, expressed in {@code timeUnit}
+     * (presumably the maximum time to wait for a worker; confirm)
+     * @param timeUnit the unit of {@code timeout}
+     * @return a worker; what is returned when none is available is not
+     * visible here (TODO confirm)
+     */
+    EventBasedWorker poll(long timeout, TimeUnit timeUnit);
+    /**
+     * Offers the given component to the queue.
+     *
+     * @param worker the component to offer
+     */
+    void offer(Connectable worker);
+    /**
+     * @param clustered the new value of the clustered flag
+     */
+    void setClustered(boolean clustered);
+    /**
+     * @param primary the new value of the primary flag
+     */
+    void setPrimary(boolean primary);
+    /**
+     * Suspends work for the given component (presumably until
+     * {@link #resumeWork(Connectable)} is called for it; confirm).
+     *
+     * @param worker the component whose work is suspended
+     */
+    void suspendWork(Connectable worker);
+    /**
+     * Resumes work for the given component.
+     *
+     * @param worker the component whose work is resumed
+     */
+    void resumeWork(Connectable worker);
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/CommunicationsException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/CommunicationsException.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/CommunicationsException.java new file mode 100644 index 0000000..368ed1b --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/CommunicationsException.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.exception;
+
+import java.io.IOException;
+/**
+ * Checked {@link IOException} subtype for communications failures. It adds no
+ * state or behavior; every constructor delegates to the matching
+ * {@link IOException} constructor.
+ */
+public class CommunicationsException extends IOException {
+
+    private static final long serialVersionUID = 142343242323423L;
+    /**
+     * Creates an exception without message or cause.
+     */
+    public CommunicationsException() {
+        super();
+    }
+    /**
+     * @param cause the underlying cause
+     */
+    public CommunicationsException(final Throwable cause) {
+        super(cause);
+    }
+    /**
+     * @param explanation the detail message
+     */
+    public CommunicationsException(final String explanation) {
+        super(explanation);
+    }
+    /**
+     * @param explanation the detail message
+     * @param cause the underlying cause
+     */
+    public CommunicationsException(final String explanation, final Throwable cause) {
+        super(explanation, cause);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceAlreadyExistsException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceAlreadyExistsException.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceAlreadyExistsException.java new file mode 100644 index 0000000..0ff68b0 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceAlreadyExistsException.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.exception;
+/**
+ * Unchecked exception whose message states that a Controller Service with a
+ * given ID already exists.
+ */
+public class ControllerServiceAlreadyExistsException extends RuntimeException {
+
+    private static final long serialVersionUID = -544424320587059277L;
+
+    /**
+     * Creates an exception with the message
+     * "A Controller Service already exists with ID " followed by the given id.
+     *
+     * @param id the identifier appended to the message
+     */
+    public ControllerServiceAlreadyExistsException(final String id) {
+        super("A Controller Service already exists with ID " + id);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceNotFoundException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceNotFoundException.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceNotFoundException.java new file mode 100644 index 0000000..4cdbe54 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceNotFoundException.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.exception;
+/**
+ * Unchecked exception for a Controller Service that could not be found. Every
+ * constructor delegates to the matching {@link RuntimeException} constructor.
+ */
+public class ControllerServiceNotFoundException extends RuntimeException {
+
+    private static final long serialVersionUID = -544424320587059277L;
+
+    /**
+     * Creates an exception without message or cause.
+     */
+    public ControllerServiceNotFoundException() {
+        super();
+    }
+
+    /**
+     * @param message the detail message
+     */
+    public ControllerServiceNotFoundException(String message) {
+        super(message);
+    }
+
+    /**
+     * @param cause the underlying cause
+     */
+    public ControllerServiceNotFoundException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * @param message the detail message
+     * @param cause the underlying cause
+     */
+    public ControllerServiceNotFoundException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ProcessorInstantiationException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ProcessorInstantiationException.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ProcessorInstantiationException.java new file mode 100644 index 0000000..c4aba44 --- /dev/null +++
b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ProcessorInstantiationException.java @@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.exception;
+/**
+ * Checked exception for a failure to instantiate a processor.
+ */
+public class ProcessorInstantiationException extends Exception {
+
+    private static final long serialVersionUID = 189273489L;
+    /**
+     * @param className passed to the superclass as the detail message, so
+     * {@link #getMessage()} returns it unchanged
+     * @param t the underlying cause
+     */
+    public ProcessorInstantiationException(final String className, final Throwable t) {
+        super(className, t);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ProcessorLifeCycleException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ProcessorLifeCycleException.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ProcessorLifeCycleException.java new file mode 100644 index 0000000..5acca16 --- /dev/null +++
b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ProcessorLifeCycleException.java @@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.exception;
+/**
+ * Unchecked exception for a processor life-cycle failure. Both constructors
+ * require a cause and delegate to the matching {@link RuntimeException}
+ * constructor.
+ */
+public class ProcessorLifeCycleException extends RuntimeException {
+
+    private static final long serialVersionUID = 8392341500511490941L;
+    /**
+     * @param message the detail message
+     * @param t the underlying cause
+     */
+    public ProcessorLifeCycleException(final String message, final Throwable t) {
+        super(message, t);
+    }
+    /**
+     * @param t the underlying cause
+     */
+    public ProcessorLifeCycleException(final Throwable t) {
+        super(t);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/label/Label.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/label/Label.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/label/Label.java new file mode 100644 index 0000000..97c44b5 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/label/Label.java @@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.label;
+
+import java.util.Map;
+
+import org.apache.nifi.connectable.Position;
+import org.apache.nifi.connectable.Size;
+import org.apache.nifi.groups.ProcessGroup;
+/**
+ * A label with an identifier, a position, a size, a style map, a text value
+ * and an owning {@link ProcessGroup}.
+ *
+ * NOTE(review): only the accessor signatures are visible here; null handling
+ * and whether returned objects are copies are up to the implementations.
+ */
+public interface Label {
+    /** @return the identifier of this label */
+    String getIdentifier();
+    /** @return the position of this label */
+    Position getPosition();
+    /** @param position the new position */
+    void setPosition(Position position);
+    /** @return the style entries as String key/value pairs */
+    Map<String, String> getStyle();
+    /** @param style the new style entries */
+    void setStyle(Map<String, String> style);
+    /** @return the size of this label */
+    Size getSize();
+    /** @param size the new size */
+    void setSize(Size size);
+    /** @return the process group of this label */
+    ProcessGroup getProcessGroup();
+    /** @param group the new process group */
+    void setProcessGroup(ProcessGroup group);
+    /** @return the value of this label (presumably its displayed text; confirm) */
+    String getValue();
+    /** @param value the new value */
+    void setValue(String value);
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskInstantiationException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskInstantiationException.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskInstantiationException.java new file mode
100644 index 0000000..ced6ff9 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskInstantiationException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.controller.reporting; +/** Checked exception (extends Exception); presumably signals that a reporting task could not be instantiated, per the class name - confirm against callers. */ +public class ReportingTaskInstantiationException extends Exception { + + private static final long serialVersionUID = 189234789237L; +/** Builds the exception with className used as the exception message and t as the cause. */ + public ReportingTaskInstantiationException(final String className, final Throwable t) { + super(className, t); + } +/** Builds the exception with the given message and no cause. */ + public ReportingTaskInstantiationException(final String message) { + super(message); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/ContentNotFoundException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/ContentNotFoundException.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/ContentNotFoundException.java new file mode 100644 index 0000000..6ce7ba6 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/ContentNotFoundException.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.nifi.controller.repository; + +import org.apache.nifi.controller.repository.claim.ContentClaim; + +/** + * Unchecked exception whose message starts with "Could not find content for " followed by the ContentClaim; the claim itself is kept and exposed through getMissingClaim(). + * @author none + */ +public class ContentNotFoundException extends RuntimeException { + + private static final long serialVersionUID = 19048239082L; + private final transient ContentClaim claim; +/** Builds the exception for the given claim with no cause and no extra detail. */ + public ContentNotFoundException(final ContentClaim claim) { + super("Could not find content for " + claim); + this.claim = claim; + } +/** Builds the exception for the given claim, recording t as the cause. */ + public ContentNotFoundException(final ContentClaim claim, final Throwable t) { + super("Could not find content for " + claim, t); + this.claim = claim; + } +/** Builds the exception for the given claim, appending ": " and the supplied message to the standard text. */ + public ContentNotFoundException(final ContentClaim claim, final String message) { + super("Could not find content for " + claim + ": " + message); + this.claim = claim; + } +/** Returns the claim passed to the constructor. NOTE(review): the field is transient, so it is presumably null on an instance restored by Java deserialization - confirm. */ + public ContentClaim getMissingClaim() { + return claim; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/CounterRepository.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/CounterRepository.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/CounterRepository.java new file mode 100644 index 0000000..de231ed --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/CounterRepository.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.repository; + +import java.util.List; + +import org.apache.nifi.controller.Counter; +/** Repository contract for Counter objects: adjust one by a long delta, look one up by counter context and name, list all counters or those of one context, and reset one by identifier. */ +public interface CounterRepository { + + void adjustCounter(String counterContext, String name, long delta); + + Counter getCounter(String counterContext, String name); + + List<Counter> getCounters(); + + List<Counter> getCounters(String counterContext); + + Counter resetCounter(String identifier); +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEvent.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEvent.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEvent.java new file mode 100644 index 0000000..f07a530 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEvent.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.repository; +/** Getter-only view of one event: a component identifier plus int FlowFile counts (in, out, removed, received, sent), long content sizes and byte totals, processing nanoseconds, average and aggregate lineage millis, and an invocation count. */ +public interface FlowFileEvent { + + String getComponentIdentifier(); + + int getFlowFilesIn(); + + int getFlowFilesOut(); + + int getFlowFilesRemoved(); + + long getContentSizeIn(); + + long getContentSizeOut(); + + long getContentSizeRemoved(); + + long getBytesRead(); + + long getBytesWritten(); + + long getProcessingNanoseconds(); + + long getAverageLineageMillis(); + + long getAggregateLineageMillis(); + + int getFlowFilesReceived(); + + long getBytesReceived(); + + int getFlowFilesSent(); + + long getBytesSent(); + + int getInvocations(); +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEventRepository.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEventRepository.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEventRepository.java new file mode 100644 index 0000000..2eb3caf --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEventRepository.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.repository; + +import java.io.Closeable; +import java.io.IOException; + +/** + * Closeable repository of FlowFile processing events: events are added, reported on since a given time, and purged by age. + * @author none + */ +public interface FlowFileEventRepository extends Closeable { + + /** + * Updates the repository to include a new FlowFile processing event. + * + * @param event the FlowFile processing event to include + * @throws java.io.IOException declared for implementations; the triggering conditions are not specified by this interface + */ + void updateRepository(FlowFileEvent event) throws IOException; + + /** + * Returns a report of processing activity since the given time. + * @param sinceEpochMillis the start time of the report, presumably in epoch milliseconds (per the parameter name) + * @return a RepositoryStatusReport of processing activity since the given time + */ + RepositoryStatusReport reportTransferEvents(long sinceEpochMillis); + + /** + * Causes any flow file events of the given entry age in epoch milliseconds + * or older to be purged from the repository. + * + * @param cutoffEpochMilliseconds the cutoff in epoch milliseconds; events of this age or older are purged + */ + void purgeTransferEvents(long cutoffEpochMilliseconds); +}