Repository: incubator-nifi
Updated Branches:
  refs/heads/NIFI-731 bb5128be2 -> 4b5ba3c4e


NIFI-731: Try using a Stack to hold FlowFile Repo partitions so that we tend to
use the same partition to avoid disk seeks/thrashing


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/4b5ba3c4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/4b5ba3c4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/4b5ba3c4

Branch: refs/heads/NIFI-731
Commit: 4b5ba3c4e9bbb2be5bd3f9bdf40159179a3b30e2
Parents: bb5128b
Author: Mark Payne <marka...@hotmail.com>
Authored: Mon Jun 29 17:32:27 2015 -0400
Committer: Mark Payne <marka...@hotmail.com>
Committed: Tue Jun 30 11:04:42 2015 -0400

----------------------------------------------------------------------
 .../org/wali/MinimalLockingWriteAheadLog.java   | 173 +++++++++++++------
 .../WriteAheadFlowFileRepository.java           |   6 +-
 .../claim/StandardContentClaimManager.java      |  19 +-
 3 files changed, 132 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4b5ba3c4/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
 
b/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
index 4331504..09887f7 100644
--- 
a/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
+++ 
b/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
@@ -43,6 +43,7 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.SortedSet;
+import java.util.Stack;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
@@ -82,11 +83,12 @@ public final class MinimalLockingWriteAheadLog<T> 
implements WriteAheadRepositor
     private final SortedSet<Path> basePaths;
     private final SerDe<T> serde;
     private final SyncListener syncListener;
+    private final int updatesBetweenSync;
     private final FileChannel lockChannel;
     private final AtomicLong transactionIdGenerator = new AtomicLong(0L);
 
     private final Partition<T>[] partitions;
-    private final AtomicLong partitionIndex = new AtomicLong(0L);
+    private final Stack<Partition<T>> updatePartitionStack;
     private final ConcurrentMap<Object, T> recordMap = new 
ConcurrentHashMap<>();
     private final Map<Object, T> unmodifiableRecordMap = 
Collections.unmodifiableMap(recordMap);
     private final Set<String> externalLocations = new CopyOnWriteArraySet<>();
@@ -108,6 +110,10 @@ public final class MinimalLockingWriteAheadLog<T> 
implements WriteAheadRepositor
         this(new TreeSet<>(Collections.singleton(path)), partitionCount, 
serde, syncListener);
     }
 
+    public MinimalLockingWriteAheadLog(final Path path, final int 
partitionCount, final SerDe<T> serde, final SyncListener syncListener, final 
int updatesBetweenSync) throws IOException {
+        this(new TreeSet<>(Collections.singleton(path)), partitionCount, 
serde, syncListener, updatesBetweenSync);
+    }
+
     /**
      *
      * @param paths a sorted set of Paths to use for the partitions/journals 
and
@@ -121,6 +127,25 @@ public final class MinimalLockingWriteAheadLog<T> 
implements WriteAheadRepositor
      * @throws IOException if unable to initialize due to IO issue
      */
     public MinimalLockingWriteAheadLog(final SortedSet<Path> paths, final int 
partitionCount, final SerDe<T> serde, final SyncListener syncListener) throws 
IOException {
+        this(paths, partitionCount, serde, syncListener, 0);
+    }
+    
+    /**
+    *
+    * @param paths a sorted set of Paths to use for the partitions/journals and
+    * the snapshot. The snapshot will always be written to the first path
+    * specified.
+    * @param partitionCount the number of partitions/journals to use. For best
+    * performance, this should be close to the number of threads that are
+    * expected to update the repository simultaneously
+    * @param serde the serializer/deserializer for records
+    * @param syncListener the listener
+    * @param updatesBetweenSync if > 0, each partition will be sync'ed after 
it is updated
+    * this number of times
+    * @throws IOException if unable to initialize due to IO issue
+    */
+    public MinimalLockingWriteAheadLog(final SortedSet<Path> paths, final int 
partitionCount, final SerDe<T> serde, final SyncListener syncListener, final 
int updatesBetweenSync) throws IOException {
+        this.updatesBetweenSync = updatesBetweenSync;
         this.syncListener = syncListener;
         this.basePaths = paths;
 
@@ -161,6 +186,10 @@ public final class MinimalLockingWriteAheadLog<T> 
implements WriteAheadRepositor
         lockChannel.lock();
 
         partitions = createPartitions(partitionCount);
+        updatePartitionStack = new Stack<>();
+        for (final Partition<T> partition : partitions) {
+            updatePartitionStack.push(partition);
+        }
     }
 
     private Partition<T>[] createPartitions(final int count) throws 
IOException {
@@ -176,7 +205,7 @@ public final class MinimalLockingWriteAheadLog<T> 
implements WriteAheadRepositor
 
             final Path partitionBasePath = pathIterator.next();
 
-            partitions[i] = new 
Partition<>(partitionBasePath.resolve("partition-" + i), serde, i, 
getVersion());
+            partitions[i] = new 
Partition<>(partitionBasePath.resolve("partition-" + i), serde, i, 
getVersion(), updatesBetweenSync);
         }
 
         return partitions;
@@ -195,7 +224,7 @@ public final class MinimalLockingWriteAheadLog<T> 
implements WriteAheadRepositor
         updated = true;
         readLock.lock();
         try {
-            while (true) {
+            findPartition: while (true) {
                 final int numBlackListed = numberBlackListedPartitions.get();
                 if (numBlackListed >= partitions.length) {
                     throw new IOException("All Partitions have been 
blacklisted due to "
@@ -203,65 +232,85 @@ public final class MinimalLockingWriteAheadLog<T> 
implements WriteAheadRepositor
                             + "this issue may resolve itself. Otherwise, 
manual intervention will be required.");
                 }
 
-                final long partitionIdx = partitionIndex.getAndIncrement();
-                final int resolvedIdx = (int) (partitionIdx % 
partitions.length);
-                final Partition<T> partition = partitions[resolvedIdx];
-                if (partition.tryClaim()) {
-                    try {
-                        final long transactionId = 
transactionIdGenerator.getAndIncrement();
-                        if (logger.isTraceEnabled()) {
-                            for (final T record : records) {
-                                logger.trace("Partition {} performing 
Transaction {}: {}", new Object[]{partition, transactionId, record});
-                            }
-                        }
-
+                Partition<T> partition;
+                synchronized (updatePartitionStack) {
+                    while (updatePartitionStack.isEmpty()) {
                         try {
-                            partition.update(records, transactionId, 
unmodifiableRecordMap, forceSync);
-                        } catch (final Exception e) {
-                            partition.blackList();
-                            numberBlackListedPartitions.incrementAndGet();
-                            throw e;
+                            updatePartitionStack.wait(1000);
+                        } catch (final InterruptedException e) {
+                            continue findPartition;
                         }
-
-                        if (forceSync && syncListener != null) {
-                            syncListener.onSync(resolvedIdx);
-                        }
-                    } finally {
-                        partition.releaseClaim();
                     }
+                    partition = updatePartitionStack.pop();
+                }
 
-                    for (final T record : records) {
-                        final UpdateType updateType = 
serde.getUpdateType(record);
-                        final Object recordIdentifier = 
serde.getRecordIdentifier(record);
-
-                        if (updateType == UpdateType.DELETE) {
-                            recordMap.remove(recordIdentifier);
-                        } else if (updateType == UpdateType.SWAP_OUT) {
-                            final String newLocation = 
serde.getLocation(record);
-                            if (newLocation == null) {
-                                logger.error("Received Record (ID=" + 
recordIdentifier + ") with UpdateType of SWAP_OUT but "
-                                        + "no indicator of where the Record is 
to be Swapped Out to; these records may be "
-                                        + "lost when the repository is 
restored!");
-                            } else {
-                                recordMap.remove(recordIdentifier);
-                                this.externalLocations.add(newLocation);
+                try {
+                    if (partition.tryClaim()) {
+                        final int partitionIdx = partition.getPartitionIndex();
+                        
+                        try {
+                            final long transactionId = 
transactionIdGenerator.getAndIncrement();
+                            if (logger.isTraceEnabled()) {
+                                for (final T record : records) {
+                                    logger.trace("Partition {} performing 
Transaction {}: {}", new Object[]{partition, transactionId, record});
+                                }
                             }
-                        } else if (updateType == UpdateType.SWAP_IN) {
-                            final String newLocation = 
serde.getLocation(record);
-                            if (newLocation == null) {
-                                logger.error("Received Record (ID=" + 
recordIdentifier + ") with UpdateType of SWAP_IN but no "
-                                        + "indicator of where the Record is to 
be Swapped In from; these records may be duplicated "
-                                        + "when the repository is restored!");
+    
+                            try {
+                                partition.update(records, transactionId, 
unmodifiableRecordMap, forceSync);
+                            } catch (final Exception e) {
+                                partition.blackList();
+                                numberBlackListedPartitions.incrementAndGet();
+                                throw e;
+                            }
+    
+                            if (forceSync && syncListener != null) {
+                                syncListener.onSync(partitionIdx);
+                            }
+                        } finally {
+                            partition.releaseClaim();
+                        }
+    
+                        for (final T record : records) {
+                            final UpdateType updateType = 
serde.getUpdateType(record);
+                            final Object recordIdentifier = 
serde.getRecordIdentifier(record);
+    
+                            if (updateType == UpdateType.DELETE) {
+                                recordMap.remove(recordIdentifier);
+                            } else if (updateType == UpdateType.SWAP_OUT) {
+                                final String newLocation = 
serde.getLocation(record);
+                                if (newLocation == null) {
+                                    logger.error("Received Record (ID=" + 
recordIdentifier + ") with UpdateType of SWAP_OUT but "
+                                            + "no indicator of where the 
Record is to be Swapped Out to; these records may be "
+                                            + "lost when the repository is 
restored!");
+                                } else {
+                                    recordMap.remove(recordIdentifier);
+                                    this.externalLocations.add(newLocation);
+                                }
+                            } else if (updateType == UpdateType.SWAP_IN) {
+                                final String newLocation = 
serde.getLocation(record);
+                                if (newLocation == null) {
+                                    logger.error("Received Record (ID=" + 
recordIdentifier + ") with UpdateType of SWAP_IN but no "
+                                            + "indicator of where the Record 
is to be Swapped In from; these records may be duplicated "
+                                            + "when the repository is 
restored!");
+                                } else {
+                                    externalLocations.remove(newLocation);
+                                }
+                                recordMap.put(recordIdentifier, record);
                             } else {
-                                externalLocations.remove(newLocation);
+                                recordMap.put(recordIdentifier, record);
                             }
-                            recordMap.put(recordIdentifier, record);
-                        } else {
-                            recordMap.put(recordIdentifier, record);
+                        }
+    
+                        return partitionIdx;
+                    }
+                } finally {
+                    synchronized (updatePartitionStack) {
+                        if (!partition.isBlackListed()) {
+                            updatePartitionStack.push(partition);
+                            updatePartitionStack.notify();
                         }
                     }
-
-                    return resolvedIdx;
                 }
             }
         } finally {
@@ -677,6 +726,7 @@ public final class MinimalLockingWriteAheadLog<T> 
implements WriteAheadRepositor
 
         private final Path editDirectory;
         private final int writeAheadLogVersion;
+        private final int partitionIndex;
 
         private final Lock lock = new ReentrantLock();
         private DataOutputStream dataOut = null;
@@ -686,19 +736,23 @@ public final class MinimalLockingWriteAheadLog<T> 
implements WriteAheadRepositor
         private DataInputStream recoveryIn;
         private int recoveryVersion;
         private String currentJournalFilename = "";
+        private long updates = 0L;
 
         private static final byte TRANSACTION_CONTINUE = 1;
         private static final byte TRANSACTION_COMMIT = 2;
 
         private final String description;
+        private final int updatesBetweenSync;
         private final AtomicLong maxTransactionId = new AtomicLong(-1L);
         private final Logger logger = 
LoggerFactory.getLogger(MinimalLockingWriteAheadLog.class);
 
         private final Queue<Path> recoveryFiles;
 
-        public Partition(final Path path, final SerDe<S> serde, final int 
partitionIndex, final int writeAheadLogVersion) throws IOException {
+        public Partition(final Path path, final SerDe<S> serde, final int 
partitionIndex, final int writeAheadLogVersion, final int updatesBetweenSync) 
throws IOException {
             this.editDirectory = path;
             this.serde = serde;
+            this.partitionIndex = partitionIndex;
+            this.updatesBetweenSync = updatesBetweenSync;
 
             final File file = path.toFile();
             if (!file.exists() && !file.mkdirs()) {
@@ -713,6 +767,10 @@ public final class MinimalLockingWriteAheadLog<T> 
implements WriteAheadRepositor
             this.description = "Partition-" + partitionIndex;
             this.writeAheadLogVersion = writeAheadLogVersion;
         }
+        
+        public int getPartitionIndex() {
+            return partitionIndex;
+        }
 
         public boolean tryClaim() {
             final boolean obtainedLock = lock.tryLock();
@@ -921,7 +979,8 @@ public final class MinimalLockingWriteAheadLog<T> 
implements WriteAheadRepositor
 
             out.flush();
 
-            if (forceSync) {
+            updates++;
+            if (forceSync || (updatesBetweenSync > 0 && updates % 
updatesBetweenSync == 0)) {
                 fileOut.getFD().sync();
             }
         }
@@ -1067,6 +1126,10 @@ public final class MinimalLockingWriteAheadLog<T> 
implements WriteAheadRepositor
         public long getMaxRecoveredTransactionId() {
             return maxTransactionId.get();
         }
+        
+        public boolean isBlackListed() {
+            return blackListed;
+        }
 
         @Override
         public String toString() {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4b5ba3c4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
index 668adb8..61649dc 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
@@ -82,7 +82,6 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
     private final Path flowFileRepositoryPath;
     private final int numPartitions;
     private final ScheduledExecutorService checkpointExecutor;
-    private final AtomicLong updateCount = new AtomicLong(0L);
     private final int updatesBetweenSyncs;
 
     // effectively final
@@ -137,7 +136,7 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
         // backup and then the data deleted from the normal location; then can 
move backup to normal location and
         // delete backup. On restore, if no files exist in partition's 
directory, would have to check backup directory
         serde = new WriteAheadRecordSerde(claimManager);
-        wal = new MinimalLockingWriteAheadLog<>(flowFileRepositoryPath, 
numPartitions, serde, this);
+        wal = new MinimalLockingWriteAheadLog<>(flowFileRepositoryPath, 
numPartitions, serde, this, updatesBetweenSyncs);
     }
 
     @Override
@@ -167,8 +166,7 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
 
     @Override
     public void updateRepository(final Collection<RepositoryRecord> records) 
throws IOException {
-        final long updates = updateCount.incrementAndGet();
-        updateRepository(records, alwaysSync || updatesBetweenSyncs > 0 && 
updates % updatesBetweenSyncs == 0);
+        updateRepository(records, alwaysSync);
     }
 
     private void updateRepository(final Collection<RepositoryRecord> records, 
final boolean sync) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4b5ba3c4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java
index f5494f9..aa24280 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java
@@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.reporting.Severity;
@@ -36,6 +37,7 @@ public class StandardContentClaimManager implements 
ContentClaimManager {
 
     private static final ConcurrentMap<String, BlockingQueue<ContentClaim>> 
destructableClaims = new ConcurrentHashMap<>();
     private final EventReporter eventReporter;
+    private final AtomicLong lastBulletinTime = new AtomicLong(0L);
 
     public StandardContentClaimManager(final EventReporter eventReporter) {
         this.eventReporter = eventReporter;
@@ -123,16 +125,19 @@ public class StandardContentClaimManager implements 
ContentClaimManager {
             final BlockingQueue<ContentClaim> destructableQueue = 
getDestructableClaimQueue(claim.getContainer());
             final boolean accepted = destructableQueue.offer(claim);
             if (!accepted) {
-                final long start = System.nanoTime();
-
                 while (!destructableQueue.offer(claim, 30, TimeUnit.MINUTES)) {
                 }
 
-                final long millis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-                if (millis > 10L) {
-                    logger.warn("Total wait duration to add claim to 
Destructable Claim Queue was {} millis", millis);
-                    eventReporter.reportEvent(Severity.WARNING, "Content 
Repository", "The Content Repository is unable to destroy content as fast "
-                        + "as it is being created. The flow will be slowed in 
order to adjust for this.");
+                // If it's been 5+ minutes since we emitted a bulletin, emit a 
bulletin notifying user that there is backpressure 
+                final long lastTimestamp = lastBulletinTime.get();
+                final long now = System.currentTimeMillis();
+                if (now - lastTimestamp > TimeUnit.MINUTES.toMillis(5)) {
+                    final boolean emitBulletin = 
lastBulletinTime.compareAndSet(lastTimestamp, now);
+                    
+                    if (emitBulletin) {
+                        eventReporter.reportEvent(Severity.WARNING, "Content 
Repository", "The Content Repository is unable to destroy content as fast "
+                            + "as it is being created. The flow will be slowed 
in order to adjust for this.");
+                    }
                 }
             }
         } catch (final InterruptedException ie) {

Reply via email to