Revert "NIFI-731: Try using a Stack to hold FLowFIle Repo partitions so that we tend to use the same partition to avoid disk seeks/thrashing"
This reverts commit 4b5ba3c4e9bbb2be5bd3f9bdf40159179a3b30e2. Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/b3eb4a1a Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/b3eb4a1a Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/b3eb4a1a Branch: refs/heads/NIFI-731 Commit: b3eb4a1a0f29d0f94639174dfbd7ea05307cb99a Parents: 1677db6 Author: Mark Payne <marka...@hotmail.com> Authored: Tue Jun 30 13:59:57 2015 -0400 Committer: Mark Payne <marka...@hotmail.com> Committed: Tue Jun 30 13:59:57 2015 -0400 ---------------------------------------------------------------------- .../org/wali/MinimalLockingWriteAheadLog.java | 173 ++++++------------- .../WriteAheadFlowFileRepository.java | 6 +- .../claim/StandardContentClaimManager.java | 19 +- 3 files changed, 66 insertions(+), 132 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b3eb4a1a/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java b/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java index 09887f7..4331504 100644 --- a/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java +++ b/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java @@ -43,7 +43,6 @@ import java.util.Queue; import java.util.Set; import java.util.SortedMap; import java.util.SortedSet; -import java.util.Stack; import java.util.TreeMap; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; @@ -83,12 +82,11 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor private final SortedSet<Path> basePaths; private final SerDe<T> serde; private final SyncListener syncListener; - private final int updatesBetweenSync; private final FileChannel lockChannel; private final AtomicLong transactionIdGenerator = new AtomicLong(0L); private final Partition<T>[] partitions; - private final Stack<Partition<T>> updatePartitionStack; + private final AtomicLong partitionIndex = new AtomicLong(0L); private final ConcurrentMap<Object, T> recordMap = new ConcurrentHashMap<>(); private final Map<Object, T> unmodifiableRecordMap = Collections.unmodifiableMap(recordMap); private final Set<String> externalLocations = new CopyOnWriteArraySet<>(); @@ -110,10 +108,6 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor this(new TreeSet<>(Collections.singleton(path)), partitionCount, serde, syncListener); } - public MinimalLockingWriteAheadLog(final Path path, final int partitionCount, final SerDe<T> serde, final SyncListener syncListener, final int updatesBetweenSync) throws IOException { - this(new TreeSet<>(Collections.singleton(path)), partitionCount, serde, syncListener, updatesBetweenSync); - } - /** * * @param paths a sorted set of Paths to use for the partitions/journals and @@ -127,25 +121,6 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor * @throws IOException if unable to initialize due to IO issue */ public MinimalLockingWriteAheadLog(final SortedSet<Path> paths, final int partitionCount, final SerDe<T> serde, final SyncListener syncListener) throws IOException { - this(paths, partitionCount, serde, syncListener, 0); - } - - /** - * - * @param paths a sorted set of Paths to use for the partitions/journals and - * the snapshot. The snapshot will always be written to the first path - * specified. - * @param partitionCount the number of partitions/journals to use. For best - * performance, this should be close to the number of threads that are - * expected to update the repository simultaneously - * @param serde the serializer/deserializer for records - * @param syncListener the listener - * @param updatesBetweenSync if > 0, each partition will be sync'ed after it is updated - * this number of times - * @throws IOException if unable to initialize due to IO issue - */ - public MinimalLockingWriteAheadLog(final SortedSet<Path> paths, final int partitionCount, final SerDe<T> serde, final SyncListener syncListener, final int updatesBetweenSync) throws IOException { - this.updatesBetweenSync = updatesBetweenSync; this.syncListener = syncListener; this.basePaths = paths; @@ -186,10 +161,6 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor lockChannel.lock(); partitions = createPartitions(partitionCount); - updatePartitionStack = new Stack<>(); - for (final Partition<T> partition : partitions) { - updatePartitionStack.push(partition); - } } private Partition<T>[] createPartitions(final int count) throws IOException { @@ -205,7 +176,7 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor final Path partitionBasePath = pathIterator.next(); - partitions[i] = new Partition<>(partitionBasePath.resolve("partition-" + i), serde, i, getVersion(), updatesBetweenSync); + partitions[i] = new Partition<>(partitionBasePath.resolve("partition-" + i), serde, i, getVersion()); } return partitions; @@ -224,7 +195,7 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor updated = true; readLock.lock(); try { - findPartition: while (true) { + while (true) { final int numBlackListed = numberBlackListedPartitions.get(); if (numBlackListed >= partitions.length) { throw new IOException("All Partitions have been blacklisted due to " @@ -232,85 +203,65 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor + "this issue may resolve itself. Otherwise, manual intervention will be required."); } - Partition<T> partition; - synchronized (updatePartitionStack) { - while (updatePartitionStack.isEmpty()) { - try { - updatePartitionStack.wait(1000); - } catch (final InterruptedException e) { - continue findPartition; + final long partitionIdx = partitionIndex.getAndIncrement(); + final int resolvedIdx = (int) (partitionIdx % partitions.length); + final Partition<T> partition = partitions[resolvedIdx]; + if (partition.tryClaim()) { + try { + final long transactionId = transactionIdGenerator.getAndIncrement(); + if (logger.isTraceEnabled()) { + for (final T record : records) { + logger.trace("Partition {} performing Transaction {}: {}", new Object[]{partition, transactionId, record}); + } } - } - partition = updatePartitionStack.pop(); - } - try { - if (partition.tryClaim()) { - final int partitionIdx = partition.getPartitionIndex(); - try { - final long transactionId = transactionIdGenerator.getAndIncrement(); - if (logger.isTraceEnabled()) { - for (final T record : records) { - logger.trace("Partition {} performing Transaction {}: {}", new Object[]{partition, transactionId, record}); - } - } - - try { - partition.update(records, transactionId, unmodifiableRecordMap, forceSync); - } catch (final Exception e) { - partition.blackList(); - numberBlackListedPartitions.incrementAndGet(); - throw e; - } - - if (forceSync && syncListener != null) { - syncListener.onSync(partitionIdx); - } - } finally { - partition.releaseClaim(); + partition.update(records, transactionId, unmodifiableRecordMap, forceSync); + } catch (final Exception e) { + partition.blackList(); + numberBlackListedPartitions.incrementAndGet(); + throw e; + } + + if (forceSync && syncListener != null) { + syncListener.onSync(resolvedIdx); } - - for (final T record : records) { - final UpdateType updateType = serde.getUpdateType(record); - final Object recordIdentifier = serde.getRecordIdentifier(record); - - if (updateType == UpdateType.DELETE) { + } finally { + partition.releaseClaim(); + } + + for (final T record : records) { + final UpdateType updateType = serde.getUpdateType(record); + final Object recordIdentifier = serde.getRecordIdentifier(record); + + if (updateType == UpdateType.DELETE) { + recordMap.remove(recordIdentifier); + } else if (updateType == UpdateType.SWAP_OUT) { + final String newLocation = serde.getLocation(record); + if (newLocation == null) { + logger.error("Received Record (ID=" + recordIdentifier + ") with UpdateType of SWAP_OUT but " + + "no indicator of where the Record is to be Swapped Out to; these records may be " + + "lost when the repository is restored!"); + } else { recordMap.remove(recordIdentifier); - } else if (updateType == UpdateType.SWAP_OUT) { - final String newLocation = serde.getLocation(record); - if (newLocation == null) { - logger.error("Received Record (ID=" + recordIdentifier + ") with UpdateType of SWAP_OUT but " - + "no indicator of where the Record is to be Swapped Out to; these records may be " - + "lost when the repository is restored!"); - } else { - recordMap.remove(recordIdentifier); - this.externalLocations.add(newLocation); - } - } else if (updateType == UpdateType.SWAP_IN) { - final String newLocation = serde.getLocation(record); - if (newLocation == null) { - logger.error("Received Record (ID=" + recordIdentifier + ") with UpdateType of SWAP_IN but no " - + "indicator of where the Record is to be Swapped In from; these records may be duplicated " - + "when the repository is restored!"); - } else { - externalLocations.remove(newLocation); - } - recordMap.put(recordIdentifier, record); + this.externalLocations.add(newLocation); + } + } else if (updateType == UpdateType.SWAP_IN) { + final String newLocation = serde.getLocation(record); + if (newLocation == null) { + logger.error("Received Record (ID=" + recordIdentifier + ") with UpdateType of SWAP_IN but no " + + "indicator of where the Record is to be Swapped In from; these records may be duplicated " + + "when the repository is restored!"); } else { - recordMap.put(recordIdentifier, record); + externalLocations.remove(newLocation); } - } - - return partitionIdx; - } - } finally { - synchronized (updatePartitionStack) { - if (!partition.isBlackListed()) { - updatePartitionStack.push(partition); - updatePartitionStack.notify(); + recordMap.put(recordIdentifier, record); + } else { + recordMap.put(recordIdentifier, record); } } + + return resolvedIdx; } } } finally { @@ -726,7 +677,6 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor private final Path editDirectory; private final int writeAheadLogVersion; - private final int partitionIndex; private final Lock lock = new ReentrantLock(); private DataOutputStream dataOut = null; @@ -736,23 +686,19 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor private DataInputStream recoveryIn; private int recoveryVersion; private String currentJournalFilename = ""; - private long updates = 0L; private static final byte TRANSACTION_CONTINUE = 1; private static final byte TRANSACTION_COMMIT = 2; private final String description; - private final int updatesBetweenSync; private final AtomicLong maxTransactionId = new AtomicLong(-1L); private final Logger logger = LoggerFactory.getLogger(MinimalLockingWriteAheadLog.class); private final Queue<Path> recoveryFiles; - public Partition(final Path path, final SerDe<S> serde, final int partitionIndex, final int writeAheadLogVersion, final int updatesBetweenSync) throws IOException { + public Partition(final Path path, final SerDe<S> serde, final int partitionIndex, final int writeAheadLogVersion) throws IOException { this.editDirectory = path; this.serde = serde; - this.partitionIndex = partitionIndex; - this.updatesBetweenSync = updatesBetweenSync; final File file = path.toFile(); if (!file.exists() && !file.mkdirs()) { @@ -767,10 +713,6 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor this.description = "Partition-" + partitionIndex; this.writeAheadLogVersion = writeAheadLogVersion; } - - public int getPartitionIndex() { - return partitionIndex; - } public boolean tryClaim() { final boolean obtainedLock = lock.tryLock(); @@ -979,8 +921,7 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor out.flush(); - updates++; - if (forceSync || (updatesBetweenSync > 0 && updates % updatesBetweenSync == 0)) { + if (forceSync) { fileOut.getFD().sync(); } } @@ -1126,10 +1067,6 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor public long getMaxRecoveredTransactionId() { return maxTransactionId.get(); } - - public boolean isBlackListed() { - return blackListed; - } @Override public String toString() { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b3eb4a1a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java index 61649dc..668adb8 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java @@ -82,6 +82,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis private final Path flowFileRepositoryPath; private final int numPartitions; private final ScheduledExecutorService checkpointExecutor; + private final AtomicLong updateCount = new AtomicLong(0L); private final int updatesBetweenSyncs; // effectively final @@ -136,7 +137,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis // backup and then the data deleted from the normal location; then can move backup to normal location and // delete backup. On restore, if no files exist in partition's directory, would have to check backup directory serde = new WriteAheadRecordSerde(claimManager); - wal = new MinimalLockingWriteAheadLog<>(flowFileRepositoryPath, numPartitions, serde, this, updatesBetweenSyncs); + wal = new MinimalLockingWriteAheadLog<>(flowFileRepositoryPath, numPartitions, serde, this); } @Override @@ -166,7 +167,8 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis @Override public void updateRepository(final Collection<RepositoryRecord> records) throws IOException { - updateRepository(records, alwaysSync); + final long updates = updateCount.incrementAndGet(); + updateRepository(records, alwaysSync || updatesBetweenSyncs > 0 && updates % updatesBetweenSyncs == 0); } private void updateRepository(final Collection<RepositoryRecord> records, final boolean sync) throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b3eb4a1a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java index aa24280..f5494f9 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java @@ -23,7 +23,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; import org.apache.nifi.events.EventReporter; import org.apache.nifi.reporting.Severity; @@ -37,7 +36,6 @@ public class StandardContentClaimManager implements ContentClaimManager { private static final ConcurrentMap<String, BlockingQueue<ContentClaim>> destructableClaims = new ConcurrentHashMap<>(); private final EventReporter eventReporter; - private final AtomicLong lastBulletinTime = new AtomicLong(0L); public StandardContentClaimManager(final EventReporter eventReporter) { this.eventReporter = eventReporter; @@ -125,19 +123,16 @@ public class StandardContentClaimManager implements ContentClaimManager { final BlockingQueue<ContentClaim> destructableQueue = getDestructableClaimQueue(claim.getContainer()); final boolean accepted = destructableQueue.offer(claim); if (!accepted) { + final long start = System.nanoTime(); + while (!destructableQueue.offer(claim, 30, TimeUnit.MINUTES)) { } - // If it's been 5+ minutes since we emitted a bulletin, emit a bulletin notifying user that there is backpressure - final long lastTimestamp = lastBulletinTime.get(); - final long now = System.currentTimeMillis(); - if (now - lastTimestamp > TimeUnit.MINUTES.toMillis(5)) { - final boolean emitBulletin = lastBulletinTime.compareAndSet(lastTimestamp, now); - - if (emitBulletin) { - eventReporter.reportEvent(Severity.WARNING, "Content Repository", "The Content Repository is unable to destroy content as fast " - + "as it is being created. The flow will be slowed in order to adjust for this."); - } + final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + if (millis > 10L) { + logger.warn("Total wait duration to add claim to Destructable Claim Queue was {} millis", millis); + eventReporter.reportEvent(Severity.WARNING, "Content Repository", "The Content Repository is unable to destroy content as fast " + + "as it is being created. The flow will be slowed in order to adjust for this."); } } } catch (final InterruptedException ie) {