Repository: incubator-nifi
Updated Branches:
  refs/heads/NIFI-731 bb5128be2 -> 4b5ba3c4e
NIFI-731: Try using a Stack to hold FlowFile Repo partitions so that we tend to use the same partition to avoid disk seeks/thrashing

Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/4b5ba3c4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/4b5ba3c4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/4b5ba3c4

Branch: refs/heads/NIFI-731
Commit: 4b5ba3c4e9bbb2be5bd3f9bdf40159179a3b30e2
Parents: bb5128b
Author: Mark Payne <marka...@hotmail.com>
Authored: Mon Jun 29 17:32:27 2015 -0400
Committer: Mark Payne <marka...@hotmail.com>
Committed: Tue Jun 30 11:04:42 2015 -0400

----------------------------------------------------------------------
 .../org/wali/MinimalLockingWriteAheadLog.java  | 173 +++++++++++++------
 .../WriteAheadFlowFileRepository.java          |   6 +-
 .../claim/StandardContentClaimManager.java     |  19 +-
 3 files changed, 132 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4b5ba3c4/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java b/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
index 4331504..09887f7 100644
--- a/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
+++ b/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
@@ -43,6 +43,7 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.SortedSet;
+import java.util.Stack;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
@@ -82,11 +83,12 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
     private final SortedSet<Path> basePaths;
     private final SerDe<T> serde;
     private final SyncListener syncListener;
+    private final int updatesBetweenSync;
 
     private final FileChannel lockChannel;
     private final AtomicLong transactionIdGenerator = new AtomicLong(0L);
     private final Partition<T>[] partitions;
-    private final AtomicLong partitionIndex = new AtomicLong(0L);
+    private final Stack<Partition<T>> updatePartitionStack;
     private final ConcurrentMap<Object, T> recordMap = new ConcurrentHashMap<>();
     private final Map<Object, T> unmodifiableRecordMap = Collections.unmodifiableMap(recordMap);
     private final Set<String> externalLocations = new CopyOnWriteArraySet<>();
@@ -108,6 +110,10 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
         this(new TreeSet<>(Collections.singleton(path)), partitionCount, serde, syncListener);
     }
 
+    public MinimalLockingWriteAheadLog(final Path path, final int partitionCount, final SerDe<T> serde, final SyncListener syncListener, final int updatesBetweenSync) throws IOException {
+        this(new TreeSet<>(Collections.singleton(path)), partitionCount, serde, syncListener, updatesBetweenSync);
+    }
+
     /**
      *
      * @param paths a sorted set of Paths to use for the partitions/journals and
@@ -121,6 +127,25 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
      * @throws IOException if unable to initialize due to IO issue
      */
     public MinimalLockingWriteAheadLog(final SortedSet<Path> paths, final int partitionCount, final SerDe<T> serde, final SyncListener syncListener) throws IOException {
+        this(paths, partitionCount, serde, syncListener, 0);
+    }
+
+    /**
+     *
+     * @param paths a sorted set of Paths to use for the partitions/journals and
+     * the snapshot. The snapshot will always be written to the first path
+     * specified.
+     * @param partitionCount the number of partitions/journals to use. For best
+     * performance, this should be close to the number of threads that are
+     * expected to update the repository simultaneously
+     * @param serde the serializer/deserializer for records
+     * @param syncListener the listener
+     * @param updatesBetweenSync if > 0, each partition will be sync'ed after it is updated
+     * this number of times
+     * @throws IOException if unable to initialize due to IO issue
+     */
+    public MinimalLockingWriteAheadLog(final SortedSet<Path> paths, final int partitionCount, final SerDe<T> serde, final SyncListener syncListener, final int updatesBetweenSync) throws IOException {
+        this.updatesBetweenSync = updatesBetweenSync;
         this.syncListener = syncListener;
         this.basePaths = paths;
@@ -161,6 +186,10 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
         lockChannel.lock();
 
         partitions = createPartitions(partitionCount);
+        updatePartitionStack = new Stack<>();
+        for (final Partition<T> partition : partitions) {
+            updatePartitionStack.push(partition);
+        }
     }
 
     private Partition<T>[] createPartitions(final int count) throws IOException {
@@ -176,7 +205,7 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
 
             final Path partitionBasePath = pathIterator.next();
-            partitions[i] = new Partition<>(partitionBasePath.resolve("partition-" + i), serde, i, getVersion());
+            partitions[i] = new Partition<>(partitionBasePath.resolve("partition-" + i), serde, i, getVersion(), updatesBetweenSync);
         }
 
         return partitions;
@@ -195,7 +224,7 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
         updated = true;
         readLock.lock();
         try {
-            while (true) {
+            findPartition: while (true) {
                 final int numBlackListed = numberBlackListedPartitions.get();
                 if (numBlackListed >= partitions.length) {
                     throw new IOException("All Partitions have been blacklisted due to "
@@ -203,65 +232,85 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
                             + "this issue may resolve itself. Otherwise, manual intervention will be required.");
                 }
 
-                final long partitionIdx = partitionIndex.getAndIncrement();
-                final int resolvedIdx = (int) (partitionIdx % partitions.length);
-                final Partition<T> partition = partitions[resolvedIdx];
-                if (partition.tryClaim()) {
-                    try {
-                        final long transactionId = transactionIdGenerator.getAndIncrement();
-                        if (logger.isTraceEnabled()) {
-                            for (final T record : records) {
-                                logger.trace("Partition {} performing Transaction {}: {}", new Object[]{partition, transactionId, record});
-                            }
-                        }
-
+                Partition<T> partition;
+                synchronized (updatePartitionStack) {
+                    while (updatePartitionStack.isEmpty()) {
                         try {
-                            partition.update(records, transactionId, unmodifiableRecordMap, forceSync);
-                        } catch (final Exception e) {
-                            partition.blackList();
-                            numberBlackListedPartitions.incrementAndGet();
-                            throw e;
+                            updatePartitionStack.wait(1000);
+                        } catch (final InterruptedException e) {
+                            continue findPartition;
                         }
-
-                        if (forceSync && syncListener != null) {
-                            syncListener.onSync(resolvedIdx);
-                        }
-                    } finally {
-                        partition.releaseClaim();
                     }
+                    partition = updatePartitionStack.pop();
+                }
 
-                for (final T record : records) {
-                    final UpdateType updateType = serde.getUpdateType(record);
-                    final Object recordIdentifier = serde.getRecordIdentifier(record);
-
-                    if (updateType == UpdateType.DELETE) {
-                        recordMap.remove(recordIdentifier);
-                    } else if (updateType == UpdateType.SWAP_OUT) {
-                        final String newLocation = serde.getLocation(record);
-                        if (newLocation == null) {
-                            logger.error("Received Record (ID=" + recordIdentifier + ") with UpdateType of SWAP_OUT but "
-                                    + "no indicator of where the Record is to be Swapped Out to; these records may be "
-                                    + "lost when the repository is restored!");
-                        } else {
-                            recordMap.remove(recordIdentifier);
-                            this.externalLocations.add(newLocation);
+                try {
+                    if (partition.tryClaim()) {
+                        final int partitionIdx = partition.getPartitionIndex();
+
+                        try {
+                            final long transactionId = transactionIdGenerator.getAndIncrement();
+                            if (logger.isTraceEnabled()) {
+                                for (final T record : records) {
+                                    logger.trace("Partition {} performing Transaction {}: {}", new Object[]{partition, transactionId, record});
+                                }
                             }
-                    } else if (updateType == UpdateType.SWAP_IN) {
-                        final String newLocation = serde.getLocation(record);
-                        if (newLocation == null) {
-                            logger.error("Received Record (ID=" + recordIdentifier + ") with UpdateType of SWAP_IN but no "
-                                    + "indicator of where the Record is to be Swapped In from; these records may be duplicated "
-                                    + "when the repository is restored!");
+
+                            try {
+                                partition.update(records, transactionId, unmodifiableRecordMap, forceSync);
+                            } catch (final Exception e) {
+                                partition.blackList();
+                                numberBlackListedPartitions.incrementAndGet();
+                                throw e;
+                            }
+
+                            if (forceSync && syncListener != null) {
+                                syncListener.onSync(partitionIdx);
+                            }
+                        } finally {
+                            partition.releaseClaim();
+                        }
+
+                        for (final T record : records) {
+                            final UpdateType updateType = serde.getUpdateType(record);
+                            final Object recordIdentifier = serde.getRecordIdentifier(record);
+
+                            if (updateType == UpdateType.DELETE) {
+                                recordMap.remove(recordIdentifier);
+                            } else if (updateType == UpdateType.SWAP_OUT) {
+                                final String newLocation = serde.getLocation(record);
+                                if (newLocation == null) {
+                                    logger.error("Received Record (ID=" + recordIdentifier + ") with UpdateType of SWAP_OUT but "
+                                            + "no indicator of where the Record is to be Swapped Out to; these records may be "
+                                            + "lost when the repository is restored!");
+                                } else {
+                                    recordMap.remove(recordIdentifier);
+                                    this.externalLocations.add(newLocation);
+                                }
+                            } else if (updateType == UpdateType.SWAP_IN) {
+                                final String newLocation = serde.getLocation(record);
+                                if (newLocation == null) {
+                                    logger.error("Received Record (ID=" + recordIdentifier + ") with UpdateType of SWAP_IN but no "
+                                            + "indicator of where the Record is to be Swapped In from; these records may be duplicated "
+                                            + "when the repository is restored!");
+                                } else {
+                                    externalLocations.remove(newLocation);
+                                }
+                                recordMap.put(recordIdentifier, record);
                             } else {
-                            externalLocations.remove(newLocation);
+                                recordMap.put(recordIdentifier, record);
                             }
-                        recordMap.put(recordIdentifier, record);
-                    } else {
-                        recordMap.put(recordIdentifier, record);
+                        }
+
+                        return partitionIdx;
+                    }
+                } finally {
+                    synchronized (updatePartitionStack) {
+                        if (!partition.isBlackListed()) {
+                            updatePartitionStack.push(partition);
+                            updatePartitionStack.notify();
                         }
                     }
-
-                    return resolvedIdx;
                 }
             }
         } finally {
@@ -677,6 +726,7 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
 
         private final Path editDirectory;
         private final int writeAheadLogVersion;
+        private final int partitionIndex;
 
         private final Lock lock = new ReentrantLock();
         private DataOutputStream dataOut = null;
@@ -686,19 +736,23 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
         private DataInputStream recoveryIn;
         private int recoveryVersion;
         private String currentJournalFilename = "";
+        private long updates = 0L;
 
         private static final byte TRANSACTION_CONTINUE = 1;
         private static final byte TRANSACTION_COMMIT = 2;
 
         private final String description;
+        private final int updatesBetweenSync;
         private final AtomicLong maxTransactionId = new AtomicLong(-1L);
         private final Logger logger = LoggerFactory.getLogger(MinimalLockingWriteAheadLog.class);
 
         private final Queue<Path> recoveryFiles;
 
-        public Partition(final Path path, final SerDe<S> serde, final int partitionIndex, final int writeAheadLogVersion) throws IOException {
+        public Partition(final Path path, final SerDe<S> serde, final int partitionIndex, final int writeAheadLogVersion, final int updatesBetweenSync) throws IOException {
             this.editDirectory = path;
             this.serde = serde;
+            this.partitionIndex = partitionIndex;
+            this.updatesBetweenSync = updatesBetweenSync;
 
             final File file = path.toFile();
             if (!file.exists() && !file.mkdirs()) {
@@ -713,6 +767,10 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
             this.description = "Partition-" + partitionIndex;
             this.writeAheadLogVersion = writeAheadLogVersion;
         }
+
+        public int getPartitionIndex() {
+            return partitionIndex;
+        }
 
         public boolean tryClaim() {
             final boolean obtainedLock = lock.tryLock();
@@ -921,7 +979,8 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
 
             out.flush();
 
-            if (forceSync) {
+            updates++;
+            if (forceSync || (updatesBetweenSync > 0 && updates % updatesBetweenSync == 0)) {
                 fileOut.getFD().sync();
             }
         }
@@ -1067,6 +1126,10 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
         public long getMaxRecoveredTransactionId() {
             return maxTransactionId.get();
         }
+
+        public boolean isBlackListed() {
+            return blackListed;
+        }
 
         @Override
         public String toString() {
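The update() rewrite above replaces round-robin partition selection with a shared Stack: a writer pops a partition (waiting if none is free), performs the transaction, and pushes the partition back and notifies a waiter unless it was blacklisted, so successive updates tend to reuse the journal that was written most recently. Below is a minimal sketch of that checkout/return pattern with the claiming and blacklisting details omitted; the JournalPool and Journal names are illustrative stand-ins, not the actual WALI API.

import java.io.IOException;
import java.util.Stack;

final class JournalPool {

    private final Stack<Journal> availableJournals = new Stack<>();

    JournalPool(final int journalCount) {
        for (int i = 0; i < journalCount; i++) {
            availableJournals.push(new Journal(i));
        }
    }

    int write(final String record) throws IOException, InterruptedException {
        final Journal journal;
        synchronized (availableJournals) {
            // LIFO hand-out: the most recently returned journal is reused first,
            // so consecutive writes tend to land on the same file and the disk
            // is not forced to seek between partitions.
            while (availableJournals.isEmpty()) {
                availableJournals.wait(1000L);
            }
            journal = availableJournals.pop();
        }

        try {
            journal.append(record);
            return journal.getIndex();
        } finally {
            synchronized (availableJournals) {
                availableJournals.push(journal);  // make it available again
                availableJournals.notify();       // wake one waiting writer
            }
        }
    }

    private static final class Journal {
        private final int index;

        Journal(final int index) {
            this.index = index;
        }

        int getIndex() {
            return index;
        }

        void append(final String record) throws IOException {
            // the real Partition appends the record to its edit log here
        }
    }
}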
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4b5ba3c4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
index 668adb8..61649dc 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
@@ -82,7 +82,6 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
     private final Path flowFileRepositoryPath;
     private final int numPartitions;
     private final ScheduledExecutorService checkpointExecutor;
-    private final AtomicLong updateCount = new AtomicLong(0L);
     private final int updatesBetweenSyncs;
 
     // effectively final
@@ -137,7 +136,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
         // backup and then the data deleted from the normal location; then can move backup to normal location and
         // delete backup. On restore, if no files exist in partition's directory, would have to check backup directory
         serde = new WriteAheadRecordSerde(claimManager);
-        wal = new MinimalLockingWriteAheadLog<>(flowFileRepositoryPath, numPartitions, serde, this);
+        wal = new MinimalLockingWriteAheadLog<>(flowFileRepositoryPath, numPartitions, serde, this, updatesBetweenSyncs);
     }
 
     @Override
@@ -167,8 +166,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
 
     @Override
     public void updateRepository(final Collection<RepositoryRecord> records) throws IOException {
-        final long updates = updateCount.incrementAndGet();
-        updateRepository(records, alwaysSync || updatesBetweenSyncs > 0 && updates % updatesBetweenSyncs == 0);
+        updateRepository(records, alwaysSync);
     }
 
     private void updateRepository(final Collection<RepositoryRecord> records, final boolean sync) throws IOException {
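Taken together with the repository change above, the periodic-fsync decision moves out of WriteAheadFlowFileRepository (its AtomicLong updateCount is removed) and into each WALI Partition, which now counts its own updates and syncs the file descriptor after every updatesBetweenSync updates, or whenever forceSync is requested. A small sketch of that counter logic, assuming a hypothetical SyncingWriter wrapper rather than the real Partition class:

import java.io.FileOutputStream;
import java.io.IOException;

final class SyncingWriter {

    private final FileOutputStream fileOut;
    private final int updatesBetweenSync;
    private long updates = 0L;

    SyncingWriter(final FileOutputStream fileOut, final int updatesBetweenSync) {
        this.fileOut = fileOut;
        this.updatesBetweenSync = updatesBetweenSync;
    }

    void write(final byte[] record, final boolean forceSync) throws IOException {
        fileOut.write(record);
        fileOut.flush();

        updates++;
        // Mirrors the condition in Partition.update():
        // sync when forced, or after every 'updatesBetweenSync' updates (if configured > 0).
        if (forceSync || (updatesBetweenSync > 0 && updates % updatesBetweenSync == 0)) {
            fileOut.getFD().sync();  // push the bytes to disk, not just to the OS cache
        }
    }
}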
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4b5ba3c4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java
index f5494f9..aa24280 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java
@@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.reporting.Severity;
@@ -36,6 +37,7 @@ public class StandardContentClaimManager implements ContentClaimManager {
     private static final ConcurrentMap<String, BlockingQueue<ContentClaim>> destructableClaims = new ConcurrentHashMap<>();
 
     private final EventReporter eventReporter;
+    private final AtomicLong lastBulletinTime = new AtomicLong(0L);
 
     public StandardContentClaimManager(final EventReporter eventReporter) {
         this.eventReporter = eventReporter;
@@ -123,16 +125,19 @@ public class StandardContentClaimManager implements ContentClaimManager {
             final BlockingQueue<ContentClaim> destructableQueue = getDestructableClaimQueue(claim.getContainer());
             final boolean accepted = destructableQueue.offer(claim);
             if (!accepted) {
-                final long start = System.nanoTime();
-                while (!destructableQueue.offer(claim, 30, TimeUnit.MINUTES)) { }
-                final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-                if (millis > 10L) {
-                    logger.warn("Total wait duration to add claim to Destructable Claim Queue was {} millis", millis);
-                    eventReporter.reportEvent(Severity.WARNING, "Content Repository", "The Content Repository is unable to destroy content as fast "
-                            + "as it is being created. The flow will be slowed in order to adjust for this.");
+                // If it's been 5+ minutes since we emitted a bulletin, emit a bulletin notifying user that there is backpressure
+                final long lastTimestamp = lastBulletinTime.get();
+                final long now = System.currentTimeMillis();
+                if (now - lastTimestamp > TimeUnit.MINUTES.toMillis(5)) {
+                    final boolean emitBulletin = lastBulletinTime.compareAndSet(lastTimestamp, now);
+
+                    if (emitBulletin) {
+                        eventReporter.reportEvent(Severity.WARNING, "Content Repository", "The Content Repository is unable to destroy content as fast "
+                                + "as it is being created. The flow will be slowed in order to adjust for this.");
+                    }
                 }
             }
         } catch (final InterruptedException ie) {
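The StandardContentClaimManager change drops the blocking-offer timing and warn-on-every-occurrence logic and instead emits the back-pressure bulletin at most once every five minutes: an AtomicLong holds the time of the last bulletin, and compareAndSet ensures that only one of the threads racing past the five-minute check actually reports. A sketch of that rate limiter follows; the Reporter interface is a stand-in for NiFi's EventReporter, and the class name is illustrative only.

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

final class BackPressureBulletin {

    interface Reporter {
        void warn(String message);
    }

    private static final long INTERVAL_MILLIS = TimeUnit.MINUTES.toMillis(5);

    private final AtomicLong lastBulletinTime = new AtomicLong(0L);
    private final Reporter reporter;

    BackPressureBulletin(final Reporter reporter) {
        this.reporter = reporter;
    }

    void onBackPressure() {
        final long lastTimestamp = lastBulletinTime.get();
        final long now = System.currentTimeMillis();

        if (now - lastTimestamp > INTERVAL_MILLIS) {
            // Only the thread that wins the compareAndSet emits the bulletin for this window;
            // the losers see an updated timestamp and stay quiet.
            if (lastBulletinTime.compareAndSet(lastTimestamp, now)) {
                reporter.warn("The Content Repository is unable to destroy content as fast as it is being created.");
            }
        }
    }
}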