Revert "NIFI-731: Try using a Stack to hold FlowFile Repo partitions so that we 
tend to use the same partition to avoid disk seeks/thrashing"

This reverts commit 4b5ba3c4e9bbb2be5bd3f9bdf40159179a3b30e2.


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/b3eb4a1a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/b3eb4a1a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/b3eb4a1a

Branch: refs/heads/NIFI-731
Commit: b3eb4a1a0f29d0f94639174dfbd7ea05307cb99a
Parents: 1677db6
Author: Mark Payne <marka...@hotmail.com>
Authored: Tue Jun 30 13:59:57 2015 -0400
Committer: Mark Payne <marka...@hotmail.com>
Committed: Tue Jun 30 13:59:57 2015 -0400

----------------------------------------------------------------------
 .../org/wali/MinimalLockingWriteAheadLog.java   | 173 ++++++-------------
 .../WriteAheadFlowFileRepository.java           |   6 +-
 .../claim/StandardContentClaimManager.java      |  19 +-
 3 files changed, 66 insertions(+), 132 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b3eb4a1a/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
 
b/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
index 09887f7..4331504 100644
--- 
a/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
+++ 
b/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
@@ -43,7 +43,6 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.SortedSet;
-import java.util.Stack;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
@@ -83,12 +82,11 @@ public final class MinimalLockingWriteAheadLog<T> 
implements WriteAheadRepositor
     private final SortedSet<Path> basePaths;
     private final SerDe<T> serde;
     private final SyncListener syncListener;
-    private final int updatesBetweenSync;
     private final FileChannel lockChannel;
     private final AtomicLong transactionIdGenerator = new AtomicLong(0L);
 
     private final Partition<T>[] partitions;
-    private final Stack<Partition<T>> updatePartitionStack;
+    private final AtomicLong partitionIndex = new AtomicLong(0L);
     private final ConcurrentMap<Object, T> recordMap = new 
ConcurrentHashMap<>();
     private final Map<Object, T> unmodifiableRecordMap = 
Collections.unmodifiableMap(recordMap);
     private final Set<String> externalLocations = new CopyOnWriteArraySet<>();
@@ -110,10 +108,6 @@ public final class MinimalLockingWriteAheadLog<T> 
implements WriteAheadRepositor
         this(new TreeSet<>(Collections.singleton(path)), partitionCount, 
serde, syncListener);
     }
 
-    public MinimalLockingWriteAheadLog(final Path path, final int 
partitionCount, final SerDe<T> serde, final SyncListener syncListener, final 
int updatesBetweenSync) throws IOException {
-        this(new TreeSet<>(Collections.singleton(path)), partitionCount, 
serde, syncListener, updatesBetweenSync);
-    }
-
     /**
      *
      * @param paths a sorted set of Paths to use for the partitions/journals 
and
@@ -127,25 +121,6 @@ public final class MinimalLockingWriteAheadLog<T> 
implements WriteAheadRepositor
      * @throws IOException if unable to initialize due to IO issue
      */
     public MinimalLockingWriteAheadLog(final SortedSet<Path> paths, final int 
partitionCount, final SerDe<T> serde, final SyncListener syncListener) throws 
IOException {
-        this(paths, partitionCount, serde, syncListener, 0);
-    }
-    
-    /**
-    *
-    * @param paths a sorted set of Paths to use for the partitions/journals and
-    * the snapshot. The snapshot will always be written to the first path
-    * specified.
-    * @param partitionCount the number of partitions/journals to use. For best
-    * performance, this should be close to the number of threads that are
-    * expected to update the repository simultaneously
-    * @param serde the serializer/deserializer for records
-    * @param syncListener the listener
-    * @param updatesBetweenSync if > 0, each partition will be sync'ed after 
it is updated
-    * this number of times
-    * @throws IOException if unable to initialize due to IO issue
-    */
-    public MinimalLockingWriteAheadLog(final SortedSet<Path> paths, final int 
partitionCount, final SerDe<T> serde, final SyncListener syncListener, final 
int updatesBetweenSync) throws IOException {
-        this.updatesBetweenSync = updatesBetweenSync;
         this.syncListener = syncListener;
         this.basePaths = paths;
 
@@ -186,10 +161,6 @@ public final class MinimalLockingWriteAheadLog<T> 
implements WriteAheadRepositor
         lockChannel.lock();
 
         partitions = createPartitions(partitionCount);
-        updatePartitionStack = new Stack<>();
-        for (final Partition<T> partition : partitions) {
-            updatePartitionStack.push(partition);
-        }
     }
 
     private Partition<T>[] createPartitions(final int count) throws 
IOException {
@@ -205,7 +176,7 @@ public final class MinimalLockingWriteAheadLog<T> 
implements WriteAheadRepositor
 
             final Path partitionBasePath = pathIterator.next();
 
-            partitions[i] = new 
Partition<>(partitionBasePath.resolve("partition-" + i), serde, i, 
getVersion(), updatesBetweenSync);
+            partitions[i] = new 
Partition<>(partitionBasePath.resolve("partition-" + i), serde, i, 
getVersion());
         }
 
         return partitions;
@@ -224,7 +195,7 @@ public final class MinimalLockingWriteAheadLog<T> 
implements WriteAheadRepositor
         updated = true;
         readLock.lock();
         try {
-            findPartition: while (true) {
+            while (true) {
                 final int numBlackListed = numberBlackListedPartitions.get();
                 if (numBlackListed >= partitions.length) {
                     throw new IOException("All Partitions have been 
blacklisted due to "
@@ -232,85 +203,65 @@ public final class MinimalLockingWriteAheadLog<T> 
implements WriteAheadRepositor
                             + "this issue may resolve itself. Otherwise, 
manual intervention will be required.");
                 }
 
-                Partition<T> partition;
-                synchronized (updatePartitionStack) {
-                    while (updatePartitionStack.isEmpty()) {
-                        try {
-                            updatePartitionStack.wait(1000);
-                        } catch (final InterruptedException e) {
-                            continue findPartition;
+                final long partitionIdx = partitionIndex.getAndIncrement();
+                final int resolvedIdx = (int) (partitionIdx % 
partitions.length);
+                final Partition<T> partition = partitions[resolvedIdx];
+                if (partition.tryClaim()) {
+                    try {
+                        final long transactionId = 
transactionIdGenerator.getAndIncrement();
+                        if (logger.isTraceEnabled()) {
+                            for (final T record : records) {
+                                logger.trace("Partition {} performing 
Transaction {}: {}", new Object[]{partition, transactionId, record});
+                            }
                         }
-                    }
-                    partition = updatePartitionStack.pop();
-                }
 
-                try {
-                    if (partition.tryClaim()) {
-                        final int partitionIdx = partition.getPartitionIndex();
-                        
                         try {
-                            final long transactionId = 
transactionIdGenerator.getAndIncrement();
-                            if (logger.isTraceEnabled()) {
-                                for (final T record : records) {
-                                    logger.trace("Partition {} performing 
Transaction {}: {}", new Object[]{partition, transactionId, record});
-                                }
-                            }
-    
-                            try {
-                                partition.update(records, transactionId, 
unmodifiableRecordMap, forceSync);
-                            } catch (final Exception e) {
-                                partition.blackList();
-                                numberBlackListedPartitions.incrementAndGet();
-                                throw e;
-                            }
-    
-                            if (forceSync && syncListener != null) {
-                                syncListener.onSync(partitionIdx);
-                            }
-                        } finally {
-                            partition.releaseClaim();
+                            partition.update(records, transactionId, 
unmodifiableRecordMap, forceSync);
+                        } catch (final Exception e) {
+                            partition.blackList();
+                            numberBlackListedPartitions.incrementAndGet();
+                            throw e;
+                        }
+
+                        if (forceSync && syncListener != null) {
+                            syncListener.onSync(resolvedIdx);
                         }
-    
-                        for (final T record : records) {
-                            final UpdateType updateType = 
serde.getUpdateType(record);
-                            final Object recordIdentifier = 
serde.getRecordIdentifier(record);
-    
-                            if (updateType == UpdateType.DELETE) {
+                    } finally {
+                        partition.releaseClaim();
+                    }
+
+                    for (final T record : records) {
+                        final UpdateType updateType = 
serde.getUpdateType(record);
+                        final Object recordIdentifier = 
serde.getRecordIdentifier(record);
+
+                        if (updateType == UpdateType.DELETE) {
+                            recordMap.remove(recordIdentifier);
+                        } else if (updateType == UpdateType.SWAP_OUT) {
+                            final String newLocation = 
serde.getLocation(record);
+                            if (newLocation == null) {
+                                logger.error("Received Record (ID=" + 
recordIdentifier + ") with UpdateType of SWAP_OUT but "
+                                        + "no indicator of where the Record is 
to be Swapped Out to; these records may be "
+                                        + "lost when the repository is 
restored!");
+                            } else {
                                 recordMap.remove(recordIdentifier);
-                            } else if (updateType == UpdateType.SWAP_OUT) {
-                                final String newLocation = 
serde.getLocation(record);
-                                if (newLocation == null) {
-                                    logger.error("Received Record (ID=" + 
recordIdentifier + ") with UpdateType of SWAP_OUT but "
-                                            + "no indicator of where the 
Record is to be Swapped Out to; these records may be "
-                                            + "lost when the repository is 
restored!");
-                                } else {
-                                    recordMap.remove(recordIdentifier);
-                                    this.externalLocations.add(newLocation);
-                                }
-                            } else if (updateType == UpdateType.SWAP_IN) {
-                                final String newLocation = 
serde.getLocation(record);
-                                if (newLocation == null) {
-                                    logger.error("Received Record (ID=" + 
recordIdentifier + ") with UpdateType of SWAP_IN but no "
-                                            + "indicator of where the Record 
is to be Swapped In from; these records may be duplicated "
-                                            + "when the repository is 
restored!");
-                                } else {
-                                    externalLocations.remove(newLocation);
-                                }
-                                recordMap.put(recordIdentifier, record);
+                                this.externalLocations.add(newLocation);
+                            }
+                        } else if (updateType == UpdateType.SWAP_IN) {
+                            final String newLocation = 
serde.getLocation(record);
+                            if (newLocation == null) {
+                                logger.error("Received Record (ID=" + 
recordIdentifier + ") with UpdateType of SWAP_IN but no "
+                                        + "indicator of where the Record is to 
be Swapped In from; these records may be duplicated "
+                                        + "when the repository is restored!");
                             } else {
-                                recordMap.put(recordIdentifier, record);
+                                externalLocations.remove(newLocation);
                             }
-                        }
-    
-                        return partitionIdx;
-                    }
-                } finally {
-                    synchronized (updatePartitionStack) {
-                        if (!partition.isBlackListed()) {
-                            updatePartitionStack.push(partition);
-                            updatePartitionStack.notify();
+                            recordMap.put(recordIdentifier, record);
+                        } else {
+                            recordMap.put(recordIdentifier, record);
                         }
                     }
+
+                    return resolvedIdx;
                 }
             }
         } finally {
@@ -726,7 +677,6 @@ public final class MinimalLockingWriteAheadLog<T> 
implements WriteAheadRepositor
 
         private final Path editDirectory;
         private final int writeAheadLogVersion;
-        private final int partitionIndex;
 
         private final Lock lock = new ReentrantLock();
         private DataOutputStream dataOut = null;
@@ -736,23 +686,19 @@ public final class MinimalLockingWriteAheadLog<T> 
implements WriteAheadRepositor
         private DataInputStream recoveryIn;
         private int recoveryVersion;
         private String currentJournalFilename = "";
-        private long updates = 0L;
 
         private static final byte TRANSACTION_CONTINUE = 1;
         private static final byte TRANSACTION_COMMIT = 2;
 
         private final String description;
-        private final int updatesBetweenSync;
         private final AtomicLong maxTransactionId = new AtomicLong(-1L);
         private final Logger logger = 
LoggerFactory.getLogger(MinimalLockingWriteAheadLog.class);
 
         private final Queue<Path> recoveryFiles;
 
-        public Partition(final Path path, final SerDe<S> serde, final int 
partitionIndex, final int writeAheadLogVersion, final int updatesBetweenSync) 
throws IOException {
+        public Partition(final Path path, final SerDe<S> serde, final int 
partitionIndex, final int writeAheadLogVersion) throws IOException {
             this.editDirectory = path;
             this.serde = serde;
-            this.partitionIndex = partitionIndex;
-            this.updatesBetweenSync = updatesBetweenSync;
 
             final File file = path.toFile();
             if (!file.exists() && !file.mkdirs()) {
@@ -767,10 +713,6 @@ public final class MinimalLockingWriteAheadLog<T> 
implements WriteAheadRepositor
             this.description = "Partition-" + partitionIndex;
             this.writeAheadLogVersion = writeAheadLogVersion;
         }
-        
-        public int getPartitionIndex() {
-            return partitionIndex;
-        }
 
         public boolean tryClaim() {
             final boolean obtainedLock = lock.tryLock();
@@ -979,8 +921,7 @@ public final class MinimalLockingWriteAheadLog<T> 
implements WriteAheadRepositor
 
             out.flush();
 
-            updates++;
-            if (forceSync || (updatesBetweenSync > 0 && updates % 
updatesBetweenSync == 0)) {
+            if (forceSync) {
                 fileOut.getFD().sync();
             }
         }
@@ -1126,10 +1067,6 @@ public final class MinimalLockingWriteAheadLog<T> 
implements WriteAheadRepositor
         public long getMaxRecoveredTransactionId() {
             return maxTransactionId.get();
         }
-        
-        public boolean isBlackListed() {
-            return blackListed;
-        }
 
         @Override
         public String toString() {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b3eb4a1a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
index 61649dc..668adb8 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
@@ -82,6 +82,7 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
     private final Path flowFileRepositoryPath;
     private final int numPartitions;
     private final ScheduledExecutorService checkpointExecutor;
+    private final AtomicLong updateCount = new AtomicLong(0L);
     private final int updatesBetweenSyncs;
 
     // effectively final
@@ -136,7 +137,7 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
         // backup and then the data deleted from the normal location; then can 
move backup to normal location and
         // delete backup. On restore, if no files exist in partition's 
directory, would have to check backup directory
         serde = new WriteAheadRecordSerde(claimManager);
-        wal = new MinimalLockingWriteAheadLog<>(flowFileRepositoryPath, 
numPartitions, serde, this, updatesBetweenSyncs);
+        wal = new MinimalLockingWriteAheadLog<>(flowFileRepositoryPath, 
numPartitions, serde, this);
     }
 
     @Override
@@ -166,7 +167,8 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
 
     @Override
     public void updateRepository(final Collection<RepositoryRecord> records) 
throws IOException {
-        updateRepository(records, alwaysSync);
+        final long updates = updateCount.incrementAndGet();
+        updateRepository(records, alwaysSync || updatesBetweenSyncs > 0 && 
updates % updatesBetweenSyncs == 0);
     }
 
     private void updateRepository(final Collection<RepositoryRecord> records, 
final boolean sync) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b3eb4a1a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java
index aa24280..f5494f9 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java
@@ -23,7 +23,6 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.reporting.Severity;
@@ -37,7 +36,6 @@ public class StandardContentClaimManager implements 
ContentClaimManager {
 
     private static final ConcurrentMap<String, BlockingQueue<ContentClaim>> 
destructableClaims = new ConcurrentHashMap<>();
     private final EventReporter eventReporter;
-    private final AtomicLong lastBulletinTime = new AtomicLong(0L);
 
     public StandardContentClaimManager(final EventReporter eventReporter) {
         this.eventReporter = eventReporter;
@@ -125,19 +123,16 @@ public class StandardContentClaimManager implements 
ContentClaimManager {
             final BlockingQueue<ContentClaim> destructableQueue = 
getDestructableClaimQueue(claim.getContainer());
             final boolean accepted = destructableQueue.offer(claim);
             if (!accepted) {
+                final long start = System.nanoTime();
+
                 while (!destructableQueue.offer(claim, 30, TimeUnit.MINUTES)) {
                 }
 
-                // If it's been 5+ minutes since we emitted a bulletin, emit a 
bulletin notifying user that there is backpressure 
-                final long lastTimestamp = lastBulletinTime.get();
-                final long now = System.currentTimeMillis();
-                if (now - lastTimestamp > TimeUnit.MINUTES.toMillis(5)) {
-                    final boolean emitBulletin = 
lastBulletinTime.compareAndSet(lastTimestamp, now);
-                    
-                    if (emitBulletin) {
-                        eventReporter.reportEvent(Severity.WARNING, "Content 
Repository", "The Content Repository is unable to destroy content as fast "
-                            + "as it is being created. The flow will be slowed 
in order to adjust for this.");
-                    }
+                final long millis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+                if (millis > 10L) {
+                    logger.warn("Total wait duration to add claim to 
Destructable Claim Queue was {} millis", millis);
+                    eventReporter.reportEvent(Severity.WARNING, "Content 
Repository", "The Content Repository is unable to destroy content as fast "
+                        + "as it is being created. The flow will be slowed in 
order to adjust for this.");
                 }
             }
         } catch (final InterruptedException ie) {

Reply via email to