NIFI-3356: Initial implementation of write-ahead provenance repository
- The idea behind NIFI-3356 was to improve the efficiency and throughput of the
Provenance Repository, as it is often the bottleneck. Testing the newly designed
repository brought to light a handful of other, fairly minor, changes that
improve efficiency as well:

- Use a BufferedOutputStream within StandardProcessSession (via a ClaimCache 
abstraction) in order to avoid continually writing to FileOutputStream when 
writing many small FlowFiles
- Updated threading model of MinimalLockingWriteAheadLog - now performs 
serialization outside of lock and writes to a 'synchronized' OutputStream
- Change minimum scheduling period for components from 30 microseconds to 1
nanosecond. ScheduledExecutor is very inconsistent with timing of task
scheduling. The 30-microsecond minimum was originally introduced to keep
processors that had no work from dominating the CPU. Now that
bored.yield.duration is present and processors yield when they have no work,
the minimum is no longer needed and only slows down processors that are able
to perform work.
- Allow nifi.properties to specify multiple directories for FlowFile Repository
- If backpressure is engaged while running a batch of sessions, then stop batch
processing earlier. This helps FlowFiles move through the system much more
smoothly, instead of the herky-jerky queuing that we previously saw at very
high rates of FlowFiles.
- Added NiFi PID to log message when starting NiFi. This was simply an update
to the log message that provides helpful information.
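
As a rough illustration of the MinimalLockingWriteAheadLog threading change
listed above (serialize outside of the lock, then write to a synchronized
stream), here is a minimal sketch. It is not the code from this commit: the
class name SerializeOutsideLockDemo, the append and readAll methods, and the
use of writeUTF as the record format are all assumptions made for the example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; not the MinimalLockingWriteAheadLog implementation.
class SerializeOutsideLockDemo {
    private final OutputStream out;              // shared destination stream
    private final Object writeLock = new Object();

    SerializeOutsideLockDemo(final OutputStream out) {
        this.out = out;
    }

    void append(final String record) {
        try {
            // Serialize into a private buffer first; no lock is held here.
            final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            final DataOutputStream dos = new DataOutputStream(buffer);
            dos.writeUTF(record);
            dos.flush();
            final byte[] bytes = buffer.toByteArray();

            // Only the copy of the already-serialized bytes happens under the lock.
            synchronized (writeLock) {
                out.write(bytes);
            }
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads back every record written by append(), in stream order.
    static List<String> readAll(final byte[] data) {
        final List<String> records = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            while (in.available() > 0) {
                records.add(in.readUTF());
            }
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
        return records;
    }

    public static void main(final String[] args) throws InterruptedException {
        final ByteArrayOutputStream sink = new ByteArrayOutputStream();
        final SerializeOutsideLockDemo log = new SerializeOutsideLockDemo(sink);

        // Several writers serialize concurrently and contend only for the short write.
        final Thread[] writers = new Thread[4];
        for (int i = 0; i < writers.length; i++) {
            final int id = i;
            writers[i] = new Thread(() -> {
                for (int j = 0; j < 100; j++) {
                    log.append("writer-" + id + "-record-" + j);
                }
            });
            writers[i].start();
        }
        for (final Thread writer : writers) {
            writer.join();
        }

        System.out.println("records written: " + readAll(sink.toByteArray()).size());
    }
}
```

Because each record is fully serialized before the lock is taken, records from
different threads never interleave, and the time spent holding the lock is
limited to a single array write.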

NIFI-3356: Fixed bug in ContentClaimWriteCache that resulted in data corruption 
and fixed bug in RepositoryConfiguration that threw exception if cache warm 
duration was set to empty string

NIFI-3356: Fixed NPE

NIFI-3356: Added debug-level performance monitoring

NIFI-3356: Updates to unit tests that failed after rebasing against master

NIFI-3356: Incorporated PR review feedback

NIFI-3356: Fixed bug where we would delete index directories that are still in 
use; also added additional debug logging and a simple util class that can be 
used to textualize provenance event files - useful in debugging

This closes #1493


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/96ed405d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/96ed405d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/96ed405d

Branch: refs/heads/master
Commit: 96ed405d708894ee5400ebbdbf335325219faa09
Parents: 8d467f3
Author: Mark Payne <marka...@hotmail.com>
Authored: Fri Dec 9 10:52:33 2016 -0500
Committer: Oleg Zhurakousky <o...@suitcase.io>
Committed: Wed Feb 22 12:40:06 2017 -0500

----------------------------------------------------------------------
 .../nifi/provenance/ProvenanceEventRecord.java  |  10 +
 .../java/org/apache/nifi/bootstrap/RunNiFi.java |  10 +-
 .../nifi/provenance/ProgressiveResult.java      |  34 +
 .../nifi/provenance/StandardLineageResult.java  |  47 +-
 .../StandardProvenanceEventRecord.java          |  14 +-
 .../nifi/provenance/StandardQueryResult.java    |  91 ++-
 .../nifi/provenance/lineage/EventNode.java      |   3 +-
 .../nifi/repository/schema/FieldMapRecord.java  |   5 +
 .../nifi/repository/schema/RecordSchema.java    |   2 +-
 .../repository/schema/SchemaRecordReader.java   |  10 +-
 .../repository/schema/SchemaRecordWriter.java   |   6 +-
 .../org/wali/MinimalLockingWriteAheadLog.java   | 165 ++---
 .../wali/TestMinimalLockingWriteAheadLog.java   |  77 +-
 .../src/main/asciidoc/administration-guide.adoc |  88 ++-
 .../org/apache/nifi/controller/Triggerable.java |   2 +-
 .../nifi/provenance/IdentifierLookup.java       |  88 +++
 .../nifi/provenance/ProvenanceRepository.java   |   7 +-
 .../lineage/ComputeLineageResult.java           |   3 +
 .../apache/nifi/provenance/search/Query.java    |   4 +
 .../nifi/provenance/search/QueryResult.java     |   3 +
 .../provenance/MockProvenanceRepository.java    |   2 +-
 .../java/org/apache/nifi/util/MockFlowFile.java |  24 +-
 .../apache/nifi/controller/AbstractPort.java    |   2 +-
 .../apache/nifi/controller/StandardFunnel.java  |   2 +-
 .../apache/nifi/controller/FlowController.java  |  40 +-
 .../repository/FileSystemRepository.java        |   1 +
 .../repository/StandardProcessSession.java      | 114 ++-
 .../WriteAheadFlowFileRepository.java           |  36 +-
 .../claim/ContentClaimWriteCache.java           | 166 +++++
 .../repository/schema/ContentClaimFieldMap.java |  33 +
 .../schema/RepositoryRecordUpdate.java          |   6 +-
 .../scheduling/EventDrivenSchedulingAgent.java  |   4 +-
 .../history/ProcessorStatusDescriptor.java      |  48 +-
 .../VolatileComponentStatusRepository.java      |   4 +-
 .../tasks/ContinuallyRunProcessorTask.java      |  16 +-
 .../repository/TestFileSystemRepository.java    |  39 +-
 .../repository/TestStandardProcessSession.java  |  11 +-
 .../TestWriteAheadFlowFileRepository.java       | 261 +++++++
 .../claim/TestContentClaimWriteCache.java       |  98 +++
 .../nifi-framework/nifi-resources/pom.xml       |   4 +-
 .../nifi/provenance/AbstractRecordWriter.java   |  22 +-
 .../provenance/ByteArraySchemaRecordWriter.java |  17 +-
 .../EventIdFirstSchemaRecordReader.java         | 145 ++++
 .../EventIdFirstSchemaRecordWriter.java         | 241 ++++++
 .../nifi/provenance/IndexConfiguration.java     |  12 +-
 .../PersistentProvenanceRepository.java         | 260 +++----
 .../provenance/RepositoryConfiguration.java     | 140 +++-
 .../nifi/provenance/StandardRecordReader.java   |   7 +-
 .../nifi/provenance/StandardRecordWriter.java   |  12 +-
 .../WriteAheadProvenanceRepository.java         | 280 +++++++
 .../authorization/EventAuthorizer.java          | 119 +++
 .../authorization/EventTransformer.java         |  42 ++
 .../authorization/UserEventAuthorizer.java      |  76 ++
 .../nifi/provenance/index/EventIndex.java       | 160 ++++
 .../provenance/index/EventIndexSearcher.java    |  29 +
 .../nifi/provenance/index/EventIndexWriter.java |  44 ++
 .../provenance/index/SearchFailedException.java |  28 +
 .../provenance/index/lucene/CachedQuery.java    |  33 +
 .../index/lucene/CommitPreference.java          |  24 +
 .../lucene/ConvertEventToLuceneDocument.java    | 143 ++++
 .../provenance/index/lucene/EventIndexTask.java | 244 ++++++
 .../index/lucene/IndexDirectoryManager.java     | 358 +++++++++
 .../provenance/index/lucene/IndexLocation.java  |  90 +++
 .../index/lucene/IndexableDocument.java         |  47 ++
 .../lucene/LatestEventsPerProcessorQuery.java   |  77 ++
 .../index/lucene/LatestEventsQuery.java         |  55 ++
 .../index/lucene/LuceneCacheWarmer.java         |  67 ++
 .../index/lucene/LuceneEventIndex.java          | 737 +++++++++++++++++++
 .../nifi/provenance/index/lucene/QueryTask.java | 208 ++++++
 .../provenance/index/lucene/StoredDocument.java |  39 +
 .../provenance/lucene/CachingIndexManager.java  | 176 +++--
 .../provenance/lucene/DeleteIndexAction.java    |  12 +-
 .../nifi/provenance/lucene/DocsReader.java      |  65 +-
 .../lucene/DocumentToEventConverter.java        |  30 +
 .../nifi/provenance/lucene/IndexManager.java    |  16 +-
 .../nifi/provenance/lucene/IndexSearch.java     |  38 +-
 .../nifi/provenance/lucene/IndexingAction.java  |  16 +-
 .../nifi/provenance/lucene/LineageQuery.java    |  23 +-
 .../lucene/LuceneEventIndexSearcher.java        |  92 +++
 .../lucene/LuceneEventIndexWriter.java          | 144 ++++
 .../provenance/lucene/SimpleIndexManager.java   | 336 ++++++---
 .../nifi/provenance/schema/EventFieldNames.java |  59 ++
 .../schema/EventIdFirstHeaderSchema.java        |  52 ++
 .../nifi/provenance/schema/EventRecord.java     | 108 +--
 .../provenance/schema/EventRecordFields.java    |  92 +--
 .../schema/LookupTableEventRecord.java          | 363 +++++++++
 .../schema/LookupTableEventRecordFields.java    |  89 +++
 .../schema/LookupTableEventSchema.java          |  94 +++
 .../schema/ProvenanceEventSchema.java           |  10 +-
 .../serialization/CompressableRecordReader.java | 107 ++-
 .../serialization/CompressableRecordWriter.java |  80 +-
 .../serialization/EmptyRecordReader.java        |  12 +
 .../serialization/EventFileCompressor.java      | 188 +++++
 .../provenance/serialization/RecordReader.java  |  23 +-
 .../provenance/serialization/RecordReaders.java |  19 +-
 .../provenance/serialization/RecordWriter.java  |  13 +-
 .../provenance/serialization/RecordWriters.java |  10 +-
 .../serialization/StorageSummary.java           |  72 ++
 .../nifi/provenance/store/EventFileManager.java | 109 +++
 .../nifi/provenance/store/EventStore.java       | 123 ++++
 .../provenance/store/EventStorePartition.java   | 113 +++
 .../provenance/store/PartitionedEventStore.java | 284 +++++++
 .../store/PartitionedWriteAheadEventStore.java  | 142 ++++
 .../provenance/store/RecordReaderFactory.java   |  29 +
 .../provenance/store/RecordWriterFactory.java   |  28 +
 .../provenance/store/RecordWriterLease.java     |  93 +++
 .../nifi/provenance/store/StorageResult.java    |  67 ++
 .../store/WriteAheadStorePartition.java         | 637 ++++++++++++++++
 .../iterator/AuthorizingEventIterator.java      |  63 ++
 .../store/iterator/EventIterator.java           |  56 ++
 .../SelectiveRecordReaderEventIterator.java     | 174 +++++
 .../SequentialRecordReaderEventIterator.java    | 115 +++
 .../nifi/provenance/toc/StandardTocReader.java  |  68 +-
 .../apache/nifi/provenance/toc/TocReader.java   |  18 +-
 .../org/apache/nifi/provenance/toc/TocUtil.java |   2 +-
 .../nifi/provenance/util/CloseableUtil.java     |  45 ++
 .../nifi/provenance/util/DirectoryUtils.java    |  96 +++
 .../nifi/provenance/util/DumpEventFile.java     |  79 ++
 .../provenance/util/NamedThreadFactory.java     |  47 ++
 .../provenance/util/StorageSummaryEvent.java    | 185 +++++
 ....apache.nifi.provenance.ProvenanceRepository |   3 +-
 .../AbstractTestRecordReaderWriter.java         |  80 +-
 ...estEventIdFirstSchemaRecordReaderWriter.java | 477 ++++++++++++
 .../TestPersistentProvenanceRepository.java     | 193 ++---
 .../TestSchemaRecordReaderWriter.java           | 105 ++-
 .../TestStandardRecordReaderWriter.java         |  29 +-
 .../org/apache/nifi/provenance/TestUtil.java    |  19 +
 .../index/lucene/TestEventIndexTask.java        | 142 ++++
 .../index/lucene/TestIndexDirectoryManager.java | 100 +++
 .../index/lucene/TestLuceneEventIndex.java      | 538 ++++++++++++++
 .../lucene/TestCachingIndexManager.java         |  28 +-
 .../lucene/TestSimpleIndexManager.java          | 104 ++-
 .../provenance/store/ArrayListEventStore.java   | 155 ++++
 .../provenance/store/TestEventFileManager.java  | 240 ++++++
 .../TestPartitionedWriteAheadEventStore.java    | 468 ++++++++++++
 .../store/TestWriteAheadStorePartition.java     | 111 +++
 .../TestSelectiveRecordReaderEventIterator.java | 146 ++++
 .../VolatileProvenanceRepository.java           |   9 +-
 138 files changed, 12165 insertions(+), 1030 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventRecord.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventRecord.java b/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventRecord.java
index 7ba1622..b05bd85 100644
--- a/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventRecord.java
+++ b/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventRecord.java
@@ -80,6 +80,16 @@ public interface ProvenanceEventRecord {
     Map<String, String> getAttributes();
 
     /**
+     * Returns the attribute with the given name
+     *
+     * @param attributeName the name of the attribute to get
+     * @return the attribute with the given name or <code>null</code> if no attribute exists with the given name
+     */
+    default String getAttribute(String attributeName) {
+        return getAttributes().get(attributeName);
+    }
+
+    /**
      * @return all FlowFile attributes that existed on the FlowFile before this
      * event occurred
      */

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
----------------------------------------------------------------------
diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
index 8d92c44..10d5cde 100644
--- a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
+++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
@@ -1067,11 +1067,14 @@ public class RunNiFi {
         Process process = builder.start();
         handleLogging(process);
         Long pid = getPid(process, cmdLogger);
-        if (pid != null) {
+        if (pid == null) {
+            cmdLogger.info("Launched Apache NiFi but could not determine the Process ID");
+        } else {
             nifiPid = pid;
             final Properties pidProperties = new Properties();
             pidProperties.setProperty(PID_KEY, String.valueOf(nifiPid));
             savePidProperties(pidProperties, cmdLogger);
+            cmdLogger.info("Launched Apache NiFi with Process ID " + pid);
         }
 
         shutdownHook = new ShutdownHook(process, this, secretKey, gracefulShutdownSeconds, loggingExecutor);
@@ -1129,11 +1132,14 @@ public class RunNiFi {
                     handleLogging(process);
 
                     pid = getPid(process, defaultLogger);
-                    if (pid != null) {
+                    if (pid == null) {
+                        cmdLogger.info("Launched Apache NiFi but could not obtain the Process ID");
+                    } else {
                         nifiPid = pid;
                         final Properties pidProperties = new Properties();
                         pidProperties.setProperty(PID_KEY, String.valueOf(nifiPid));
                         savePidProperties(pidProperties, defaultLogger);
+                        cmdLogger.info("Launched Apache NiFi with Process ID " + pid);
                     }
 
                     shutdownHook = new ShutdownHook(process, this, secretKey, gracefulShutdownSeconds, loggingExecutor);

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/ProgressiveResult.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/ProgressiveResult.java b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/ProgressiveResult.java
new file mode 100644
index 0000000..e2dd33b
--- /dev/null
+++ b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/ProgressiveResult.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance;
+
+import java.util.Collection;
+
+/**
+ * A Provenance query result that is capable of being updated
+ */
+public interface ProgressiveResult {
+
+    void update(Collection<ProvenanceEventRecord> records, long totalHitCount);
+
+    void setError(String error);
+
+    long getTotalHitCount();
+
+    boolean isFinished();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardLineageResult.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardLineageResult.java b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardLineageResult.java
index 78b3188..2b7b34a 100644
--- a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardLineageResult.java
+++ b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardLineageResult.java
@@ -44,7 +44,7 @@ import org.apache.nifi.provenance.lineage.LineageNode;
 /**
  *
  */
-public class StandardLineageResult implements ComputeLineageResult {
+public class StandardLineageResult implements ComputeLineageResult, ProgressiveResult {
 
     public static final int TTL = (int) TimeUnit.MILLISECONDS.convert(30, TimeUnit.MINUTES);
     private static final Logger logger = LoggerFactory.getLogger(StandardLineageResult.class);
@@ -66,6 +66,7 @@ public class StandardLineageResult implements ComputeLineageResult {
     private int numCompletedSteps = 0;
 
     private volatile boolean canceled = false;
+    private final Object completionMonitor = new Object();
 
     public StandardLineageResult(final int numSteps, final Collection<String> flowFileUuids) {
         this.numSteps = numSteps;
@@ -162,6 +163,7 @@ public class StandardLineageResult implements ComputeLineageResult {
         }
     }
 
+    @Override
     public void setError(final String error) {
         writeLock.lock();
         try {
@@ -178,7 +180,10 @@ public class StandardLineageResult implements ComputeLineageResult {
         }
     }
 
-    public void update(final Collection<ProvenanceEventRecord> records) {
+    @Override
+    public void update(final Collection<ProvenanceEventRecord> records, final long totalHitCount) {
+        boolean computationComplete = false;
+
         writeLock.lock();
         try {
             relevantRecords.addAll(records);
@@ -187,12 +192,22 @@ public class StandardLineageResult implements ComputeLineageResult {
             updateExpiration();
 
             if (numCompletedSteps >= numSteps && error == null) {
+                computationComplete = true;
                 computeLineage();
                 computationNanos = System.nanoTime() - creationNanos;
             }
         } finally {
             writeLock.unlock();
         }
+
+        if (computationComplete) {
+            final long computationMillis = TimeUnit.NANOSECONDS.toMillis(computationNanos);
+            logger.info("Completed computation of lineage for FlowFile UUID(s) {} comprised of {} steps in {} millis", flowFileUuids, numSteps, computationMillis);
+
+            synchronized (completionMonitor) {
+                completionMonitor.notifyAll();
+            }
+        }
     }
 
     /**
@@ -201,6 +216,7 @@ public class StandardLineageResult implements ComputeLineageResult {
      * useful after all of the records have been successfully obtained
      */
     private void computeLineage() {
+        logger.debug("Computing lineage with the following events: {}", relevantRecords);
         final long startNanos = System.nanoTime();
 
         nodes.clear();
@@ -324,4 +340,31 @@ public class StandardLineageResult implements ComputeLineageResult {
     private void updateExpiration() {
         expirationDate = new Date(System.currentTimeMillis() + TTL);
     }
+
+    @Override
+    public boolean awaitCompletion(final long time, final TimeUnit unit) throws InterruptedException {
+        final long finishTime = System.currentTimeMillis() + unit.toMillis(time);
+        synchronized (completionMonitor) {
+            while (!isFinished()) {
+                final long millisToWait = finishTime - System.currentTimeMillis();
+                if (millisToWait > 0) {
+                    completionMonitor.wait(millisToWait);
+                } else {
+                    return isFinished();
+                }
+            }
+        }
+
+        return isFinished();
+    }
+
+    @Override
+    public long getTotalHitCount() {
+        readLock.lock();
+        try {
+            return relevantRecords.size();
+        } finally {
+            readLock.unlock();
+        }
+    }
 }
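
The awaitCompletion method added above waits on a monitor until either the
result is finished or a deadline passes. The following is a minimal,
self-contained sketch of that pattern, not the committed code. The class name
CompletionMonitorDemo and the markFinished and demo methods are hypothetical.
It also swaps in System.nanoTime() and TimeUnit.timedWait for the deadline
arithmetic, whereas the diff uses System.currentTimeMillis() and Object.wait.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a timed wait on a completion monitor.
class CompletionMonitorDemo {
    private final Object completionMonitor = new Object();
    private volatile boolean finished = false;

    // Called by the worker once the computation is done.
    void markFinished() {
        finished = true;
        synchronized (completionMonitor) {
            completionMonitor.notifyAll();
        }
    }

    // Returns true if finished before the timeout elapsed, false otherwise.
    boolean awaitCompletion(final long time, final TimeUnit unit) throws InterruptedException {
        final long deadlineNanos = System.nanoTime() + unit.toNanos(time);
        synchronized (completionMonitor) {
            // Loop to guard against spurious wakeups and early notifications.
            while (!finished) {
                final long nanosToWait = deadlineNanos - System.nanoTime();
                if (nanosToWait <= 0) {
                    return false;
                }
                TimeUnit.NANOSECONDS.timedWait(completionMonitor, nanosToWait);
            }
        }
        return true;
    }

    // Waits once before completion (expected to time out) and once after it.
    static boolean[] demo() {
        final CompletionMonitorDemo result = new CompletionMonitorDemo();
        try {
            final boolean before = result.awaitCompletion(10, TimeUnit.MILLISECONDS);
            new Thread(result::markFinished).start();
            final boolean after = result.awaitCompletion(5, TimeUnit.SECONDS);
            return new boolean[] {before, after};
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(final String[] args) {
        final boolean[] outcome = demo();
        System.out.println("before=" + outcome[0] + ", after=" + outcome[1]);
    }
}
```

Re-checking the condition in a loop, and recomputing the remaining time on
each pass, keeps the total wait bounded by the caller's timeout even if the
thread wakes up more than once.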

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardProvenanceEventRecord.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardProvenanceEventRecord.java b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardProvenanceEventRecord.java
index f8f4055..ac60d4f 100644
--- a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardProvenanceEventRecord.java
+++ b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardProvenanceEventRecord.java
@@ -67,7 +67,7 @@ public final class StandardProvenanceEventRecord implements ProvenanceEventRecor
     private final Map<String, String> previousAttributes;
     private final Map<String, String> updatedAttributes;
 
-    private volatile long eventId;
+    private volatile long eventId = -1L;
 
     private StandardProvenanceEventRecord(final Builder builder) {
         this.eventTime = builder.eventTime;
@@ -369,14 +369,22 @@ public final class StandardProvenanceEventRecord implements ProvenanceEventRecor
             return false;
         }
 
-        if (a == null && b != null) {
+        if (a == null && b != null && !b.isEmpty()) {
             return true;
         }
 
-        if (a != null && b == null) {
+        if (a == null && b.isEmpty()) {
+            return false;
+        }
+
+        if (a != null && !a.isEmpty() && b == null) {
             return true;
         }
 
+        if (a.isEmpty() && b == null) {
+            return false;
+        }
+
         if (a.size() != b.size()) {
             return true;
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java
index 5c09e8e..2777339 100644
--- a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java
+++ b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java
@@ -22,7 +22,7 @@ import java.util.Comparator;
 import java.util.Date;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Set;
+import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
@@ -31,8 +31,11 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.nifi.provenance.search.Query;
 import org.apache.nifi.provenance.search.QueryResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public class StandardQueryResult implements QueryResult {
+public class StandardQueryResult implements QueryResult, ProgressiveResult {
+    private static final Logger logger = LoggerFactory.getLogger(StandardQueryResult.class);
 
     public static final int TTL = (int) TimeUnit.MILLISECONDS.convert(30, TimeUnit.MINUTES);
     private final Query query;
@@ -44,12 +47,12 @@ public class StandardQueryResult implements QueryResult {
 
     private final Lock writeLock = rwLock.writeLock();
     // guarded by writeLock
-    private final Set<ProvenanceEventRecord> matchingRecords = new TreeSet<>(new EventIdComparator());
-    private long totalHitCount;
+    private final SortedSet<ProvenanceEventRecord> matchingRecords = new TreeSet<>(new EventIdComparator());
     private int numCompletedSteps = 0;
     private Date expirationDate;
     private String error;
     private long queryTime;
+    private final Object completionMonitor = new Object();
 
     private volatile boolean canceled = false;
 
@@ -65,22 +68,7 @@ public class StandardQueryResult implements QueryResult {
     public List<ProvenanceEventRecord> getMatchingEvents() {
         readLock.lock();
         try {
-            if (matchingRecords.size() <= query.getMaxResults()) {
-                return new ArrayList<>(matchingRecords);
-            }
-
-            final List<ProvenanceEventRecord> copy = new ArrayList<>(query.getMaxResults());
-
-            int i = 0;
-            final Iterator<ProvenanceEventRecord> itr = matchingRecords.iterator();
-            while (itr.hasNext()) {
-                copy.add(itr.next());
-                if (++i >= query.getMaxResults()) {
-                    break;
-                }
-            }
-
-            return copy;
+            return new ArrayList<>(matchingRecords);
         } finally {
             readLock.unlock();
         }
@@ -137,7 +125,7 @@ public class StandardQueryResult implements QueryResult {
     public boolean isFinished() {
         readLock.lock();
         try {
-            return numCompletedSteps >= numSteps || canceled;
+            return numCompletedSteps >= numSteps || canceled || matchingRecords.size() >= query.getMaxResults();
         } finally {
             readLock.unlock();
         }
@@ -147,6 +135,7 @@ public class StandardQueryResult implements QueryResult {
         this.canceled = true;
     }
 
+    @Override
     public void setError(final String error) {
         writeLock.lock();
         try {
@@ -163,22 +152,74 @@ public class StandardQueryResult implements QueryResult {
         }
     }
 
-    public void update(final Collection<ProvenanceEventRecord> matchingRecords, final long totalHits) {
+    @Override
+    public void update(final Collection<ProvenanceEventRecord> newEvents, final long totalHits) {
+        boolean queryComplete = false;
+
         writeLock.lock();
         try {
-            this.matchingRecords.addAll(matchingRecords);
-            this.totalHitCount += totalHits;
+            if (isFinished()) {
+                return;
+            }
+
+            this.matchingRecords.addAll(newEvents);
+
+            // If we've added more records than the query's max, then remove the trailing elements.
+            // We do this, rather than avoiding the addition of the elements because we want to choose
+            // the events with the largest ID.
+            if (matchingRecords.size() > query.getMaxResults()) {
+                final Iterator<ProvenanceEventRecord> itr = matchingRecords.iterator();
+                for (int i = 0; i < query.getMaxResults(); i++) {
+                    itr.next();
+                }
+
+                while (itr.hasNext()) {
+                    itr.next();
+                    itr.remove();
+                }
+            }
 
             numCompletedSteps++;
             updateExpiration();
 
-            if (numCompletedSteps >= numSteps) {
+            if (numCompletedSteps >= numSteps || this.matchingRecords.size() >= query.getMaxResults()) {
                 final long searchNanos = System.nanoTime() - creationNanos;
                 queryTime = TimeUnit.MILLISECONDS.convert(searchNanos, TimeUnit.NANOSECONDS);
+                queryComplete = true;
+
+                if (numCompletedSteps >= numSteps) {
+                    logger.info("Completed {} comprised of {} steps in {} millis", query, numSteps, queryTime);
+                } else {
+                    logger.info("Completed {} comprised of {} steps in {} millis (only completed {} steps because the maximum number of results was reached)",
+                        query, numSteps, queryTime, numCompletedSteps);
+                }
             }
         } finally {
             writeLock.unlock();
         }
+
+        if (queryComplete) {
+            synchronized (completionMonitor) {
+                completionMonitor.notifyAll();
+            }
+        }
+    }
+
+    @Override
+    public boolean awaitCompletion(final long time, final TimeUnit unit) throws InterruptedException {
+        final long finishTime = System.currentTimeMillis() + unit.toMillis(time);
+        synchronized (completionMonitor) {
+            while (!isFinished()) {
+                final long millisToWait = finishTime - System.currentTimeMillis();
+                if (millisToWait > 0) {
+                    completionMonitor.wait(millisToWait);
+                } else {
+                    return isFinished();
+                }
+            }
+        }
+
+        return isFinished();
     }
 
     /**
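
The update method above adds all new events to a sorted set and then removes
everything past the query's maximum, so that the first maxResults entries in
comparator order survive. Here is a small sketch of that trimming step in
isolation. The class name TrimToMaxResultsDemo and the trim method are
hypothetical, and the descending comparator is an assumption standing in for
EventIdComparator, based on the diff's comment about keeping the largest IDs.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical sketch of trimming a sorted set down to a maximum size.
class TrimToMaxResultsDemo {

    // Keeps the first maxResults elements in the set's order and removes the rest.
    static <T> void trim(final SortedSet<T> set, final int maxResults) {
        if (set.size() <= maxResults) {
            return;
        }

        // Skip over the elements that should be kept.
        final Iterator<T> itr = set.iterator();
        for (int i = 0; i < maxResults; i++) {
            itr.next();
        }

        // Remove every trailing element.
        while (itr.hasNext()) {
            itr.next();
            itr.remove();
        }
    }

    public static void main(final String[] args) {
        // Assumed ordering: largest ID first.
        final SortedSet<Long> eventIds = new TreeSet<>(Comparator.<Long>reverseOrder());
        for (long id = 1; id <= 5; id++) {
            eventIds.add(id);
        }

        trim(eventIds, 3);
        System.out.println(new ArrayList<>(eventIds)); // prints [5, 4, 3]
    }
}
```

Adding first and trimming afterwards means a late-arriving event with a
larger ID can still displace a smaller one, which would not be possible if
additions were simply refused once the set was full.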

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EventNode.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EventNode.java b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EventNode.java
index de516cb..4906ea3 100644
--- a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EventNode.java
+++ b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EventNode.java
@@ -18,7 +18,6 @@ package org.apache.nifi.provenance.lineage;
 
 import java.util.List;
 
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventType;
 
@@ -58,7 +57,7 @@ public class EventNode implements ProvenanceEventLineageNode {
 
     @Override
     public String getFlowFileUuid() {
-        return record.getAttributes().get(CoreAttributes.UUID.key());
+        return record.getFlowFileUuid();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/FieldMapRecord.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/FieldMapRecord.java b/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/FieldMapRecord.java
index c9368a7..acebcb9 100644
--- a/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/FieldMapRecord.java
+++ b/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/FieldMapRecord.java
@@ -29,6 +29,11 @@ public class FieldMapRecord implements Record {
         this.values = convertFieldToName(values);
     }
 
+    public FieldMapRecord(final RecordSchema schema, final Map<String, Object> values) {
+        this.schema = schema;
+        this.values = new HashMap<>(values);
+    }
+
     private static Map<String, Object> convertFieldToName(final Map<RecordField, Object> map) {
         final Map<String, Object> nameMap = new HashMap<>(map.size());
         for (final Map.Entry<RecordField, Object> entry : map.entrySet()) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/RecordSchema.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/RecordSchema.java
 
b/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/RecordSchema.java
index 965254b..fe18765 100644
--- 
a/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/RecordSchema.java
+++ 
b/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/RecordSchema.java
@@ -120,8 +120,8 @@ public class RecordSchema {
 
     @SuppressWarnings("unchecked")
     private static RecordField readField(final DataInputStream dis) throws 
IOException {
-        final Map<String, Object> schemaFieldMap = new HashMap<>();
         final int numElementsToRead = dis.readInt();
+        final Map<String, Object> schemaFieldMap = new 
HashMap<>(numElementsToRead);
         for (int i = 0; i < numElementsToRead; i++) {
             final String fieldName = dis.readUTF();
             final String typeName = dis.readUTF();

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordReader.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordReader.java
 
b/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordReader.java
index b296b13..84f3532 100644
--- 
a/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordReader.java
+++ 
b/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordReader.java
@@ -109,7 +109,15 @@ public class SchemaRecordReader {
             }
         }
 
-        return readFieldValue(in, field.getFieldType(), field.getFieldName(), 
field.getSubFields());
+        try {
+            return readFieldValue(in, field.getFieldType(), 
field.getFieldName(), field.getSubFields());
+        } catch (final EOFException eof) {
+            final EOFException exception = new EOFException("Failed to read 
field '" + field.getFieldName() + "'");
+            exception.addSuppressed(eof);
+            throw exception;
+        } catch (final IOException ioe) {
+            throw new IOException("Failed to read field '" + 
field.getFieldName() + "'", ioe);
+        }
     }
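
The SchemaRecordReader hunk above adds the field name to read failures while keeping the exception type. java.io.EOFException has no constructor that accepts a cause, which is presumably why the original exception is attached with addSuppressed() instead. A minimal standalone sketch of that pattern follows; the class FieldReadContext and the method readIntField are hypothetical names used only for illustration and are not part of NiFi.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;

// Hypothetical illustration of the wrapping pattern; not NiFi code.
class FieldReadContext {

    // Reads one int and, on failure, rethrows with the field name in the message.
    static int readIntField(final DataInputStream in, final String fieldName) throws IOException {
        try {
            return in.readInt();
        } catch (final EOFException eof) {
            // EOFException only offers () and (String) constructors, so the
            // original is kept as a suppressed exception rather than a cause.
            final EOFException withContext = new EOFException("Failed to read field '" + fieldName + "'");
            withContext.addSuppressed(eof);
            throw withContext;
        } catch (final IOException ioe) {
            // Other I/O failures can carry the original as a regular cause.
            throw new IOException("Failed to read field '" + fieldName + "'", ioe);
        }
    }

    public static void main(final String[] args) throws IOException {
        // Only two bytes are available, so reading a four-byte int hits end of stream.
        final DataInputStream in = new DataInputStream(new ByteArrayInputStream(new byte[] {0, 1}));
        try {
            readIntField(in, "count");
        } catch (final EOFException e) {
            System.out.println(e.getMessage() + " (suppressed: " + e.getSuppressed().length + ")");
        }
    }
}
```

Callers that already catch EOFException to detect a truncated record keep working, because the rethrown exception is still an EOFException.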
 
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordWriter.java
 
b/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordWriter.java
index 3e4a059..5305e5b 100644
--- 
a/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordWriter.java
+++ 
b/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordWriter.java
@@ -44,8 +44,12 @@ public class SchemaRecordWriter {
     }
 
     private void writeRecordFields(final Record record, final OutputStream 
out) throws IOException {
+        writeRecordFields(record, record.getSchema(), out);
+    }
+
+    private void writeRecordFields(final Record record, final RecordSchema 
schema, final OutputStream out) throws IOException {
         final DataOutputStream dos = out instanceof DataOutputStream ? 
(DataOutputStream) out : new DataOutputStream(out);
-        for (final RecordField field : record.getSchema().getFields()) {
+        for (final RecordField field : schema.getFields()) {
             final Object value = record.getFieldValue(field);
 
             try {

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
index 12e2b10..1a9e219 100644
--- 
a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
+++ 
b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
@@ -18,6 +18,9 @@ package org.wali;
 
 import static java.util.Objects.requireNonNull;
 
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.EOFException;
@@ -55,12 +58,9 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.regex.Pattern;
 
-import org.apache.nifi.stream.io.BufferedInputStream;
-import org.apache.nifi.stream.io.BufferedOutputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -230,7 +230,7 @@ public final class MinimalLockingWriteAheadLog<T> 
implements WriteAheadRepositor
                         final long transactionId = 
transactionIdGenerator.getAndIncrement();
                         if (logger.isTraceEnabled()) {
                             for (final T record : records) {
-                                logger.trace("Partition {} performing 
Transaction {}: {}", new Object[]{partition, transactionId, record});
+                                logger.trace("Partition {} performing 
Transaction {}: {}", new Object[] {partition, transactionId, record});
                             }
                         }
 
@@ -670,11 +670,10 @@ public final class MinimalLockingWriteAheadLog<T> 
implements WriteAheadRepositor
         private final Path editDirectory;
         private final int writeAheadLogVersion;
 
-        private final Lock lock = new ReentrantLock();
         private DataOutputStream dataOut = null;
         private FileOutputStream fileOut = null;
-        private boolean blackListed = false;
-        private boolean closed = false;
+        private volatile boolean blackListed = false;
+        private volatile boolean closed = false;
         private DataInputStream recoveryIn;
         private int recoveryVersion;
         private String currentJournalFilename = "";
@@ -707,26 +706,15 @@ public final class MinimalLockingWriteAheadLog<T> 
implements WriteAheadRepositor
         }
 
         public boolean tryClaim() {
-            final boolean obtainedLock = lock.tryLock();
-            if (!obtainedLock) {
-                return false;
-            }
-
-            // Check if the partition is blacklisted. If so, unlock it and 
return false. Otherwise,
-            // leave it locked and return true, so that the caller will need 
to unlock.
-            if (blackListed) {
-                lock.unlock();
-                return false;
-            }
-
-            return true;
+            return !blackListed;
         }
 
         public void releaseClaim() {
-            lock.unlock();
         }
 
         public void close() {
+            this.closed = true;
+
             // Note that here we are closing fileOut and NOT dataOut.
             // This is very much intentional, not an oversight. This is done 
because of
             // the way that the OutputStreams are structured. dataOut wraps a 
BufferedOutputStream,
@@ -761,18 +749,12 @@ public final class MinimalLockingWriteAheadLog<T> 
implements WriteAheadRepositor
                 }
             }
 
-            this.closed = true;
             this.dataOut = null;
             this.fileOut = null;
         }
 
         public void blackList() {
-            lock.lock();
-            try {
-                blackListed = true;
-            } finally {
-                lock.unlock();
-            }
+            blackListed = true;
             logger.debug("Blacklisted {}", this);
         }
 
@@ -783,55 +765,50 @@ public final class MinimalLockingWriteAheadLog<T> 
implements WriteAheadRepositor
          * @throws IOException if failure to rollover
          */
         public OutputStream rollover() throws IOException {
-            lock.lock();
-            try {
-                // Note that here we are closing fileOut and NOT dataOut. See 
the note in the close()
-                // method to understand the logic behind this.
-                final OutputStream oldOutputStream = fileOut;
-                dataOut = null;
-                fileOut = null;
+            // Note that here we are closing fileOut and NOT dataOut. See the 
note in the close()
+            // method to understand the logic behind this.
+            final OutputStream oldOutputStream = fileOut;
+            dataOut = null;
+            fileOut = null;
 
-                this.serde = serdeFactory.createSerDe(null);
-                final Path editPath = getNewEditPath();
-                final FileOutputStream fos = new 
FileOutputStream(editPath.toFile());
+            this.serde = serdeFactory.createSerDe(null);
+            final Path editPath = getNewEditPath();
+            final FileOutputStream fos = new 
FileOutputStream(editPath.toFile());
+            try {
+                final DataOutputStream outStream = new DataOutputStream(new 
BufferedOutputStream(fos));
+                
outStream.writeUTF(MinimalLockingWriteAheadLog.class.getName());
+                outStream.writeInt(writeAheadLogVersion);
+                outStream.writeUTF(serde.getClass().getName());
+                outStream.writeInt(serde.getVersion());
+                serde.writeHeader(outStream);
+
+                outStream.flush();
+                dataOut = outStream;
+                fileOut = fos;
+            } catch (final IOException ioe) {
                 try {
-                    final DataOutputStream outStream = new 
DataOutputStream(new BufferedOutputStream(fos));
-                    
outStream.writeUTF(MinimalLockingWriteAheadLog.class.getName());
-                    outStream.writeInt(writeAheadLogVersion);
-                    outStream.writeUTF(serde.getClass().getName());
-                    outStream.writeInt(serde.getVersion());
-                    serde.writeHeader(outStream);
-
-                    outStream.flush();
-                    dataOut = outStream;
-                    fileOut = fos;
-                } catch (final IOException ioe) {
-                    try {
-                        oldOutputStream.close();
-                    } catch (final IOException ioe2) {
-                        ioe.addSuppressed(ioe2);
-                    }
-
-                    logger.error("Failed to create new journal for {} due to 
{}", new Object[] {this, ioe.toString()}, ioe);
-                    try {
-                        fos.close();
-                    } catch (final IOException innerIOE) {
-                    }
-
-                    dataOut = null;
-                    fileOut = null;
-                    blackList();
+                    oldOutputStream.close();
+                } catch (final IOException ioe2) {
+                    ioe.addSuppressed(ioe2);
+                }
 
-                    throw ioe;
+                logger.error("Failed to create new journal for {} due to {}", 
new Object[] {this, ioe.toString()}, ioe);
+                try {
+                    fos.close();
+                } catch (final IOException innerIOE) {
                 }
 
-                currentJournalFilename = editPath.toFile().getName();
+                dataOut = null;
+                fileOut = null;
+                blackList();
 
-                blackListed = false;
-                return oldOutputStream;
-            } finally {
-                lock.unlock();
+                throw ioe;
             }
+
+            currentJournalFilename = editPath.toFile().getName();
+
+            blackListed = false;
+            return oldOutputStream;
         }
 
         private long getJournalIndex(final File file) {
@@ -939,33 +916,39 @@ public final class MinimalLockingWriteAheadLog<T> 
implements WriteAheadRepositor
             return true;
         }
 
-        public void update(final Collection<S> records, final long 
transactionId, final Map<Object, S> recordMap, final boolean forceSync)
-                throws IOException {
-            if (this.closed) {
-                throw new IllegalStateException("Partition is closed");
-            }
+        public void update(final Collection<S> records, final long 
transactionId, final Map<Object, S> recordMap, final boolean forceSync) throws 
IOException {
+            try (final ByteArrayOutputStream baos = new 
ByteArrayOutputStream(256);
+                final DataOutputStream out = new DataOutputStream(baos)) {
 
-            final DataOutputStream out = dataOut;
-            out.writeLong(transactionId);
+                out.writeLong(transactionId);
+                final int numEditsToSerialize = records.size();
+                int editsSerialized = 0;
+                for (final S record : records) {
+                    final Object recordId = serde.getRecordIdentifier(record);
+                    final S previousVersion = recordMap.get(recordId);
 
-            final int numEditsToSerialize = records.size();
-            int editsSerialized = 0;
-            for (final S record : records) {
-                final Object recordId = serde.getRecordIdentifier(record);
-                final S previousVersion = recordMap.get(recordId);
+                    serde.serializeEdit(previousVersion, record, out);
+                    if (++editsSerialized < numEditsToSerialize) {
+                        out.write(TRANSACTION_CONTINUE);
+                    } else {
+                        out.write(TRANSACTION_COMMIT);
+                    }
+                }
 
-                serde.serializeEdit(previousVersion, record, out);
-                if (++editsSerialized < numEditsToSerialize) {
-                    out.write(TRANSACTION_CONTINUE);
-                } else {
-                    out.write(TRANSACTION_COMMIT);
+                out.flush();
+
+                if (this.closed) {
+                    throw new IllegalStateException("Partition is closed");
                 }
-            }
 
-            out.flush();
+                baos.writeTo(dataOut);
+                dataOut.flush();
 
-            if (forceSync) {
-                fileOut.getFD().sync();
+                if (forceSync) {
+                    synchronized (fileOut) {
+                        fileOut.getFD().sync();
+                    }
+                }
             }
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java
 
b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java
index 7b7d2ca..5cdad82 100644
--- 
a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java
+++ 
b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java
@@ -54,6 +54,64 @@ public class TestMinimalLockingWriteAheadLog {
 
 
     @Test
+    @Ignore("for local testing only")
+    public void testUpdatePerformance() throws IOException, 
InterruptedException {
+        final int numPartitions = 4;
+
+        final Path path = Paths.get("target/minimal-locking-repo");
+        deleteRecursively(path.toFile());
+        assertTrue(path.toFile().mkdirs());
+
+        final DummyRecordSerde serde = new DummyRecordSerde();
+        final WriteAheadRepository<DummyRecord> repo = new 
MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null);
+        final Collection<DummyRecord> initialRecs = repo.recoverRecords();
+        assertTrue(initialRecs.isEmpty());
+
+        final int updateCountPerThread = 1_000_000;
+        final int numThreads = 16;
+
+        final Thread[] threads = new Thread[numThreads];
+
+        for (int j = 0; j < 2; j++) {
+            for (int i = 0; i < numThreads; i++) {
+                final Thread t = new Thread(new Runnable() {
+                    @Override
+                    public void run() {
+                        for (int i = 0; i < updateCountPerThread; i++) {
+                            final DummyRecord record = new 
DummyRecord(String.valueOf(i), UpdateType.CREATE);
+                            try {
+                                repo.update(Collections.singleton(record), 
false);
+                            } catch (IOException e) {
+                                e.printStackTrace();
+                                Assert.fail(e.toString());
+                            }
+                        }
+                    }
+                });
+
+                threads[i] = t;
+            }
+
+            final long start = System.nanoTime();
+            for (final Thread t : threads) {
+                t.start();
+            }
+            for (final Thread t : threads) {
+                t.join();
+            }
+
+            final long millis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+            if (j == 0) {
+                System.out.println(millis + " ms to insert " + 
updateCountPerThread * numThreads + " updates using " + numPartitions + " 
partitions and " + numThreads + " threads, *as a warmup!*");
+            } else {
+                System.out.println(millis + " ms to insert " + 
updateCountPerThread * numThreads + " updates using " + numPartitions + " 
partitions and " + numThreads + " threads");
+            }
+        }
+    }
+
+
+
+    @Test
     public void testRepoDoesntContinuallyGrowOnOutOfMemoryError() throws 
IOException, InterruptedException {
         final int numPartitions = 8;
 
@@ -557,21 +615,10 @@ public class TestMinimalLockingWriteAheadLog {
                 assertEquals(2, transactionIndicator);
             }
 
-            long transactionId = in.readLong();
-            assertEquals(2L, transactionId);
-
-            long thirdSize = in.readLong();
-            assertEquals(8194, thirdSize);
-
-            // should be 8176 A's because we threw an Exception after writing 
8194 of them,
-            // but the BufferedOutputStream's buffer already had 8 bytes on it 
for the
-            // transaction id and the size.
-            for (int i = 0; i < 8176; i++) {
-                final int c = in.read();
-                assertEquals("i = " + i, 'A', c);
-            }
-
-            // Stream should now be out of data, because we threw an Exception!
+            // In previous implementations, we would still have a partial 
record written out.
+            // In the current version, however, the serde above would result 
in the data serialization
+            // failing and as a result no data would be written to the stream, 
so the stream should
+            // now be out of data
             final int nextByte = in.read();
             assertEquals(-1, nextByte);
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-docs/src/main/asciidoc/administration-guide.adoc
----------------------------------------------------------------------
diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc 
b/nifi-docs/src/main/asciidoc/administration-guide.adoc
index f7f9920..fff0bdd 100644
--- a/nifi-docs/src/main/asciidoc/administration-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc
@@ -2047,8 +2047,8 @@ FlowFile Repository, if also on that disk, could become 
corrupt. To avoid this s
  +
 For example, to provide two additional locations to act as part of the content 
repository, a user could also specify additional properties with keys of: +
  +
-nifi.provenance.repository.directory.content1=/repos/provenance1 +
-nifi.provenance.repository.directory.content2=/repos/provenance2 +
+nifi.content.repository.directory.content1=/repos/content1 +
+nifi.content.repository.directory.content2=/repos/content2 +
  +
 Providing three total locations, including  
_nifi.content.repository.directory.default_.
 |nifi.content.repository.archive.max.retention.period|If archiving is enabled 
(see nifi.content.repository.archive.enabled below), then
@@ -2073,7 +2073,25 @@ The Provenance Repository contains the information 
related to Data Provenance. T
 
 |====
 |*Property*|*Description*
-|nifi.provenance.repository.implementation|The Provenance Repository 
implementation. The default value is 
org.apache.nifi.provenance.PersistentProvenanceRepository and should only be 
changed with caution. To store provenance events in memory instead of on disk 
(at the risk of data loss in the event of power/machine failure), set this 
property to org.apache.nifi.provenance.VolatileProvenanceRepository.
+|nifi.provenance.repository.implementation|The Provenance Repository 
implementation. The default value is 
org.apache.nifi.provenance.PersistentProvenanceRepository.
+Two additional repositories are available as well.
+To store provenance events in memory instead of on disk (in which case all 
events will be lost on restart, and events will be evicted in a 
first-in-first-out order),
+set this property to org.apache.nifi.provenance.VolatileProvenanceRepository. 
This leaves a configurable number of Provenance Events in the Java heap, so the 
number
+of events that can be retained is very limited.
+
+As of Apache NiFi 1.2.0, a third option is available: 
org.apache.nifi.provenance.WriteAheadProvenanceRepository.
+This implementation was created to replace the PersistentProvenanceRepository. 
The PersistentProvenanceRepository was originally written with the simple goal 
of persisting
+Provenance Events as they are generated and providing the ability to iterate 
over those events sequentially. Later, it was desired to be able to compress 
the data so that
+more data could be stored. After that, the ability to index and query the data 
was added. As requirements evolved over time, the repository kept changing 
without any major
+redesigns. When used in a NiFi instance that is responsible for processing 
large volumes of small FlowFiles, the PersistentProvenanceRepository can 
quickly become a bottleneck.
+The WriteAheadProvenanceRepository was then written to provide the same 
capabilities as the PersistentProvenanceRepository while providing far better 
performance.
+Changing to the WriteAheadProvenanceRepository is easy to accomplish, as the 
two repositories support most of the same properties.
+*Note Well*, however, the following caveat: The WriteAheadProvenanceRepository 
will make use of the Provenance data stored by the 
PersistentProvenanceRepository. However, the
+PersistentProvenanceRepository may not be able to read the data written by the 
WriteAheadProvenanceRepository. Therefore, once the Provenance Repository is 
changed to use
+the WriteAheadProvenanceRepository, it cannot be changed back to the 
PersistentProvenanceRepository without deleting the data in the Provenance 
Repository. It is therefore
+recommended that before changing the implementation, users ensure that their 
version of NiFi is stable, in case any issue arises that causes the user to 
need to roll back to
+a previous version of NiFi that did not support the 
WriteAheadProvenanceRepository. It is for this reason that the default is still 
set to the PersistentProvenanceRepository
+at this time.
 |====
 
 === Persistent Provenance Repository Properties
@@ -2115,6 +2133,70 @@ Providing three total locations, including  
_nifi.provenance.repository.director
 |nifi.provenance.repository.buffer.size|The Provenance Repository buffer size. 
The default value is 100000.
 |====
 
+=== Write Ahead Provenance Repository Properties
+
+|====
+|*Property*|*Description*
+|nifi.provenance.repository.directory.default*|The location of the Provenance 
Repository. The default value is ./provenance_repository. +
+ +
+       *NOTE*: Multiple provenance repositories can be specified by using the 
*_nifi.provenance.repository.directory._* prefix with unique suffixes and 
separate paths as values. +
+ +
+       For example, to provide two additional locations to act as part of the 
provenance repository, a user could also specify additional properties with 
keys of: +
+ +
+       nifi.provenance.repository.directory.provenance1=/repos/provenance1 +
+       nifi.provenance.repository.directory.provenance2=/repos/provenance2 +
+ +
+       Providing three total locations, including  
_nifi.provenance.repository.directory.default_.
+|nifi.provenance.repository.max.storage.time|The maximum amount of time to 
keep data provenance information. The default value is 24 hours.
+|nifi.provenance.repository.max.storage.size|The maximum amount of data 
provenance information to store at a time.
+       The default is 1 GB. The Data Provenance capability can consume a great 
deal of storage space because so much data is kept.
+       For production environments, values of 1-2 TB or more are not uncommon. 
The repository will write to a single "event file" (or set of
+       "event files" if multiple storage locations are defined, as described 
above) for some period of time (defined by the
+       nifi.provenance.repository.rollover.time and  
nifi.provenance.repository.rollover.size properties). Data is always aged off 
one file at a time,
+       so it is not advisable to write to a single "event file" for a 
tremendous amount of time, as it will prevent old data from aging off as 
smoothly.
+|nifi.provenance.repository.rollover.time|The amount of time to wait before 
rolling over the "event file" that the repository is writing to.
+|nifi.provenance.repository.rollover.size|The amount of data to write to a 
single "event file." The default value is 100 MB. For production
+       environments where a very large amount of Data Provenance is generated, 
a value of 1 GB is also very reasonable. 
+|nifi.provenance.repository.query.threads|The number of threads to use for 
Provenance Repository queries. The default value is 2.
+|nifi.provenance.repository.index.threads|The number of threads to use for 
indexing Provenance events so that they are searchable. The default value is 1.
+       For flows that operate on a very high number of FlowFiles, the indexing 
of Provenance events could become a bottleneck. If this happens, increasing the
+       value of this property may increase the rate at which the Provenance 
Repository is able to process these records, resulting in better overall 
throughput.
+       It is advisable to use at least 1 thread per storage location (i.e., if 
there are 3 storage locations, at least 3 threads should be used). For high
+       throughput environments, where more CPU and disk I/O is available, it 
may make sense to increase this value significantly. Typically going beyond
+       2-4 threads per storage location is not valuable. However, this can be 
tuned depending on the CPU resources available compared to the I/O resources. 
+|nifi.provenance.repository.compress.on.rollover|Indicates whether to compress 
the provenance information when an "event file" is rolled over. The default 
value is _true_.
+|nifi.provenance.repository.always.sync|If set to _true_, any change to the 
repository will be synchronized to the disk, meaning that NiFi will ask the 
operating system
+       not to cache the information. This is very expensive and can 
significantly reduce NiFi performance. However, if it is _false_, there could 
be the potential for data
+       loss if either there is a sudden power loss or the operating system 
crashes. The default value is _false_.
+|nifi.provenance.repository.indexed.fields|This is a comma-separated list of 
the fields that should be indexed and made searchable.
+       Fields that are not indexed will not be searchable. Valid fields are: 
EventType, FlowFileUUID, Filename, TransitURI, ProcessorID,
+       AlternateIdentifierURI, Relationship, Details. The default value is: 
EventType, FlowFileUUID, Filename, ProcessorID.
+|nifi.provenance.repository.indexed.attributes|This is a comma-separated list 
of FlowFile Attributes that should be indexed and made searchable. It is blank 
by default.
+       But some good examples to consider are 'filename' and 'mime.type' as 
well as any custom attributes you might use which are valuable for your use 
case.
+|nifi.provenance.repository.index.shard.size|The repository uses Apache Lucene 
to provide indexing and searching capabilities. This value indicates how 
large a Lucene Index should
+       become before the Repository starts writing to a new Index. Large 
values for the shard size will result in more Java heap usage when searching 
the Provenance Repository but should
+       provide better performance. The default value is 500 MB. However, this 
is due to the fact that defaults are tuned for very small environments where 
most users begin to use NiFi.
+       For production environments, it is advisable to change this value to *4 
to 8 GB*. Once all Provenance Events in the index have been aged off from the 
"event files," the index
+       will be destroyed as well.
+|nifi.provenance.repository.max.attribute.length|Indicates the maximum length 
that a FlowFile attribute can be when retrieving a Provenance Event from the 
repository.
+       If the length of any attribute exceeds this value, it will be truncated when the event is retrieved. The default is 65536.
+|nifi.provenance.repository.concurrent.merge.threads|Apache Lucene creates several "segments" in an Index. These segments are periodically merged together in order to provide faster
+       querying. This property specifies the maximum number of threads that are allowed to be used for *each* of the storage directories. The default value is 2. For high throughput
+       environments, it is advisable to set the number of index threads larger than the number of merge threads * the number of storage locations. For example, if there are 2 storage
+       locations and the number of index threads is set to 8, then the number of merge threads should likely be less than 4. While it is not critical that this be done, setting the
+       number of merge threads larger than this can result in all index threads being used to merge, which would cause the NiFi flow to periodically pause while indexing is happening,
+       resulting in some data being processed with much higher latency than other data.
+|nifi.provenance.repository.warm.cache.frequency|Each time that a Provenance query is run, the query must first search the Apache Lucene indices (at least, in most cases - there are
+       some queries that are run often and the results are cached to avoid searching the Lucene indices). When a Lucene index is opened for the first time, it can be very expensive and take
+       several seconds. This is compounded by having many different indices, and can result in a Provenance query taking much longer. After the index has been opened, the Operating System's
+       disk cache will typically hold onto enough data to make re-opening the index much faster - at least for a period of time, until the disk cache evicts this data. If this value is set,
+       NiFi will periodically open each Lucene index and then close it, in order to "warm" the cache. This will result in far faster queries when the Provenance Repository is large. As with
+       all great things, though, it comes with a cost. Warming the cache does take some CPU resources, but more importantly it will evict other data from the Operating System disk cache and
+       will result in reading (potentially a great deal of) data from the disk. This can result in lower NiFi performance. However, if NiFi is running in an environment where CPU and disk
+       are not fully utilized, this feature can result in far faster Provenance queries.
+|====
+
+
 === Component Status Repository
 
 The Component Status Repository contains the information for the Component Status History tool in the User Interface. These
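
The sizing guidance for nifi.provenance.repository.concurrent.merge.threads in the table above comes down to one inequality: index threads > merge threads * storage locations. The following is a minimal sketch of that check. The class and method names are hypothetical and are not part of NiFi; the numbers are the ones used in the documentation's own example.

```java
// Hypothetical helper for illustration only; NiFi does not ship this class.
class MergeThreadSizing {

    // Returns true when the number of index threads is larger than
    // (merge threads per storage location) * (number of storage locations),
    // which is the condition the documentation recommends.
    static boolean leavesIndexThreadsFree(final int indexThreads,
                                          final int mergeThreadsPerLocation,
                                          final int storageLocations) {
        return indexThreads > (long) mergeThreadsPerLocation * storageLocations;
    }

    public static void main(final String[] args) {
        // 2 storage locations and 8 index threads, as in the documentation's example.
        System.out.println(leavesIndexThreadsFree(8, 3, 2)); // 8 > 6
        System.out.println(leavesIndexThreadsFree(8, 4, 2)); // 8 > 8 does not hold
    }
}
```

With 8 index threads and 2 storage locations, 3 merge threads satisfies the condition and 4 does not, which matches the "less than 4" advice.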

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-framework-api/src/main/java/org/apache/nifi/controller/Triggerable.java
----------------------------------------------------------------------
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/Triggerable.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/Triggerable.java
index 4b3149b..5255c05 100644
--- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/Triggerable.java
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/Triggerable.java
@@ -25,7 +25,7 @@ import org.apache.nifi.processor.exception.ProcessException;
 
 public interface Triggerable {
 
-    public static final long MINIMUM_SCHEDULING_NANOS = 30000L;
+    public static final long MINIMUM_SCHEDULING_NANOS = 1L;
 
     /**
      * <p>

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-framework-api/src/main/java/org/apache/nifi/provenance/IdentifierLookup.java
----------------------------------------------------------------------
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/provenance/IdentifierLookup.java b/nifi-framework-api/src/main/java/org/apache/nifi/provenance/IdentifierLookup.java
new file mode 100644
index 0000000..6d548d2
--- /dev/null
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/provenance/IdentifierLookup.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Provides a mechanism for obtaining the identifiers of components, queues, etc.
+ */
+public interface IdentifierLookup {
+
+    /**
+     * @return the identifiers of components that may generate Provenance Events
+     */
+    List<String> getComponentIdentifiers();
+
+    /**
+     * @return a list of component types that may generate Provenance Events
+     */
+    List<String> getComponentTypes();
+
+    /**
+     *
+     * @return the identifiers of FlowFile Queues that are in the flow
+     */
+    List<String> getQueueIdentifiers();
+
+    default Map<String, Integer> invertQueueIdentifiers() {
+        return invertList(getQueueIdentifiers());
+    }
+
+    default Map<String, Integer> invertComponentTypes() {
+        return invertList(getComponentTypes());
+    }
+
+    default Map<String, Integer> invertComponentIdentifiers() {
+        return invertList(getComponentIdentifiers());
+    }
+
+    default Map<String, Integer> invertList(final List<String> values) {
+        final Map<String, Integer> inverted = new HashMap<>(values.size());
+        for (int i = 0; i < values.size(); i++) {
+            inverted.put(values.get(i), i);
+        }
+        return inverted;
+    }
+
+
+    public static final IdentifierLookup EMPTY = new IdentifierLookup() {
+        @Override
+        public List<String> getComponentIdentifiers() {
+            return Collections.emptyList();
+        }
+
+        @Override
+        public List<String> getComponentTypes() {
+            return Collections.emptyList();
+        }
+
+        @Override
+        public List<String> getQueueIdentifiers() {
+            return Collections.emptyList();
+        }
+
+        @Override
+        public Map<String, Integer> invertList(List<String> values) {
+            return Collections.emptyMap();
+        }
+    };
+}
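
The default invertList method in IdentifierLookup above turns a list of identifiers into a map from each identifier to its position in the list. The stand-alone sketch below repeats that logic outside of NiFi so it can be run on its own. The class name and the sample identifiers are made up for illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-alone class; not part of the NiFi code base.
class InvertListSketch {

    // Same logic as IdentifierLookup.invertList: map each value to its index in the list.
    static Map<String, Integer> invertList(final List<String> values) {
        final Map<String, Integer> inverted = new HashMap<>(values.size());
        for (int i = 0; i < values.size(); i++) {
            inverted.put(values.get(i), i);
        }
        return inverted;
    }

    public static void main(final String[] args) {
        // Made-up queue identifiers, for illustration only.
        final List<String> queueIds = Arrays.asList("queue-a", "queue-b", "queue-c");
        final Map<String, Integer> indexById = invertList(queueIds);
        System.out.println(indexById.get("queue-b")); // prints 1
    }
}
```

If the list contains the same value more than once, the map keeps the index of the last occurrence, because each put overwrites the earlier entry. The EMPTY instance above overrides invertList to return an empty map, so all three invert* defaults return empty maps for it.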

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-framework-api/src/main/java/org/apache/nifi/provenance/ProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/provenance/ProvenanceRepository.java b/nifi-framework-api/src/main/java/org/apache/nifi/provenance/ProvenanceRepository.java
index 6a5954a..516a36d 100644
--- a/nifi-framework-api/src/main/java/org/apache/nifi/provenance/ProvenanceRepository.java
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/provenance/ProvenanceRepository.java
@@ -34,12 +34,13 @@ public interface ProvenanceRepository extends ProvenanceEventRepository {
      * Performs any initialization needed. This should be called only by the
      * framework.
      *
-     * @param eventReporter   to report to
-     * @param authorizer      the authorizer to use for authorizing individual events
+     * @param eventReporter to report to
+     * @param authorizer the authorizer to use for authorizing individual events
      * @param resourceFactory the resource factory to use for generating Provenance Resource objects for authorization purposes
+     * @param identifierLookup a mechanism for looking up identifiers in the flow
      * @throws java.io.IOException if unable to initialize
      */
-    void initialize(EventReporter eventReporter, Authorizer authorizer, ProvenanceAuthorizableFactory resourceFactory) throws IOException;
+    void initialize(EventReporter eventReporter, Authorizer authorizer, ProvenanceAuthorizableFactory resourceFactory, IdentifierLookup identifierLookup) throws IOException;
 
 
     ProvenanceEventRecord getEvent(long id, NiFiUser user) throws IOException;

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-framework-api/src/main/java/org/apache/nifi/provenance/lineage/ComputeLineageResult.java
----------------------------------------------------------------------
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/provenance/lineage/ComputeLineageResult.java b/nifi-framework-api/src/main/java/org/apache/nifi/provenance/lineage/ComputeLineageResult.java
index 4d0f991..ad480be 100644
--- a/nifi-framework-api/src/main/java/org/apache/nifi/provenance/lineage/ComputeLineageResult.java
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/provenance/lineage/ComputeLineageResult.java
@@ -18,6 +18,7 @@ package org.apache.nifi.provenance.lineage;
 
 import java.util.Date;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 /**
  *
@@ -55,4 +56,6 @@ public interface ComputeLineageResult {
      * @return Indicates whether or not the lineage has finished running
      */
     boolean isFinished();
+
+    boolean awaitCompletion(long time, TimeUnit unit) throws InterruptedException;
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-framework-api/src/main/java/org/apache/nifi/provenance/search/Query.java
----------------------------------------------------------------------
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/provenance/search/Query.java b/nifi-framework-api/src/main/java/org/apache/nifi/provenance/search/Query.java
index 3519c14..4db8e0f 100644
--- a/nifi-framework-api/src/main/java/org/apache/nifi/provenance/search/Query.java
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/provenance/search/Query.java
@@ -93,4 +93,8 @@ public class Query {
     public String toString() {
         return "Query[ " + searchTerms + " ]";
     }
+
+    public boolean isEmpty() {
+        return searchTerms.isEmpty() && maxFileSize == null && minFileSize == null && startDate == null && endDate == null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-framework-api/src/main/java/org/apache/nifi/provenance/search/QueryResult.java
----------------------------------------------------------------------
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/provenance/search/QueryResult.java b/nifi-framework-api/src/main/java/org/apache/nifi/provenance/search/QueryResult.java
index 0079433..cc84ea1 100644
--- a/nifi-framework-api/src/main/java/org/apache/nifi/provenance/search/QueryResult.java
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/provenance/search/QueryResult.java
@@ -18,6 +18,7 @@ package org.apache.nifi.provenance.search;
 
 import java.util.Date;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 
@@ -60,4 +61,6 @@ public interface QueryResult {
      * @return Indicates whether or not the query has finished running
      */
     boolean isFinished();
+
+    boolean awaitCompletion(long time, TimeUnit unit) throws InterruptedException;
 }
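
Both ComputeLineageResult and QueryResult gain an awaitCompletion(long, TimeUnit) method in this commit, but the diff shown here gives neither an implementation nor Javadoc for it. One common way to back such a method is a CountDownLatch, sketched below. The class LatchBackedResult and its markFinished method are hypothetical, and the sketch assumes the return value means "true if finished before the timeout", which the diff does not state.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical result holder; the real implementations are not shown in this diff.
class LatchBackedResult {
    private final CountDownLatch completionLatch = new CountDownLatch(1);

    // Called by whichever thread finishes the query or lineage computation.
    void markFinished() {
        completionLatch.countDown();
    }

    boolean isFinished() {
        return completionLatch.getCount() == 0;
    }

    // Blocks for at most the given time. Assumed semantics: returns true if the
    // work finished within the timeout, false if the timeout elapsed first.
    boolean awaitCompletion(final long time, final TimeUnit unit) throws InterruptedException {
        return completionLatch.await(time, unit);
    }

    public static void main(final String[] args) throws InterruptedException {
        final LatchBackedResult result = new LatchBackedResult();
        new Thread(result::markFinished).start();
        final boolean done = result.awaitCompletion(5, TimeUnit.SECONDS);
        System.out.println("finished=" + done);
    }
}
```

Compared with polling isFinished() in a loop, a blocking wait with a timeout lets a caller sleep until the result is ready without busy-waiting.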

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-mock/src/main/java/org/apache/nifi/provenance/MockProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/provenance/MockProvenanceRepository.java b/nifi-mock/src/main/java/org/apache/nifi/provenance/MockProvenanceRepository.java
index 9bc5f0e..53c3c2e 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/provenance/MockProvenanceRepository.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/provenance/MockProvenanceRepository.java
@@ -56,7 +56,7 @@ public class MockProvenanceRepository implements ProvenanceRepository {
     }
 
     @Override
-    public void initialize(EventReporter eventReporter, Authorizer authorizer, ProvenanceAuthorizableFactory resourceFactory) throws IOException {
+    public void initialize(EventReporter eventReporter, Authorizer authorizer, ProvenanceAuthorizableFactory resourceFactory, IdentifierLookup idLookup) throws IOException {
 
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
index 1ff1a2f..df87de5 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
@@ -63,7 +63,29 @@ public class MockFlowFile implements FlowFileRecord {
     }
 
     public MockFlowFile(final long id, final FlowFile toCopy) {
-        this(id);
+        this.creationTime = System.nanoTime();
+        this.id = id;
+        entryDate = System.currentTimeMillis();
+
+        final Map<String, String> attributesToCopy = toCopy.getAttributes();
+        String filename = attributesToCopy.get(CoreAttributes.FILENAME.key());
+        if (filename == null) {
+            filename = String.valueOf(System.nanoTime()) + ".mockFlowFile";
+        }
+        attributes.put(CoreAttributes.FILENAME.key(), filename);
+
+        String path = attributesToCopy.get(CoreAttributes.PATH.key());
+        if (path == null) {
+            path = "target";
+        }
+        attributes.put(CoreAttributes.PATH.key(), path);
+
+        String uuid = attributesToCopy.get(CoreAttributes.UUID.key());
+        if (uuid == null) {
+            uuid = UUID.randomUUID().toString();
+        }
+        attributes.put(CoreAttributes.UUID.key(), uuid);
+
         attributes.putAll(toCopy.getAttributes());
         final byte[] dataToCopy = ((MockFlowFile) toCopy).data;
         this.data = new byte[dataToCopy.length];

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java
index ff42f47..1177dad 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java
@@ -111,7 +111,7 @@ public abstract class AbstractPort implements Port {
         yieldPeriod = new AtomicReference<>("1 sec");
         yieldExpiration = new AtomicLong(0L);
         schedulingPeriod = new AtomicReference<>("0 millis");
-        schedulingNanos = new AtomicLong(30000);
+        schedulingNanos = new AtomicLong(MINIMUM_SCHEDULING_NANOS);
         scheduledState = new AtomicReference<>(ScheduledState.STOPPED);
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
index 18bcc3c..34ffbac 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
@@ -100,7 +100,7 @@ public class StandardFunnel implements Funnel {
         yieldPeriod = new AtomicReference<>("250 millis");
         yieldExpiration = new AtomicLong(0L);
         schedulingPeriod = new AtomicReference<>("0 millis");
-        schedulingNanos = new AtomicLong(30000);
+        schedulingNanos = new AtomicLong(MINIMUM_SCHEDULING_NANOS);
         name = new AtomicReference<>("Funnel");
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 7fd85b9..191fc65 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -152,6 +152,7 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.SimpleProcessLogger;
 import org.apache.nifi.processor.StandardProcessorInitializationContext;
 import org.apache.nifi.processor.StandardValidationContextFactory;
+import org.apache.nifi.provenance.IdentifierLookup;
 import org.apache.nifi.provenance.ProvenanceAuthorizableFactory;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventType;
@@ -233,10 +234,12 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
 
 import static java.util.Objects.requireNonNull;
 
-public class FlowController implements EventAccess, ControllerServiceProvider, ReportingTaskProvider, QueueProvider, Authorizable, ProvenanceAuthorizableFactory, NodeTypeProvider {
+public class FlowController implements EventAccess, ControllerServiceProvider, ReportingTaskProvider,
+    QueueProvider, Authorizable, ProvenanceAuthorizableFactory, NodeTypeProvider, IdentifierLookup {
 
     // default repository implementations
     public static final String DEFAULT_FLOWFILE_REPO_IMPLEMENTATION = "org.apache.nifi.controller.repository.WriteAheadFlowFileRepository";
@@ -454,7 +457,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
 
         try {
             this.provenanceRepository = createProvenanceRepository(nifiProperties);
-            this.provenanceRepository.initialize(createEventReporter(bulletinRepository), authorizer, this);
+            this.provenanceRepository.initialize(createEventReporter(bulletinRepository), authorizer, this, this);
         } catch (final Exception e) {
             throw new RuntimeException("Unable to create Provenance Repository", e);
         }
@@ -3886,6 +3889,39 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         return replayEvent;
     }
 
+    @Override
+    public List<String> getComponentIdentifiers() {
+        final List<String> componentIds = new ArrayList<>();
+        getGroup(getRootGroupId()).findAllProcessors().stream()
+            .forEach(proc -> componentIds.add(proc.getIdentifier()));
+        getGroup(getRootGroupId()).getInputPorts().stream()
+            .forEach(port -> componentIds.add(port.getIdentifier()));
+        getGroup(getRootGroupId()).getOutputPorts().stream()
+            .forEach(port -> componentIds.add(port.getIdentifier()));
+
+        return componentIds;
+    }
+
+    @Override
+    @SuppressWarnings("rawtypes")
+    public List<String> getComponentTypes() {
+        final Set<Class> procClasses = ExtensionManager.getExtensions(Processor.class);
+        final List<String> componentTypes = new ArrayList<>(procClasses.size() + 2);
+        componentTypes.add(ProvenanceEventRecord.REMOTE_INPUT_PORT_TYPE);
+        componentTypes.add(ProvenanceEventRecord.REMOTE_OUTPUT_PORT_TYPE);
+        procClasses.stream()
+            .map(procClass -> procClass.getSimpleName())
+            .forEach(componentType -> componentTypes.add(componentType));
+        return componentTypes;
+    }
+
+    @Override
+    public List<String> getQueueIdentifiers() {
+        return getAllQueues().stream()
+            .map(q -> q.getIdentifier())
+            .collect(Collectors.toList());
+    }
+
     public boolean isConnected() {
         rwLock.readLock().lock();
         try {

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
index 7108cfe..67df539 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
@@ -911,6 +911,7 @@ public class FileSystemRepository implements ContentRepository {
                 }
 
                 bytesWritten += len;
+
                 scc.setLength(bytesWritten + initialLength);
             }
 
