http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/EventDrivenWorkerQueue.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/EventDrivenWorkerQueue.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/EventDrivenWorkerQueue.java deleted file mode 100644 index f36a459..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/EventDrivenWorkerQueue.java +++ /dev/null @@ -1,329 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.controller; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.nifi.connectable.Connectable; -import org.apache.nifi.connectable.Connection; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.util.Connectables; - -public class EventDrivenWorkerQueue implements WorkerQueue { - - private final Object workMonitor = new Object(); - - private final Map<Connectable, Worker> workerMap = new HashMap<>(); // protected by synchronizing on workMonitor - private final WorkerReadyQueue workerQueue; - - public EventDrivenWorkerQueue(final boolean clustered, final boolean primary, final ProcessScheduler scheduler) { - workerQueue = new WorkerReadyQueue(scheduler); - workerQueue.setClustered(clustered); - workerQueue.setPrimary(primary); - } - - @Override - public void setClustered(final boolean clustered) { - workerQueue.setClustered(clustered); - } - - @Override - public void setPrimary(final boolean primary) { - workerQueue.setPrimary(primary); - } - - @Override - public Worker poll(final long timeout, final TimeUnit timeUnit) { - final long maxTime = System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(timeout, timeUnit); - while (System.currentTimeMillis() < maxTime) { - synchronized (workMonitor) { - final Worker worker = workerQueue.poll(); - if (worker == null) { - // nothing to do. wait until we have something to do. - final long timeLeft = maxTime - System.currentTimeMillis(); - if (timeLeft <= 0) { - return null; - } - - try { - workMonitor.wait(timeLeft); - } catch (final InterruptedException ignored) { - } - } else { - // Decrement the amount of work there is to do for this worker. - final int workLeft = worker.decrementEventCount(); - if (workLeft > 0) { - workerQueue.offer(worker); - } - - return worker; - } - } - } - - return null; - } - - @Override - public void offer(final Connectable connectable) { - synchronized (workMonitor) { - Worker worker = workerMap.get(connectable); - if (worker == null) { - // if worker is null, then it has not been scheduled to run; ignore the event. - return; - } - - final int countBefore = worker.incrementEventCount(); - if (countBefore < 0) { - worker.setWorkCount(1); - } - if (countBefore <= 0) { - // If countBefore > 0 then it's already on the queue, so just incrementing its counter is sufficient. - workerQueue.offer(worker); - } - - workMonitor.notify(); - } - } - - private int getWorkCount(final Connectable connectable) { - int sum = 0; - for (final Connection connection : connectable.getIncomingConnections()) { - sum += connection.getFlowFileQueue().size().getObjectCount(); - } - return sum; - } - - @Override - public void resumeWork(final Connectable connectable) { - synchronized (workMonitor) { - final int workCount = getWorkCount(connectable); - final Worker worker = new Worker(connectable); - workerMap.put(connectable, worker); - - if (workCount > 0) { - worker.setWorkCount(workCount); - workerQueue.offer(worker); - workMonitor.notify(); - } - } - } - - @Override - public void suspendWork(final Connectable connectable) { - synchronized (workMonitor) { - final Worker worker = this.workerMap.remove(connectable); - if (worker == null) { - return; - } - - worker.resetWorkCount(); - workerQueue.remove(worker); - } - } - - public static class Worker implements EventBasedWorker { - - private final Connectable connectable; - private final AtomicInteger workCount = new AtomicInteger(0); - - public Worker(final Connectable connectable) { - this.connectable = connectable; - } - - @Override - public Connectable getConnectable() { - return connectable; - } - - @Override - public int decrementEventCount() { - return workCount.decrementAndGet(); - } - - @Override - public int incrementEventCount() { - return workCount.getAndIncrement(); - } - - void resetWorkCount() { - workCount.set(0); - } - - void setWorkCount(final int workCount) { - this.workCount.set(workCount); - } - } - - @SuppressWarnings("serial") - private static class WorkerReadyQueue extends LinkedList<Worker> { - - private final ProcessScheduler scheduler; - - private volatile boolean clustered = false; - private volatile boolean primary = false; - - public WorkerReadyQueue(final ProcessScheduler scheduler) { - this.scheduler = scheduler; - } - - public void setClustered(final boolean clustered) { - this.clustered = clustered; - } - - public void setPrimary(final boolean primary) { - this.primary = primary; - } - - @Override - public Worker poll() { - final List<Worker> putBack = new ArrayList<>(); - - Worker worker; - try { - while ((worker = super.poll()) != null) { - final DelayProcessingReason reason = getDelayReason(worker); - if (reason == null) { - return worker; - } else { - // Worker is not ready. We may want to add him back to the queue, depending on the reason that he is unready. - switch (reason) { - case YIELDED: - case ISOLATED: - case DESTINATION_FULL: - case ALL_WORK_PENALIZED: - case NO_WORK: - case TOO_MANY_THREADS: - // there will not be an event that triggers this to happen, so we add this worker back to the queue. - putBack.add(worker); - break; - default: - case NOT_RUNNING: - // There's no need to check if this worker is available again until a another event - // occurs. Therefore, we keep him off of the queue and reset his work count - worker.resetWorkCount(); - break; - } - } - } - } finally { - if (!putBack.isEmpty()) { - super.addAll(putBack); - } - } - - return null; - } - - private DelayProcessingReason getDelayReason(final Worker worker) { - final Connectable connectable = worker.getConnectable(); - - if (ScheduledState.RUNNING != connectable.getScheduledState()) { - return DelayProcessingReason.NOT_RUNNING; - } - - if (connectable.getYieldExpiration() > System.currentTimeMillis()) { - return DelayProcessingReason.YIELDED; - } - - // For Remote Output Ports, - int availableRelationshipCount = 0; - if (!connectable.getRelationships().isEmpty()) { - availableRelationshipCount = getAvailableRelationshipCount(connectable); - - if (availableRelationshipCount == 0) { - return DelayProcessingReason.DESTINATION_FULL; - } - } - - if (connectable.hasIncomingConnection() && !Connectables.flowFilesQueued(connectable)) { - return DelayProcessingReason.NO_WORK; - } - - final int activeThreadCount = scheduler.getActiveThreadCount(worker.getConnectable()); - final int maxThreadCount = worker.getConnectable().getMaxConcurrentTasks(); - if (maxThreadCount > 0 && activeThreadCount >= maxThreadCount) { - return DelayProcessingReason.TOO_MANY_THREADS; - } - - if (connectable instanceof ProcessorNode) { - final ProcessorNode procNode = (ProcessorNode) connectable; - if (procNode.isIsolated() && clustered && !primary) { - return DelayProcessingReason.ISOLATED; - } - - final boolean triggerWhenAnyAvailable = procNode.isTriggerWhenAnyDestinationAvailable(); - final boolean allDestinationsAvailable = availableRelationshipCount == procNode.getRelationships().size(); - if (!triggerWhenAnyAvailable && !allDestinationsAvailable) { - return DelayProcessingReason.DESTINATION_FULL; - } - } - - return null; - } - - private int getAvailableRelationshipCount(final Connectable connectable) { - int count = 0; - for (final Relationship relationship : connectable.getRelationships()) { - final Collection<Connection> connections = connectable.getConnections(relationship); - - if (connections == null || connections.isEmpty()) { - if (connectable.isAutoTerminated(relationship)) { - // If the relationship is auto-terminated, consider it available. - count++; - } - } else { - boolean available = true; - for (final Connection connection : connections) { - if (connection.getSource() == connection.getDestination()) { - // don't count self-loops - continue; - } - - if (connection.getFlowFileQueue().isFull()) { - available = false; - } - } - - if (available) { - count++; - } - } - } - - return count; - } - } - - private static enum DelayProcessingReason { - - YIELDED, - DESTINATION_FULL, - NO_WORK, - ALL_WORK_PENALIZED, - ISOLATED, - NOT_RUNNING, - TOO_MANY_THREADS; - } -}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java deleted file mode 100644 index e1d80b0..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java +++ /dev/null @@ -1,768 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.controller; - -import java.io.BufferedInputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.EOFException; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.FilenameFilter; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.file.Path; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Queue; -import java.util.Set; -import java.util.SortedMap; -import java.util.TreeMap; -import java.util.UUID; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; -import java.util.regex.Pattern; - -import org.apache.nifi.controller.repository.ConnectionSwapInfo; -import org.apache.nifi.controller.repository.FlowFileRecord; -import org.apache.nifi.controller.repository.FlowFileRepository; -import org.apache.nifi.controller.repository.FlowFileSwapManager; -import org.apache.nifi.controller.repository.QueueProvider; -import org.apache.nifi.controller.repository.StandardFlowFileRecord; -import org.apache.nifi.controller.repository.claim.ContentClaim; -import org.apache.nifi.controller.repository.claim.ContentClaimManager; -import org.apache.nifi.engine.FlowEngine; -import org.apache.nifi.events.EventReporter; -import org.apache.nifi.stream.io.BufferedOutputStream; -import org.apache.nifi.processor.QueueSize; -import org.apache.nifi.reporting.Severity; -import org.apache.nifi.util.FormatUtils; -import org.apache.nifi.util.NiFiProperties; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * <p> - * An implementation of the {@link FlowFileSwapManager} that swaps FlowFiles - * to/from local disk - * </p> - */ -public class FileSystemSwapManager implements FlowFileSwapManager { - - public static final int MINIMUM_SWAP_COUNT = 10000; - private static final Pattern SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap"); - private static final Pattern TEMP_SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap\\.part"); - - public static final int SWAP_ENCODING_VERSION = 6; - public static final String EVENT_CATEGORY = "Swap FlowFiles"; - - private final ScheduledExecutorService swapQueueIdentifierExecutor; - private final ScheduledExecutorService swapInExecutor; - private volatile FlowFileRepository flowFileRepository; - private volatile EventReporter eventReporter; - - // Maintains a mapping of FlowFile Queue to the a QueueLockWrapper, which provides queue locking and necessary state for swapping back in - private final ConcurrentMap<FlowFileQueue, QueueLockWrapper> swapMap = new ConcurrentHashMap<>(); - private final File storageDirectory; - private final long swapInMillis; - private final long swapOutMillis; - private final int swapOutThreadCount; - - private ContentClaimManager claimManager; // effectively final - - private static final Logger logger = LoggerFactory.getLogger(FileSystemSwapManager.class); - - public FileSystemSwapManager() { - final NiFiProperties properties = NiFiProperties.getInstance(); - final Path flowFileRepoPath = properties.getFlowFileRepositoryPath(); - - this.storageDirectory = flowFileRepoPath.resolve("swap").toFile(); - if (!storageDirectory.exists() && !storageDirectory.mkdirs()) { - throw new RuntimeException("Cannot create Swap Storage directory " + storageDirectory.getAbsolutePath()); - } - - swapQueueIdentifierExecutor = new FlowEngine(1, "Identifies Queues for FlowFile Swapping"); - - swapInMillis = FormatUtils.getTimeDuration(properties.getSwapInPeriod(), TimeUnit.MILLISECONDS); - swapOutMillis = FormatUtils.getTimeDuration(properties.getSwapOutPeriod(), TimeUnit.MILLISECONDS); - swapOutThreadCount = properties.getSwapOutThreads(); - swapInExecutor = new FlowEngine(properties.getSwapInThreads(), "Swap In FlowFiles"); - } - - @Override - public void purge() { - final File[] swapFiles = storageDirectory.listFiles(new FilenameFilter() { - @Override - public boolean accept(final File dir, final String name) { - return SWAP_FILE_PATTERN.matcher(name).matches(); - } - }); - - if (swapFiles != null) { - for (final File file : swapFiles) { - if (!file.delete() && file.exists()) { - logger.warn("Failed to delete SWAP file {}", file); - } - } - } - } - - public synchronized void start(final FlowFileRepository flowFileRepository, final QueueProvider connectionProvider, final ContentClaimManager claimManager, final EventReporter eventReporter) { - this.claimManager = claimManager; - this.flowFileRepository = flowFileRepository; - this.eventReporter = eventReporter; - swapQueueIdentifierExecutor.scheduleWithFixedDelay(new QueueIdentifier(connectionProvider), swapOutMillis, swapOutMillis, TimeUnit.MILLISECONDS); - swapInExecutor.scheduleWithFixedDelay(new SwapInTask(), swapInMillis, swapInMillis, TimeUnit.MILLISECONDS); - } - - public int serializeFlowFiles(final List<FlowFileRecord> toSwap, final FlowFileQueue queue, final String swapLocation, final OutputStream destination) throws IOException { - if (toSwap == null || toSwap.isEmpty()) { - return 0; - } - - long contentSize = 0L; - for (final FlowFileRecord record : toSwap) { - contentSize += record.getSize(); - } - - // persist record to disk via the swap file - final OutputStream bufferedOut = new BufferedOutputStream(destination); - final DataOutputStream out = new DataOutputStream(bufferedOut); - try { - out.writeInt(SWAP_ENCODING_VERSION); - out.writeUTF(queue.getIdentifier()); - out.writeInt(toSwap.size()); - out.writeLong(contentSize); - - for (final FlowFileRecord flowFile : toSwap) { - out.writeLong(flowFile.getId()); - out.writeLong(flowFile.getEntryDate()); - - final Set<String> lineageIdentifiers = flowFile.getLineageIdentifiers(); - out.writeInt(lineageIdentifiers.size()); - for (final String lineageId : lineageIdentifiers) { - out.writeUTF(lineageId); - } - - out.writeLong(flowFile.getLineageStartDate()); - out.writeLong(flowFile.getLastQueueDate()); - out.writeLong(flowFile.getSize()); - - final ContentClaim claim = flowFile.getContentClaim(); - if (claim == null) { - out.writeBoolean(false); - } else { - out.writeBoolean(true); - out.writeUTF(claim.getId()); - out.writeUTF(claim.getContainer()); - out.writeUTF(claim.getSection()); - out.writeLong(flowFile.getContentClaimOffset()); - out.writeBoolean(claim.isLossTolerant()); - } - - final Map<String, String> attributes = flowFile.getAttributes(); - out.writeInt(attributes.size()); - for (final Map.Entry<String, String> entry : attributes.entrySet()) { - writeString(entry.getKey(), out); - writeString(entry.getValue(), out); - } - } - } finally { - out.flush(); - } - - logger.info("Successfully swapped out {} FlowFiles from {} to Swap File {}", new Object[]{toSwap.size(), queue, swapLocation}); - - return toSwap.size(); - } - - private void writeString(final String toWrite, final OutputStream out) throws IOException { - final byte[] bytes = toWrite.getBytes("UTF-8"); - final int utflen = bytes.length; - - if (utflen < 65535) { - out.write(utflen >>> 8); - out.write(utflen); - out.write(bytes); - } else { - out.write(255); - out.write(255); - out.write(utflen >>> 24); - out.write(utflen >>> 16); - out.write(utflen >>> 8); - out.write(utflen); - out.write(bytes); - } - } - - static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, final FlowFileQueue queue, final ContentClaimManager claimManager) throws IOException { - final int swapEncodingVersion = in.readInt(); - if (swapEncodingVersion > SWAP_ENCODING_VERSION) { - throw new IOException("Cannot swap FlowFiles in from SwapFile because the encoding version is " - + swapEncodingVersion + ", which is too new (expecting " + SWAP_ENCODING_VERSION + " or less)"); - } - - final String connectionId = in.readUTF(); - if (!connectionId.equals(queue.getIdentifier())) { - throw new IllegalArgumentException("Cannot restore Swap File because the file indicates that records belong to Connection with ID " + connectionId + " but received Connection " + queue); - } - - final int numRecords = in.readInt(); - in.readLong(); // Content Size - - return deserializeFlowFiles(in, numRecords, queue, swapEncodingVersion, false, claimManager); - } - - static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, final int numFlowFiles, final FlowFileQueue queue, final int serializationVersion, final boolean incrementContentClaims, final ContentClaimManager claimManager) throws IOException { - final List<FlowFileRecord> flowFiles = new ArrayList<>(); - for (int i = 0; i < numFlowFiles; i++) { - // legacy encoding had an "action" because it used to be couple with FlowFile Repository code - if (serializationVersion < 3) { - final int action = in.read(); - if (action != 1) { - throw new IOException("Swap File is version " + serializationVersion + " but did not contain a 'UPDATE' record type"); - } - } - - final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder(); - ffBuilder.id(in.readLong()); - ffBuilder.entryDate(in.readLong()); - - if (serializationVersion > 1) { - // Lineage information was added in version 2 - final int numLineageIdentifiers = in.readInt(); - final Set<String> lineageIdentifiers = new HashSet<>(numLineageIdentifiers); - for (int lineageIdIdx = 0; lineageIdIdx < numLineageIdentifiers; lineageIdIdx++) { - lineageIdentifiers.add(in.readUTF()); - } - ffBuilder.lineageIdentifiers(lineageIdentifiers); - ffBuilder.lineageStartDate(in.readLong()); - - if (serializationVersion > 5) { - ffBuilder.lastQueueDate(in.readLong()); - } - } - - ffBuilder.size(in.readLong()); - - if (serializationVersion < 3) { - readString(in); // connection Id - } - - final boolean hasClaim = in.readBoolean(); - if (hasClaim) { - final String claimId; - if (serializationVersion < 5) { - claimId = String.valueOf(in.readLong()); - } else { - claimId = in.readUTF(); - } - - final String container = in.readUTF(); - final String section = in.readUTF(); - final long claimOffset = in.readLong(); - - final boolean lossTolerant; - if (serializationVersion >= 4) { - lossTolerant = in.readBoolean(); - } else { - lossTolerant = false; - } - - final ContentClaim claim = claimManager.newContentClaim(container, section, claimId, lossTolerant); - - if (incrementContentClaims) { - claimManager.incrementClaimantCount(claim); - } - - ffBuilder.contentClaim(claim); - ffBuilder.contentClaimOffset(claimOffset); - } - - boolean attributesChanged = true; - if (serializationVersion < 3) { - attributesChanged = in.readBoolean(); - } - - if (attributesChanged) { - final int numAttributes = in.readInt(); - for (int j = 0; j < numAttributes; j++) { - final String key = readString(in); - final String value = readString(in); - - ffBuilder.addAttribute(key, value); - } - } - - final FlowFileRecord record = ffBuilder.build(); - flowFiles.add(record); - } - - return flowFiles; - } - - private static String readString(final InputStream in) throws IOException { - final Integer numBytes = readFieldLength(in); - if (numBytes == null) { - throw new EOFException(); - } - final byte[] bytes = new byte[numBytes]; - fillBuffer(in, bytes, numBytes); - return new String(bytes, "UTF-8"); - } - - private static Integer readFieldLength(final InputStream in) throws IOException { - final int firstValue = in.read(); - final int secondValue = in.read(); - if (firstValue < 0) { - return null; - } - if (secondValue < 0) { - throw new EOFException(); - } - if (firstValue == 0xff && secondValue == 0xff) { - int ch1 = in.read(); - int ch2 = in.read(); - int ch3 = in.read(); - int ch4 = in.read(); - if ((ch1 | ch2 | ch3 | ch4) < 0) { - throw new EOFException(); - } - return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4)); - } else { - return ((firstValue << 8) + (secondValue)); - } - } - - private static void fillBuffer(final InputStream in, final byte[] buffer, final int length) throws IOException { - int bytesRead; - int totalBytesRead = 0; - while ((bytesRead = in.read(buffer, totalBytesRead, length - totalBytesRead)) > 0) { - totalBytesRead += bytesRead; - } - if (totalBytesRead != length) { - throw new EOFException(); - } - } - - private class QueueIdentifier implements Runnable { - - private final QueueProvider connectionProvider; - - public QueueIdentifier(final QueueProvider connectionProvider) { - this.connectionProvider = connectionProvider; - } - - @Override - public void run() { - final Collection<FlowFileQueue> allQueues = connectionProvider.getAllQueues(); - final BlockingQueue<FlowFileQueue> connectionQueue = new LinkedBlockingQueue<>(allQueues); - - final ThreadFactory threadFactory = new ThreadFactory() { - @Override - public Thread newThread(final Runnable r) { - final Thread t = new Thread(r); - t.setName("Swap Out FlowFiles"); - return t; - } - }; - - final ExecutorService workerExecutor = Executors.newFixedThreadPool(swapOutThreadCount, threadFactory); - for (int i = 0; i < swapOutThreadCount; i++) { - workerExecutor.submit(new SwapOutTask(connectionQueue)); - } - - workerExecutor.shutdown(); - - try { - workerExecutor.awaitTermination(10, TimeUnit.MINUTES); - } catch (final InterruptedException e) { - // oh well... - } - } - } - - private class SwapInTask implements Runnable { - - @Override - public void run() { - for (final Map.Entry<FlowFileQueue, QueueLockWrapper> entry : swapMap.entrySet()) { - final FlowFileQueue flowFileQueue = entry.getKey(); - - // if queue is more than 60% of its swap threshold, don't swap flowfiles in - if (flowFileQueue.unswappedSize() >= ((float) flowFileQueue.getSwapThreshold() * 0.6F)) { - continue; - } - - final QueueLockWrapper queueLockWrapper = entry.getValue(); - if (queueLockWrapper.getLock().tryLock()) { - try { - final Queue<File> queue = queueLockWrapper.getQueue(); - - // Swap FlowFiles in until we hit 90% of the threshold, or until we're out of files. - while (flowFileQueue.unswappedSize() < ((float) flowFileQueue.getSwapThreshold() * 0.9F)) { - File swapFile = null; - try { - swapFile = queue.poll(); - if (swapFile == null) { - break; - } - - try (final InputStream fis = new FileInputStream(swapFile); - final DataInputStream in = new DataInputStream(fis)) { - final List<FlowFileRecord> swappedFlowFiles = deserializeFlowFiles(in, flowFileQueue, claimManager); - flowFileRepository.swapFlowFilesIn(swapFile.getAbsolutePath(), swappedFlowFiles, flowFileQueue); - flowFileQueue.putSwappedRecords(swappedFlowFiles); - } - - if (!swapFile.delete()) { - warn("Swapped in FlowFiles from file " + swapFile.getAbsolutePath() + " but failed to delete the file; this file should be cleaned up manually"); - } - } catch (final EOFException eof) { - error("Failed to Swap In FlowFiles for " + flowFileQueue + " due to: Corrupt Swap File; will remove this Swap File: " + swapFile); - - if ( !swapFile.delete() ) { - warn("Failed to remove corrupt Swap File " + swapFile + "; This file should be cleaned up manually"); - } - } catch (final FileNotFoundException fnfe) { - error("Failed to Swap In FlowFiles for " + flowFileQueue + " due to: Could not find Swap File " + swapFile); - } catch (final Exception e) { - error("Failed to Swap In FlowFiles for " + flowFileQueue + " due to " + e, e); - - if (swapFile != null) { - queue.add(swapFile); - } - } - } - } finally { - queueLockWrapper.getLock().unlock(); - } - } - } - } - } - - private void error(final String error, final Throwable t) { - error(error); - if ( logger.isDebugEnabled() ) { - logger.error("", t); - } - } - - private void error(final String error) { - logger.error(error); - if ( eventReporter != null ) { - eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, error); - } - } - - private void warn(final String warning) { - logger.warn(warning); - if ( eventReporter != null ) { - eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, warning); - } - } - - - private class SwapOutTask implements Runnable { - private final BlockingQueue<FlowFileQueue> connectionQueue; - - public SwapOutTask(final BlockingQueue<FlowFileQueue> connectionQueue) { - this.connectionQueue = connectionQueue; - } - - @Override - public void run() { - while (true) { - final FlowFileQueue flowFileQueue = connectionQueue.poll(); - if (flowFileQueue == null) { - logger.debug("No more FlowFile Queues to Swap Out"); - return; - } - - if (logger.isDebugEnabled()) { - logger.debug("{} has {} FlowFiles to swap out", flowFileQueue, flowFileQueue.getSwapQueueSize()); - } - - while (flowFileQueue.getSwapQueueSize() >= MINIMUM_SWAP_COUNT) { - final File swapFile = new File(storageDirectory, System.currentTimeMillis() + "-" + UUID.randomUUID().toString() + ".swap"); - final File swapTempFile = new File(swapFile.getParentFile(), swapFile.getName() + ".part"); - final String swapLocation = swapFile.getAbsolutePath(); - final List<FlowFileRecord> toSwap = flowFileQueue.pollSwappableRecords(); - - int recordsSwapped; - try { - try (final FileOutputStream fos = new FileOutputStream(swapTempFile)) { - recordsSwapped = serializeFlowFiles(toSwap, flowFileQueue, swapLocation, fos); - fos.getFD().sync(); - } - - if ( swapTempFile.renameTo(swapFile) ) { - flowFileRepository.swapFlowFilesOut(toSwap, flowFileQueue, swapLocation); - } else { - error("Failed to swap out FlowFiles from " + flowFileQueue + " due to: Unable to rename swap file from " + swapTempFile + " to " + swapFile); - recordsSwapped = 0; - } - } catch (final IOException ioe) { - recordsSwapped = 0; - flowFileQueue.putSwappedRecords(toSwap); - error("Failed to swap out " + toSwap.size() + " FlowFiles from " + flowFileQueue + " to Swap File " + swapLocation + " due to " + ioe, ioe); - } - - if (recordsSwapped > 0) { - QueueLockWrapper swapQueue = swapMap.get(flowFileQueue); - if (swapQueue == null) { - swapQueue = new QueueLockWrapper(new LinkedBlockingQueue<File>()); - QueueLockWrapper oldQueue = swapMap.putIfAbsent(flowFileQueue, swapQueue); - if (oldQueue != null) { - swapQueue = oldQueue; - } - } - - swapQueue.getQueue().add(swapFile); - } else { - swapTempFile.delete(); - } - } - } - } - } - - /** - * Recovers FlowFiles from all Swap Files, returning the largest FlowFile ID - * that was recovered. - * - * @param queueProvider - * @return - */ - @Override - public long recoverSwappedFlowFiles(final QueueProvider queueProvider, final ContentClaimManager claimManager) { - final File[] swapFiles = storageDirectory.listFiles(new FilenameFilter() { - @Override - public boolean accept(final File dir, final String name) { - return SWAP_FILE_PATTERN.matcher(name).matches() || TEMP_SWAP_FILE_PATTERN.matcher(name).matches(); - } - }); - - if (swapFiles == null) { - return 0L; - } - - final Collection<FlowFileQueue> allQueues = queueProvider.getAllQueues(); - final Map<String, FlowFileQueue> queueMap = new HashMap<>(); - for (final FlowFileQueue queue : allQueues) { - queueMap.put(queue.getIdentifier(), queue); - } - - final ConnectionSwapInfo swapInfo = new ConnectionSwapInfo(); - int swappedCount = 0; - long swappedBytes = 0L; - long maxRecoveredId = 0L; - - for (final File swapFile : swapFiles) { - if ( TEMP_SWAP_FILE_PATTERN.matcher(swapFile.getName()).matches() ) { - if ( swapFile.delete() ) { - logger.info("Removed incomplete/temporary Swap File " + swapFile); - } else { - warn("Failed to remove incomplete/temporary Swap File " + swapFile + "; this file should be cleaned up manually"); - } - - continue; - } - - // read record to disk via the swap file - try (final InputStream fis = new FileInputStream(swapFile); - final InputStream bufferedIn = new BufferedInputStream(fis); - final DataInputStream in = new DataInputStream(bufferedIn)) { - - final int swapEncodingVersion = in.readInt(); - if (swapEncodingVersion > SWAP_ENCODING_VERSION) { - final String errMsg = "Cannot swap FlowFiles in from " + swapFile + " because the encoding version is " - + swapEncodingVersion + ", which is too new (expecting " + SWAP_ENCODING_VERSION + " or less)"; - - eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg); - throw new IOException(errMsg); - } - - final String connectionId = in.readUTF(); - final FlowFileQueue queue = queueMap.get(connectionId); - if (queue == null) { - error("Cannot recover Swapped FlowFiles from Swap File " + swapFile + " because the FlowFiles belong to a Connection with ID " + connectionId + " and that Connection does not exist"); - continue; - } - - final int numRecords = in.readInt(); - final long contentSize = in.readLong(); - - swapInfo.addSwapSizeInfo(connectionId, swapFile.getAbsolutePath(), new QueueSize(numRecords, contentSize)); - swappedCount += numRecords; - swappedBytes += contentSize; - - final List<FlowFileRecord> records = deserializeFlowFiles(in, numRecords, queue, swapEncodingVersion, true, claimManager); - long maxId = 0L; - for (final FlowFileRecord record : records) { - if (record.getId() > maxId) { - maxId = record.getId(); - } - } - - if (maxId > maxRecoveredId) { - maxRecoveredId = maxId; - } - } catch (final IOException ioe) { - error("Cannot recover Swapped FlowFiles from Swap File " + swapFile + " due to " + ioe, ioe); - } - } - - restoreSwapLocations(queueMap.values(), swapInfo); - logger.info("Recovered {} FlowFiles ({} bytes) from Swap Files", swappedCount, swappedBytes); - return maxRecoveredId; - } - - public void restoreSwapLocations(final Collection<FlowFileQueue> flowFileQueues, final ConnectionSwapInfo swapInfo) { - for (final FlowFileQueue queue : flowFileQueues) { - final String queueId = queue.getIdentifier(); - final Collection<String> swapFileLocations = swapInfo.getSwapFileLocations(queueId); - if (swapFileLocations == null || swapFileLocations.isEmpty()) { - continue; - } - - final SortedMap<String, QueueSize> sortedFileQueueMap = new TreeMap<>(new SwapFileComparator()); - for (final String swapFileLocation : swapFileLocations) { - final QueueSize queueSize = swapInfo.getSwappedSize(queueId, swapFileLocation); - sortedFileQueueMap.put(swapFileLocation, queueSize); - } - - QueueLockWrapper fileQueue = swapMap.get(queue); - if (fileQueue == null) { - fileQueue = new QueueLockWrapper(new LinkedBlockingQueue<File>()); - swapMap.put(queue, fileQueue); - } - - for (final Map.Entry<String, QueueSize> innerEntry : sortedFileQueueMap.entrySet()) { - final File swapFile = new File(innerEntry.getKey()); - final QueueSize size = innerEntry.getValue(); - fileQueue.getQueue().add(swapFile); - queue.incrementSwapCount(size.getObjectCount(), size.getByteCount()); - } - } - } - - public void shutdown() { - swapQueueIdentifierExecutor.shutdownNow(); - swapInExecutor.shutdownNow(); - } - - private static class SwapFileComparator implements Comparator<String> { - - @Override - public int compare(final String o1, final String o2) { - if (o1 == o2) { - return 0; - } - - final Long time1 = getTimestampFromFilename(o1); - final Long time2 = getTimestampFromFilename(o2); - - if (time1 == null && time2 == null) { - return 0; - } - if (time1 == null) { - return 1; - } - if (time2 == null) { - return -1; - } - - final int timeComparisonValue = time1.compareTo(time2); - if (timeComparisonValue != 0) { - return timeComparisonValue; - } - - return o1.compareTo(o2); - } - - private Long getTimestampFromFilename(final String fullyQualifiedFilename) { - if (fullyQualifiedFilename == null) { - return null; - } - - final File file = new File(fullyQualifiedFilename); - final String filename = file.getName(); - - final int idx = filename.indexOf("-"); - if (idx < 1) { - return null; - } - - final String millisVal = filename.substring(0, idx); - try { - return Long.parseLong(millisVal); - } catch (final NumberFormatException e) { - return null; - } - } - } - - private static class QueueLockWrapper { - - private final Lock lock = new ReentrantLock(); - private final Queue<File> queue; - - public QueueLockWrapper(final Queue<File> queue) { - this.queue = queue; - } - - public Queue<File> getQueue() { - return queue; - } - - public Lock getLock() { - return lock; - } - - @Override - public int hashCode() { - return queue.hashCode(); - } - - @Override - public boolean equals(final Object obj) { - if (obj instanceof QueueLockWrapper) { - return queue.equals(((QueueLockWrapper) obj).queue); - } - return false; - } - } -}