http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordWriter.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordWriter.java
new file mode 100644
index 0000000..bb8d52f
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordWriter.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.nifi.provenance.schema.EventFieldNames;
+import org.apache.nifi.provenance.schema.EventIdFirstHeaderSchema;
+import org.apache.nifi.provenance.schema.LookupTableEventRecord;
+import org.apache.nifi.provenance.schema.LookupTableEventSchema;
+import org.apache.nifi.provenance.serialization.CompressableRecordWriter;
+import org.apache.nifi.provenance.serialization.StorageSummary;
+import org.apache.nifi.provenance.toc.TocWriter;
+import org.apache.nifi.repository.schema.FieldMapRecord;
+import org.apache.nifi.repository.schema.Record;
+import org.apache.nifi.repository.schema.RecordSchema;
+import org.apache.nifi.repository.schema.SchemaRecordWriter;
+import org.apache.nifi.util.timebuffer.LongEntityAccess;
+import org.apache.nifi.util.timebuffer.TimedBuffer;
+import org.apache.nifi.util.timebuffer.TimestampedLong;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EventIdFirstSchemaRecordWriter extends CompressableRecordWriter {
+    private static final Logger logger = 
LoggerFactory.getLogger(EventIdFirstSchemaRecordWriter.class);
+
+    private static final RecordSchema eventSchema = 
LookupTableEventSchema.EVENT_SCHEMA;
+    private static final RecordSchema contentClaimSchema = new 
RecordSchema(eventSchema.getField(EventFieldNames.CONTENT_CLAIM).getSubFields());
+    private static final RecordSchema previousContentClaimSchema = new 
RecordSchema(eventSchema.getField(EventFieldNames.PREVIOUS_CONTENT_CLAIM).getSubFields());
+    private static final RecordSchema headerSchema = 
EventIdFirstHeaderSchema.SCHEMA;
+
+    public static final int SERIALIZATION_VERSION = 1;
+    public static final String SERIALIZATION_NAME = 
"EventIdFirstSchemaRecordWriter";
+    private final IdentifierLookup idLookup;
+
+    private final SchemaRecordWriter schemaRecordWriter = new 
SchemaRecordWriter();
+    private final AtomicInteger recordCount = new AtomicInteger(0);
+
+    private final Map<String, Integer> componentIdMap;
+    private final Map<String, Integer> componentTypeMap;
+    private final Map<String, Integer> queueIdMap;
+    private static final Map<String, Integer> eventTypeMap;
+    private static final List<String> eventTypeNames;
+
+    private static final TimedBuffer<TimestampedLong> serializeTimes = new 
TimedBuffer<>(TimeUnit.SECONDS, 60, new LongEntityAccess());
+    private static final TimedBuffer<TimestampedLong> lockTimes = new 
TimedBuffer<>(TimeUnit.SECONDS, 60, new LongEntityAccess());
+    private static final TimedBuffer<TimestampedLong> writeTimes = new 
TimedBuffer<>(TimeUnit.SECONDS, 60, new LongEntityAccess());
+    private static final TimedBuffer<TimestampedLong> bytesWritten = new 
TimedBuffer<>(TimeUnit.SECONDS, 60, new LongEntityAccess());
+    private static final AtomicLong totalRecordCount = new AtomicLong(0L);
+
+    private long firstEventId;
+    private long systemTimeOffset;
+
+    static {
+        eventTypeMap = new HashMap<>();
+        eventTypeNames = new ArrayList<>();
+
+        int count = 0;
+        for (final ProvenanceEventType eventType : 
ProvenanceEventType.values()) {
+            eventTypeMap.put(eventType.name(), count++);
+            eventTypeNames.add(eventType.name());
+        }
+    }
+
+    public EventIdFirstSchemaRecordWriter(final File file, final AtomicLong 
idGenerator, final TocWriter writer, final boolean compressed,
+        final int uncompressedBlockSize, final IdentifierLookup idLookup) 
throws IOException {
+        super(file, idGenerator, writer, compressed, uncompressedBlockSize);
+
+        this.idLookup = idLookup;
+        componentIdMap = idLookup.invertComponentIdentifiers();
+        componentTypeMap = idLookup.invertComponentTypes();
+        queueIdMap = idLookup.invertQueueIdentifiers();
+    }
+
+    public EventIdFirstSchemaRecordWriter(final OutputStream out, final String 
storageLocation, final AtomicLong idGenerator, final TocWriter tocWriter, final 
boolean compressed,
+        final int uncompressedBlockSize, final IdentifierLookup idLookup) 
throws IOException {
+        super(out, storageLocation, idGenerator, tocWriter, compressed, 
uncompressedBlockSize);
+
+        this.idLookup = idLookup;
+        componentIdMap = idLookup.invertComponentIdentifiers();
+        componentTypeMap = idLookup.invertComponentTypes();
+        queueIdMap = idLookup.invertQueueIdentifiers();
+    }
+
+    @Override
+    public StorageSummary writeRecord(final ProvenanceEventRecord record) 
throws IOException {
+        if (isDirty()) {
+            throw new IOException("Cannot update Provenance Repository because 
this Record Writer has already failed to write to the Repository");
+        }
+
+        final long serializeStart = System.nanoTime();
+        final byte[] serialized;
+        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream(256);
+            final DataOutputStream dos = new DataOutputStream(baos)) {
+            writeRecord(record, 0L, dos);
+            serialized = baos.toByteArray();
+        }
+
+        final long lockStart = System.nanoTime();
+        final long writeStart;
+        final long startBytes;
+        final long endBytes;
+        final long recordIdentifier;
+        synchronized (this) {
+            writeStart = System.nanoTime();
+            try {
+                recordIdentifier = record.getEventId() == -1L ? 
getIdGenerator().getAndIncrement() : record.getEventId();
+                startBytes = getBytesWritten();
+
+                ensureStreamState(recordIdentifier, startBytes);
+
+                final DataOutputStream out = getBufferedOutputStream();
+                final int recordIdOffset = (int) (recordIdentifier - 
firstEventId);
+                out.writeInt(recordIdOffset);
+                out.writeInt(serialized.length);
+                out.write(serialized);
+
+                recordCount.incrementAndGet();
+                endBytes = getBytesWritten();
+            } catch (final IOException ioe) {
+                markDirty();
+                throw ioe;
+            }
+        }
+
+        if (logger.isDebugEnabled()) {
+            // Collect stats and periodically dump them if log level is set to 
at least info.
+            final long writeNanos = System.nanoTime() - writeStart;
+            writeTimes.add(new TimestampedLong(writeNanos));
+
+            final long serializeNanos = lockStart - serializeStart;
+            serializeTimes.add(new TimestampedLong(serializeNanos));
+
+            final long lockNanos = writeStart - lockStart;
+            lockTimes.add(new TimestampedLong(lockNanos));
+            bytesWritten.add(new TimestampedLong(endBytes - startBytes));
+
+            final long recordCount = totalRecordCount.incrementAndGet();
+            if (recordCount % 1_000_000 == 0) {
+                final long sixtySecondsAgo = System.currentTimeMillis() - 
60000L;
+                final Long writeNanosLast60 = 
writeTimes.getAggregateValue(sixtySecondsAgo).getValue();
+                final Long lockNanosLast60 = 
lockTimes.getAggregateValue(sixtySecondsAgo).getValue();
+                final Long serializeNanosLast60 = 
serializeTimes.getAggregateValue(sixtySecondsAgo).getValue();
+                final Long bytesWrittenLast60 = 
bytesWritten.getAggregateValue(sixtySecondsAgo).getValue();
+                logger.debug("In the last 60 seconds, have spent {} millis 
writing to file ({} MB), {} millis waiting on synchronize block, {} millis 
serializing events",
+                    TimeUnit.NANOSECONDS.toMillis(writeNanosLast60),
+                    bytesWrittenLast60 / 1024 / 1024,
+                    TimeUnit.NANOSECONDS.toMillis(lockNanosLast60),
+                    TimeUnit.NANOSECONDS.toMillis(serializeNanosLast60));
+            }
+        }
+
+        final long serializedLength = endBytes - startBytes;
+        final TocWriter tocWriter = getTocWriter();
+        final Integer blockIndex = tocWriter == null ? null : 
tocWriter.getCurrentBlockIndex();
+        final File file = getFile();
+        final String storageLocation = file.getParentFile().getName() + "/" + 
file.getName();
+        return new StorageSummary(recordIdentifier, storageLocation, 
blockIndex, serializedLength, endBytes);
+    }
+
+    @Override
+    public int getRecordsWritten() {
+        return recordCount.get();
+    }
+
+    protected Record createRecord(final ProvenanceEventRecord event, final 
long eventId) {
+        return new LookupTableEventRecord(event, eventId, eventSchema, 
contentClaimSchema, previousContentClaimSchema, firstEventId, systemTimeOffset,
+            componentIdMap, componentTypeMap, queueIdMap, eventTypeMap);
+    }
+
+    @Override
+    protected void writeRecord(final ProvenanceEventRecord event, final long 
eventId, final DataOutputStream out) throws IOException {
+        final Record eventRecord = createRecord(event, eventId);
+        schemaRecordWriter.writeRecord(eventRecord, out);
+    }
+
+    @Override
+    protected synchronized void writeHeader(final long firstEventId, final 
DataOutputStream out) throws IOException {
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        eventSchema.writeTo(baos);
+
+        out.writeInt(baos.size());
+        baos.writeTo(out);
+
+        baos.reset();
+        headerSchema.writeTo(baos);
+        out.writeInt(baos.size());
+        baos.writeTo(out);
+
+        this.firstEventId = firstEventId;
+        this.systemTimeOffset = System.currentTimeMillis();
+
+        final Map<String, Object> headerValues = new HashMap<>();
+        headerValues.put(EventIdFirstHeaderSchema.FieldNames.FIRST_EVENT_ID, 
firstEventId);
+        headerValues.put(EventIdFirstHeaderSchema.FieldNames.TIMESTAMP_OFFSET, 
systemTimeOffset);
+        headerValues.put(EventIdFirstHeaderSchema.FieldNames.COMPONENT_IDS, 
idLookup.getComponentIdentifiers());
+        headerValues.put(EventIdFirstHeaderSchema.FieldNames.COMPONENT_TYPES, 
idLookup.getComponentTypes());
+        headerValues.put(EventIdFirstHeaderSchema.FieldNames.QUEUE_IDS, 
idLookup.getQueueIdentifiers());
+        headerValues.put(EventIdFirstHeaderSchema.FieldNames.EVENT_TYPES, 
eventTypeNames);
+        final FieldMapRecord headerInfo = new FieldMapRecord(headerSchema, 
headerValues);
+
+        schemaRecordWriter.writeRecord(headerInfo, out);
+    }
+
+    @Override
+    protected int getSerializationVersion() {
+        return SERIALIZATION_VERSION;
+    }
+
+    @Override
+    protected String getSerializationName() {
+        return SERIALIZATION_NAME;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java
index af7bff5..a28d150 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java
@@ -63,7 +63,7 @@ public class IndexConfiguration {
     private Map<File, List<File>> recoverIndexDirectories() {
         final Map<File, List<File>> indexDirectoryMap = new HashMap<>();
 
-        for (final File storageDirectory : repoConfig.getStorageDirectories()) 
{
+        for (final File storageDirectory : 
repoConfig.getStorageDirectories().values()) {
             final List<File> indexDirectories = new ArrayList<>();
             final File[] matching = storageDirectory.listFiles(new 
FileFilter() {
                 @Override
@@ -85,6 +85,10 @@ public class IndexConfiguration {
     }
 
     private Long getFirstEntryTime(final File provenanceLogFile) {
+        if (provenanceLogFile == null) {
+            return null;
+        }
+
         try (final RecordReader reader = 
RecordReaders.newRecordReader(provenanceLogFile, null, Integer.MAX_VALUE)) {
             final StandardProvenanceEventRecord firstRecord = 
reader.nextRecord();
             if (firstRecord == null) {
@@ -121,10 +125,14 @@ public class IndexConfiguration {
         }
     }
 
+
     public File getWritableIndexDirectory(final File provenanceLogFile, final 
long newIndexTimestamp) {
+        return 
getWritableIndexDirectoryForStorageDirectory(provenanceLogFile.getParentFile(), 
provenanceLogFile, newIndexTimestamp);
+    }
+
+    public File getWritableIndexDirectoryForStorageDirectory(final File 
storageDirectory, final File provenanceLogFile, final long newIndexTimestamp) {
         lock.lock();
         try {
-            final File storageDirectory = provenanceLogFile.getParentFile();
             List<File> indexDirectories = 
this.indexDirectoryMap.get(storageDirectory);
             if (indexDirectories == null) {
                 final File newDir = addNewIndex(storageDirectory, 
provenanceLogFile, newIndexTimestamp);

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
index 3037e66..ed183f9 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
@@ -16,10 +16,54 @@
  */
 package org.apache.nifi.provenance;
 
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileNotFoundException;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
 import org.apache.lucene.document.Document;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexNotFoundException;
-import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.ScoreDoc;
 import org.apache.lucene.search.TopDocs;
@@ -32,14 +76,17 @@ import org.apache.nifi.authorization.RequestAction;
 import org.apache.nifi.authorization.resource.Authorizable;
 import org.apache.nifi.authorization.user.NiFiUser;
 import org.apache.nifi.events.EventReporter;
-import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.provenance.authorization.EventAuthorizer;
 import org.apache.nifi.provenance.expiration.ExpirationAction;
 import org.apache.nifi.provenance.expiration.FileRemovalAction;
+import org.apache.nifi.provenance.index.EventIndexWriter;
 import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
 import org.apache.nifi.provenance.lineage.FlowFileLineage;
 import org.apache.nifi.provenance.lineage.Lineage;
 import org.apache.nifi.provenance.lineage.LineageComputationType;
 import org.apache.nifi.provenance.lucene.DeleteIndexAction;
+import org.apache.nifi.provenance.lucene.DocsReader;
+import org.apache.nifi.provenance.lucene.DocumentToEventConverter;
 import org.apache.nifi.provenance.lucene.FieldNames;
 import org.apache.nifi.provenance.lucene.IndexManager;
 import org.apache.nifi.provenance.lucene.IndexSearch;
@@ -56,8 +103,10 @@ import 
org.apache.nifi.provenance.serialization.RecordReader;
 import org.apache.nifi.provenance.serialization.RecordReaders;
 import org.apache.nifi.provenance.serialization.RecordWriter;
 import org.apache.nifi.provenance.serialization.RecordWriters;
+import org.apache.nifi.provenance.serialization.StorageSummary;
 import org.apache.nifi.provenance.toc.TocReader;
 import org.apache.nifi.provenance.toc.TocUtil;
+import org.apache.nifi.provenance.util.NamedThreadFactory;
 import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.NiFiProperties;
@@ -74,51 +123,6 @@ import org.apache.nifi.web.ResourceNotFoundException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileFilter;
-import java.io.FileNotFoundException;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-
 public class PersistentProvenanceRepository implements ProvenanceRepository {
 
     public static final String EVENT_CATEGORY = "Provenance Repository";
@@ -209,7 +213,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceRepository {
     }
 
     public PersistentProvenanceRepository(final NiFiProperties nifiProperties) 
throws IOException {
-        this(createRepositoryConfiguration(nifiProperties), 10000);
+        this(RepositoryConfiguration.create(nifiProperties), 10000);
     }
 
     public PersistentProvenanceRepository(final RepositoryConfiguration 
configuration, final int rolloverCheckMillis) throws IOException {
@@ -220,7 +224,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceRepository {
         this.configuration = configuration;
         this.maxAttributeChars = configuration.getMaxAttributeChars();
 
-        for (final File file : configuration.getStorageDirectories()) {
+        for (final File file : configuration.getStorageDirectories().values()) 
{
             final Path storageDirectory = file.toPath();
             final Path journalDirectory = storageDirectory.resolve("journals");
 
@@ -234,7 +238,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceRepository {
         this.maxPartitionMillis = 
configuration.getMaxEventFileLife(TimeUnit.MILLISECONDS);
         this.maxPartitionBytes = configuration.getMaxEventFileCapacity();
         this.indexConfig = new IndexConfiguration(configuration);
-        this.indexManager = new SimpleIndexManager();
+        this.indexManager = new SimpleIndexManager(configuration);
         this.alwaysSync = configuration.isAlwaysSync();
         this.rolloverCheckMillis = rolloverCheckMillis;
 
@@ -254,7 +258,8 @@ public class PersistentProvenanceRepository implements 
ProvenanceRepository {
     }
 
     @Override
-    public void initialize(final EventReporter eventReporter, final Authorizer 
authorizer, final ProvenanceAuthorizableFactory resourceFactory) throws 
IOException {
+    public void initialize(final EventReporter eventReporter, final Authorizer 
authorizer, final ProvenanceAuthorizableFactory resourceFactory,
+        final IdentifierLookup idLookup) throws IOException {
         writeLock.lock();
         try {
             if (initialized.getAndSet(true)) {
@@ -325,82 +330,10 @@ public class PersistentProvenanceRepository implements 
ProvenanceRepository {
         }
     }
 
-    private static RepositoryConfiguration createRepositoryConfiguration(final 
NiFiProperties nifiProperties) throws IOException {
-        final Map<String, Path> storageDirectories = 
nifiProperties.getProvenanceRepositoryPaths();
-        if (storageDirectories.isEmpty()) {
-            storageDirectories.put("provenance_repository", 
Paths.get("provenance_repository"));
-        }
-        final String storageTime = 
nifiProperties.getProperty(NiFiProperties.PROVENANCE_MAX_STORAGE_TIME, "24 
hours");
-        final String storageSize = 
nifiProperties.getProperty(NiFiProperties.PROVENANCE_MAX_STORAGE_SIZE, "1 GB");
-        final String rolloverTime = 
nifiProperties.getProperty(NiFiProperties.PROVENANCE_ROLLOVER_TIME, "5 mins");
-        final String rolloverSize = 
nifiProperties.getProperty(NiFiProperties.PROVENANCE_ROLLOVER_SIZE, "100 MB");
-        final String shardSize = 
nifiProperties.getProperty(NiFiProperties.PROVENANCE_INDEX_SHARD_SIZE, "500 
MB");
-        final int queryThreads = 
nifiProperties.getIntegerProperty(NiFiProperties.PROVENANCE_QUERY_THREAD_POOL_SIZE,
 2);
-        final int indexThreads = 
nifiProperties.getIntegerProperty(NiFiProperties.PROVENANCE_INDEX_THREAD_POOL_SIZE,
 1);
-        final int journalCount = 
nifiProperties.getIntegerProperty(NiFiProperties.PROVENANCE_JOURNAL_COUNT, 16);
-
-        final long storageMillis = FormatUtils.getTimeDuration(storageTime, 
TimeUnit.MILLISECONDS);
-        final long maxStorageBytes = DataUnit.parseDataSize(storageSize, 
DataUnit.B).longValue();
-        final long rolloverMillis = FormatUtils.getTimeDuration(rolloverTime, 
TimeUnit.MILLISECONDS);
-        final long rolloverBytes = DataUnit.parseDataSize(rolloverSize, 
DataUnit.B).longValue();
-
-        final boolean compressOnRollover = 
Boolean.parseBoolean(nifiProperties.getProperty(NiFiProperties.PROVENANCE_COMPRESS_ON_ROLLOVER));
-        final String indexedFieldString = 
nifiProperties.getProperty(NiFiProperties.PROVENANCE_INDEXED_FIELDS);
-        final String indexedAttrString = 
nifiProperties.getProperty(NiFiProperties.PROVENANCE_INDEXED_ATTRIBUTES);
-
-        final Boolean alwaysSync = 
Boolean.parseBoolean(nifiProperties.getProperty("nifi.provenance.repository.always.sync",
 "false"));
-
-        final int defaultMaxAttrChars = 65536;
-        final String maxAttrLength = 
nifiProperties.getProperty("nifi.provenance.repository.max.attribute.length", 
String.valueOf(defaultMaxAttrChars));
-        int maxAttrChars;
-        try {
-            maxAttrChars = Integer.parseInt(maxAttrLength);
-            // must be at least 36 characters because that's the length of the 
uuid attribute,
-            // which must be kept intact
-            if (maxAttrChars < 36) {
-                maxAttrChars = 36;
-                logger.warn("Found max attribute length property set to " + 
maxAttrLength + " but minimum length is 36; using 36 instead");
-            }
-        } catch (final Exception e) {
-            maxAttrChars = defaultMaxAttrChars;
-        }
-
-        final List<SearchableField> searchableFields = 
SearchableFieldParser.extractSearchableFields(indexedFieldString, true);
-        final List<SearchableField> searchableAttributes = 
SearchableFieldParser.extractSearchableFields(indexedAttrString, false);
-
-        // We always want to index the Event Time.
-        if (!searchableFields.contains(SearchableFields.EventTime)) {
-            searchableFields.add(SearchableFields.EventTime);
-        }
-
-        final RepositoryConfiguration config = new RepositoryConfiguration();
-        for (final Path path : storageDirectories.values()) {
-            config.addStorageDirectory(path.toFile());
-        }
-        config.setCompressOnRollover(compressOnRollover);
-        config.setSearchableFields(searchableFields);
-        config.setSearchableAttributes(searchableAttributes);
-        config.setMaxEventFileCapacity(rolloverBytes);
-        config.setMaxEventFileLife(rolloverMillis, TimeUnit.MILLISECONDS);
-        config.setMaxRecordLife(storageMillis, TimeUnit.MILLISECONDS);
-        config.setMaxStorageCapacity(maxStorageBytes);
-        config.setQueryThreadPoolSize(queryThreads);
-        config.setIndexThreadPoolSize(indexThreads);
-        config.setJournalCount(journalCount);
-        config.setMaxAttributeChars(maxAttrChars);
-
-        if (shardSize != null) {
-            config.setDesiredIndexSize(DataUnit.parseDataSize(shardSize, 
DataUnit.B).longValue());
-        }
-
-        config.setAlwaysSync(alwaysSync);
-
-        return config;
-    }
 
     // protected in order to override for unit tests
     protected RecordWriter[] createWriters(final RepositoryConfiguration 
config, final long initialRecordId) throws IOException {
-        final List<File> storageDirectories = config.getStorageDirectories();
+        final List<File> storageDirectories = new 
ArrayList<>(config.getStorageDirectories().values());
 
         final RecordWriter[] writers = new 
RecordWriter[config.getJournalCount()];
         for (int i = 0; i < config.getJournalCount(); i++) {
@@ -408,7 +341,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceRepository {
             final File journalDirectory = new File(storageDirectory, 
"journals");
             final File journalFile = new File(journalDirectory, 
String.valueOf(initialRecordId) + ".journal." + i);
 
-            writers[i] = RecordWriters.newSchemaRecordWriter(journalFile, 
false, false);
+            writers[i] = RecordWriters.newSchemaRecordWriter(journalFile, 
idGenerator, false, false);
             writers[i].writeHeader(initialRecordId);
         }
 
@@ -460,7 +393,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceRepository {
         return Result.Approved.equals(result.getResult());
     }
 
-    protected void authorize(final ProvenanceEventRecord event, final NiFiUser 
user) {
+    public void authorize(final ProvenanceEventRecord event, final NiFiUser 
user) {
         if (authorizer == null) {
             return;
         }
@@ -474,11 +407,11 @@ public class PersistentProvenanceRepository implements 
ProvenanceRepository {
         eventAuthorizable.authorize(authorizer, RequestAction.READ, user, 
event.getAttributes());
     }
 
-    private List<ProvenanceEventRecord> filterUnauthorizedEvents(final 
List<ProvenanceEventRecord> events, final NiFiUser user) {
+    public List<ProvenanceEventRecord> filterUnauthorizedEvents(final 
List<ProvenanceEventRecord> events, final NiFiUser user) {
         return events.stream().filter(event -> isAuthorized(event, 
user)).collect(Collectors.<ProvenanceEventRecord>toList());
     }
 
-    private Set<ProvenanceEventRecord> 
replaceUnauthorizedWithPlaceholders(final Set<ProvenanceEventRecord> events, 
final NiFiUser user) {
+    public Set<ProvenanceEventRecord> 
replaceUnauthorizedWithPlaceholders(final Set<ProvenanceEventRecord> events, 
final NiFiUser user) {
         return events.stream().map(event -> isAuthorized(event, user) ? event 
: new PlaceholderProvenanceEvent(event)).collect(Collectors.toSet());
     }
 
@@ -594,7 +527,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceRepository {
         long minIndexedId = Long.MAX_VALUE;
 
         final List<File> filesToRecover = new ArrayList<>();
-        for (final File file : configuration.getStorageDirectories()) {
+        for (final File file : configuration.getStorageDirectories().values()) 
{
             final File[] matchingFiles = file.listFiles(new FileFilter() {
                 @Override
                 public boolean accept(final File pathname) {
@@ -780,10 +713,10 @@ public class PersistentProvenanceRepository implements 
ProvenanceRepository {
                 try {
                     long recordsWritten = 0L;
                     for (final ProvenanceEventRecord nextRecord : records) {
-                        final long eventId = idGenerator.getAndIncrement();
-                        bytesWritten += writer.writeRecord(nextRecord, 
eventId);
+                        final StorageSummary persistedEvent = 
writer.writeRecord(nextRecord);
+                        bytesWritten += persistedEvent.getSerializedLength();
                         recordsWritten++;
-                        logger.trace("Wrote record with ID {} to {}", eventId, 
writer);
+                        logger.trace("Wrote record with ID {} to {}", 
persistedEvent.getEventId(), writer);
                     }
 
                     writer.flush();
@@ -1175,7 +1108,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceRepository {
      */
     private List<File> getAllIndexDirectories() {
         final List<File> allIndexDirs = new ArrayList<>();
-        for (final File storageDir : configuration.getStorageDirectories()) {
+        for (final File storageDir : 
configuration.getStorageDirectories().values()) {
             final File[] indexDirs = storageDir.listFiles(new FilenameFilter() 
{
                 @Override
                 public boolean accept(final File dir, final String name) {
@@ -1237,7 +1170,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceRepository {
     protected int getJournalCount() {
         // determine how many 'journals' we have in the journals directories
         int journalFileCount = 0;
-        for (final File storageDir : configuration.getStorageDirectories()) {
+        for (final File storageDir : 
configuration.getStorageDirectories().values()) {
             final File journalsDir = new File(storageDir, "journals");
             final File[] journalFiles = journalsDir.listFiles();
             if (journalFiles != null) {
@@ -1313,7 +1246,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceRepository {
 
             // Choose a storage directory to store the merged file in.
             final long storageDirIdx = storageDirectoryIndex.getAndIncrement();
-            final List<File> storageDirs = 
configuration.getStorageDirectories();
+            final List<File> storageDirs = new 
ArrayList<>(configuration.getStorageDirectories().values());
             final File storageDir = storageDirs.get((int) (storageDirIdx % 
storageDirs.size()));
 
             Future<?> future = null;
@@ -1479,8 +1412,8 @@ public class PersistentProvenanceRepository implements 
ProvenanceRepository {
         final Map<String, List<File>> journalMap = new HashMap<>();
 
         // Map journals' basenames to the files with that basename.
-        final List<File> storageDirs = configuration.getStorageDirectories();
-        for (final File storageDir : configuration.getStorageDirectories()) {
+        final List<File> storageDirs = new 
ArrayList<>(configuration.getStorageDirectories().values());
+        for (final File storageDir : storageDirs) {
             final File journalDir = new File(storageDir, "journals");
             if (!journalDir.exists()) {
                 continue;
@@ -1729,7 +1662,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceRepository {
 
             // loop over each entry in the map, persisting the records to the 
merged file in order, and populating the map
             // with the next entry from the journal file from which the 
previous record was written.
-            try (final RecordWriter writer = 
RecordWriters.newSchemaRecordWriter(writerFile, 
configuration.isCompressOnRollover(), true)) {
+            try (final RecordWriter writer = 
RecordWriters.newSchemaRecordWriter(writerFile, idGenerator, 
configuration.isCompressOnRollover(), true)) {
                 writer.writeHeader(minEventId);
 
                 final IndexingAction indexingAction = createIndexingAction();
@@ -1741,7 +1674,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceRepository {
                 final AtomicBoolean finishedAdding = new AtomicBoolean(false);
                 final List<Future<?>> futures = new ArrayList<>();
 
-                final IndexWriter indexWriter = 
getIndexManager().borrowIndexWriter(indexingDirectory);
+                final EventIndexWriter indexWriter = 
getIndexManager().borrowIndexWriter(indexingDirectory);
                 try {
                     final ExecutorService exec = 
Executors.newFixedThreadPool(configuration.getIndexThreadPoolSize(), new 
ThreadFactory() {
                         @Override
@@ -1772,7 +1705,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceRepository {
                                                 continue;
                                             }
 
-                                            
indexingAction.index(tuple.getKey(), indexWriter, tuple.getValue());
+                                            
indexingAction.index(tuple.getKey(), indexWriter.getIndexWriter(), 
tuple.getValue());
                                         } catch (final Throwable t) {
                                             logger.error("Failed to index 
Provenance Event for " + writerFile + " to " + indexingDirectory, t);
                                             if 
(indexingFailureCount.incrementAndGet() >= MAX_INDEXING_FAILURE_COUNT) {
@@ -1795,7 +1728,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceRepository {
                             final StandardProvenanceEventRecord record = 
entry.getKey();
                             final RecordReader reader = entry.getValue();
 
-                            writer.writeRecord(record, record.getEventId());
+                            writer.writeRecord(record);
                             final int blockIndex = 
writer.getTocWriter().getCurrentBlockIndex();
 
                             boolean accepted = false;
@@ -1879,7 +1812,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceRepository {
                         }
                     }
                 } finally {
-                    getIndexManager().returnIndexWriter(indexingDirectory, 
indexWriter);
+                    getIndexManager().returnIndexWriter(indexWriter);
                 }
 
                 indexConfig.setMaxIdIndexed(maxId);
@@ -1945,7 +1878,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceRepository {
      * events indexed, etc.
      */
     protected IndexingAction createIndexingAction() {
-        return new IndexingAction(this);
+        return new IndexingAction(configuration.getSearchableFields(), 
configuration.getSearchableAttributes());
     }
 
     private StandardProvenanceEventRecord truncateAttributes(final 
StandardProvenanceEventRecord original) {
@@ -2322,7 +2255,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceRepository {
             if (event == null) {
                 final AsyncLineageSubmission submission = new 
AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, 
Collections.<String>emptyList(), 1, userId);
                 lineageSubmissionMap.put(submission.getLineageIdentifier(), 
submission);
-                
submission.getResult().update(Collections.<ProvenanceEventRecord>emptyList());
+                submission.getResult().update(Collections.emptyList(), 0L);
                 return submission;
             }
 
@@ -2359,9 +2292,9 @@ public class PersistentProvenanceRepository implements 
ProvenanceRepository {
         try {
             final ProvenanceEventRecord event = getEvent(eventId);
             if (event == null) {
-                final AsyncLineageSubmission submission = new 
AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, 
Collections.<String>emptyList(), 1, userId);
+                final AsyncLineageSubmission submission = new 
AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, 
Collections.emptyList(), 1, userId);
                 lineageSubmissionMap.put(submission.getLineageIdentifier(), 
submission);
-                
submission.getResult().update(Collections.<ProvenanceEventRecord>emptyList());
+                submission.getResult().update(Collections.emptyList(), 0L);
                 return submission;
             }
 
@@ -2642,11 +2575,21 @@ public class PersistentProvenanceRepository implements 
ProvenanceRepository {
             }
 
             try {
-                final Set<ProvenanceEventRecord> matchingRecords = 
LineageQuery.computeLineageForFlowFiles(PersistentProvenanceRepository.this,
-                        getIndexManager(), indexDir, null, flowFileUuids, 
maxAttributeChars);
+                final DocumentToEventConverter converter = new 
DocumentToEventConverter() {
+                    @Override
+                    public Set<ProvenanceEventRecord> convert(TopDocs topDocs, 
IndexReader indexReader) throws IOException {
+                        // Always authorized. We do this because we need to 
pull back the event, regardless of whether or not
+                        // the user is truly authorized, because instead of 
ignoring unauthorized events, we want to replace them.
+                        final EventAuthorizer authorizer = 
EventAuthorizer.GRANT_ALL;
+                        final DocsReader docsReader = new DocsReader();
+                        return docsReader.read(topDocs, authorizer, 
indexReader, getAllLogFiles(), new AtomicInteger(0), Integer.MAX_VALUE, 
maxAttributeChars);
+                    }
+                };
+
+                final Set<ProvenanceEventRecord> matchingRecords = 
LineageQuery.computeLineageForFlowFiles(getIndexManager(), indexDir, null, 
flowFileUuids, converter);
 
                 final StandardLineageResult result = submission.getResult();
-                
result.update(replaceUnauthorizedWithPlaceholders(matchingRecords, user));
+                
result.update(replaceUnauthorizedWithPlaceholders(matchingRecords, user), 
matchingRecords.size());
 
                 logger.info("Successfully created Lineage for FlowFiles with 
UUIDs {} in {} milliseconds; Lineage contains {} nodes and {} edges",
                         flowFileUuids, 
result.getComputationTime(TimeUnit.MILLISECONDS), result.getNodes().size(), 
result.getEdges().size());
@@ -2666,7 +2609,6 @@ public class PersistentProvenanceRepository implements 
ProvenanceRepository {
     }
 
     private class RemoveExpiredQueryResults implements Runnable {
-
         @Override
         public void run() {
             try {
@@ -2697,22 +2639,4 @@ public class PersistentProvenanceRepository implements 
ProvenanceRepository {
             }
         }
     }
-
-    private static class NamedThreadFactory implements ThreadFactory {
-
-        private final AtomicInteger counter = new AtomicInteger(0);
-        private final ThreadFactory defaultThreadFactory = 
Executors.defaultThreadFactory();
-        private final String namePrefix;
-
-        public NamedThreadFactory(final String namePrefix) {
-            this.namePrefix = namePrefix;
-        }
-
-        @Override
-        public Thread newThread(final Runnable r) {
-            final Thread thread = defaultThreadFactory.newThread(r);
-            thread.setName(namePrefix + "-" + counter.incrementAndGet());
-            return thread;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
index e63133a..7a2f57e 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
@@ -17,20 +17,35 @@
 package org.apache.nifi.provenance;
 
 import java.io.File;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.provenance.search.SearchableField;
+import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.NiFiProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class RepositoryConfiguration {
+    private static final Logger logger = 
LoggerFactory.getLogger(RepositoryConfiguration.class);
 
-    private final List<File> storageDirectories = new ArrayList<>();
+    public static final String CONCURRENT_MERGE_THREADS = 
"nifi.provenance.repository.concurrent.merge.threads";
+    public static final String WARM_CACHE_FREQUENCY = 
"nifi.provenance.repository.warm.cache.frequency";
+
+    private final Map<String, File> storageDirectories = new LinkedHashMap<>();
     private long recordLifeMillis = TimeUnit.MILLISECONDS.convert(24, 
TimeUnit.HOURS);
     private long storageCapacity = 1024L * 1024L * 1024L;   // 1 GB
     private long eventFileMillis = TimeUnit.MILLISECONDS.convert(5, 
TimeUnit.MINUTES);
     private long eventFileBytes = 1024L * 1024L * 5L;   // 5 MB
+    private int maxFileEvents = Integer.MAX_VALUE;
     private long desiredIndexBytes = 1024L * 1024L * 500L; // 500 MB
     private int journalCount = 16;
     private int compressionBlockBytes = 1024 * 1024;
@@ -43,6 +58,8 @@ public class RepositoryConfiguration {
     private int queryThreadPoolSize = 2;
     private int indexThreadPoolSize = 1;
     private boolean allowRollover = true;
+    private int concurrentMergeThreads = 4;
+    private Integer warmCacheFrequencyMinutes = null;
 
     public void setAllowRollover(final boolean allow) {
         this.allowRollover = allow;
@@ -52,7 +69,6 @@ public class RepositoryConfiguration {
         return allowRollover;
     }
 
-
     public int getCompressionBlockBytes() {
         return compressionBlockBytes;
     }
@@ -66,8 +82,8 @@ public class RepositoryConfiguration {
      *
      * @return the directories where provenance files will be stored
      */
-    public List<File> getStorageDirectories() {
-        return Collections.unmodifiableList(storageDirectories);
+    public Map<String, File> getStorageDirectories() {
+        return Collections.unmodifiableMap(storageDirectories);
     }
 
     /**
@@ -75,8 +91,12 @@ public class RepositoryConfiguration {
      *
      * @param storageDirectory the directory to store provenance files
      */
-    public void addStorageDirectory(final File storageDirectory) {
-        this.storageDirectories.add(storageDirectory);
+    public void addStorageDirectory(final String partitionName, final File 
storageDirectory) {
+        this.storageDirectories.put(partitionName, storageDirectory);
+    }
+
+    public void addStorageDirectories(final Map<String, File> 
storageDirectories) {
+        this.storageDirectories.putAll(storageDirectories);
     }
 
     /**
@@ -148,6 +168,20 @@ public class RepositoryConfiguration {
     }
 
     /**
+     * @return the maximum number of events that should be written to a single 
event file before the file is rolled over
+     */
+    public int getMaxEventFileCount() {
+        return maxFileEvents;
+    }
+
+    /**
+     * @param maxCount the maximum number of events that should be written to 
a single event file before the file is rolled over
+     */
+    public void setMaxEventFileCount(final int maxCount) {
+        this.maxFileEvents = maxCount;
+    }
+
+    /**
      * @return the fields that should be indexed
      */
     public List<SearchableField> getSearchableFields() {
@@ -218,6 +252,14 @@ public class RepositoryConfiguration {
         this.indexThreadPoolSize = indexThreadPoolSize;
     }
 
+    public void setConcurrentMergeThreads(final int mergeThreads) {
+        this.concurrentMergeThreads = mergeThreads;
+    }
+
+    public int getConcurrentMergeThreads() {
+        return concurrentMergeThreads;
+    }
+
     /**
      * <p>
      * Specifies the desired size of each Provenance Event index shard, in
@@ -310,4 +352,90 @@ public class RepositoryConfiguration {
         this.maxAttributeChars = maxAttributeChars;
     }
 
+    public void setWarmCacheFrequencyMinutes(Integer frequencyMinutes) {
+        this.warmCacheFrequencyMinutes = frequencyMinutes;
+    }
+
+    public Optional<Integer> getWarmCacheFrequencyMinutes() {
+        return Optional.ofNullable(warmCacheFrequencyMinutes);
+    }
+
+    public static RepositoryConfiguration create(final NiFiProperties 
nifiProperties) {
+        final Map<String, Path> storageDirectories = 
nifiProperties.getProvenanceRepositoryPaths();
+        if (storageDirectories.isEmpty()) {
+            storageDirectories.put("provenance_repository", 
Paths.get("provenance_repository"));
+        }
+        final String storageTime = 
nifiProperties.getProperty(NiFiProperties.PROVENANCE_MAX_STORAGE_TIME, "24 
hours");
+        final String storageSize = 
nifiProperties.getProperty(NiFiProperties.PROVENANCE_MAX_STORAGE_SIZE, "1 GB");
+        final String rolloverTime = 
nifiProperties.getProperty(NiFiProperties.PROVENANCE_ROLLOVER_TIME, "5 mins");
+        final String rolloverSize = 
nifiProperties.getProperty(NiFiProperties.PROVENANCE_ROLLOVER_SIZE, "100 MB");
+        final String shardSize = 
nifiProperties.getProperty(NiFiProperties.PROVENANCE_INDEX_SHARD_SIZE, "500 
MB");
+        final int queryThreads = 
nifiProperties.getIntegerProperty(NiFiProperties.PROVENANCE_QUERY_THREAD_POOL_SIZE,
 2);
+        final int indexThreads = 
nifiProperties.getIntegerProperty(NiFiProperties.PROVENANCE_INDEX_THREAD_POOL_SIZE,
 2);
+        final int journalCount = 
nifiProperties.getIntegerProperty(NiFiProperties.PROVENANCE_JOURNAL_COUNT, 16);
+        final int concurrentMergeThreads = 
nifiProperties.getIntegerProperty(CONCURRENT_MERGE_THREADS, 2);
+        final String warmCacheFrequency = 
nifiProperties.getProperty(WARM_CACHE_FREQUENCY);
+
+        final long storageMillis = FormatUtils.getTimeDuration(storageTime, 
TimeUnit.MILLISECONDS);
+        final long maxStorageBytes = DataUnit.parseDataSize(storageSize, 
DataUnit.B).longValue();
+        final long rolloverMillis = FormatUtils.getTimeDuration(rolloverTime, 
TimeUnit.MILLISECONDS);
+        final long rolloverBytes = DataUnit.parseDataSize(rolloverSize, 
DataUnit.B).longValue();
+
+        final boolean compressOnRollover = 
Boolean.parseBoolean(nifiProperties.getProperty(NiFiProperties.PROVENANCE_COMPRESS_ON_ROLLOVER));
+        final String indexedFieldString = 
nifiProperties.getProperty(NiFiProperties.PROVENANCE_INDEXED_FIELDS);
+        final String indexedAttrString = 
nifiProperties.getProperty(NiFiProperties.PROVENANCE_INDEXED_ATTRIBUTES);
+
+        final Boolean alwaysSync = 
Boolean.parseBoolean(nifiProperties.getProperty("nifi.provenance.repository.always.sync",
 "false"));
+
+        final int defaultMaxAttrChars = 65536;
+        final String maxAttrLength = 
nifiProperties.getProperty("nifi.provenance.repository.max.attribute.length", 
String.valueOf(defaultMaxAttrChars));
+        int maxAttrChars;
+        try {
+            maxAttrChars = Integer.parseInt(maxAttrLength);
+            // must be at least 36 characters because that's the length of the 
uuid attribute,
+            // which must be kept intact
+            if (maxAttrChars < 36) {
+                maxAttrChars = 36;
+                logger.warn("Found max attribute length property set to " + 
maxAttrLength + " but minimum length is 36; using 36 instead");
+            }
+        } catch (final Exception e) {
+            maxAttrChars = defaultMaxAttrChars;
+        }
+
+        final List<SearchableField> searchableFields = 
SearchableFieldParser.extractSearchableFields(indexedFieldString, true);
+        final List<SearchableField> searchableAttributes = 
SearchableFieldParser.extractSearchableFields(indexedAttrString, false);
+
+        // We always want to index the Event Time.
+        if (!searchableFields.contains(SearchableFields.EventTime)) {
+            searchableFields.add(SearchableFields.EventTime);
+        }
+
+        final RepositoryConfiguration config = new RepositoryConfiguration();
+        for (final Map.Entry<String, Path> entry : 
storageDirectories.entrySet()) {
+            config.addStorageDirectory(entry.getKey(), 
entry.getValue().toFile());
+        }
+        config.setCompressOnRollover(compressOnRollover);
+        config.setSearchableFields(searchableFields);
+        config.setSearchableAttributes(searchableAttributes);
+        config.setMaxEventFileCapacity(rolloverBytes);
+        config.setMaxEventFileLife(rolloverMillis, TimeUnit.MILLISECONDS);
+        config.setMaxRecordLife(storageMillis, TimeUnit.MILLISECONDS);
+        config.setMaxStorageCapacity(maxStorageBytes);
+        config.setQueryThreadPoolSize(queryThreads);
+        config.setIndexThreadPoolSize(indexThreads);
+        config.setJournalCount(journalCount);
+        config.setMaxAttributeChars(maxAttrChars);
+        config.setConcurrentMergeThreads(concurrentMergeThreads);
+
+        if (warmCacheFrequency != null && 
!warmCacheFrequency.trim().equals("")) {
+            config.setWarmCacheFrequencyMinutes((int) 
FormatUtils.getTimeDuration(warmCacheFrequency, TimeUnit.MINUTES));
+        }
+        if (shardSize != null) {
+            config.setDesiredIndexSize(DataUnit.parseDataSize(shardSize, 
DataUnit.B).longValue());
+        }
+
+        config.setAlwaysSync(alwaysSync);
+
+        return config;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java
index f018685..ce875d6 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java
@@ -32,6 +32,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class StandardRecordReader extends CompressableRecordReader {
+    public static final int SERIALIZATION_VERISON = 9;
+    public static final String SERIALIZATION_NAME = 
"org.apache.nifi.provenance.PersistentProvenanceRepository";
+
     private static final Logger logger = 
LoggerFactory.getLogger(StandardRecordReader.class);
     private static final Pattern UUID_PATTERN = 
Pattern.compile("[a-fA-F0-9]{8}\\-([a-fA-F0-9]{4}\\-){3}[a-fA-F0-9]{12}");
 
@@ -121,9 +124,9 @@ public class StandardRecordReader extends 
CompressableRecordReader {
 
     @Override
     public StandardProvenanceEventRecord nextRecord(final DataInputStream dis, 
final int serializationVersion) throws IOException {
-        if (serializationVersion > StandardRecordWriter.SERIALIZATION_VERISON) 
{
+        if (serializationVersion > SERIALIZATION_VERISON) {
             throw new IllegalArgumentException("Unable to deserialize record 
because the version is "
-                + serializationVersion + " and supported versions are 1-" + 
StandardRecordWriter.SERIALIZATION_VERISON);
+                + serializationVersion + " and supported versions are 1-" + 
SERIALIZATION_VERISON);
         }
 
         // Schema changed drastically in version 6 so we created a new method 
to handle old records

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
index 4696767..0a749ad 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
@@ -16,17 +16,18 @@
  */
 package org.apache.nifi.provenance;
 
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.UTFDataFormatException;
 import java.util.Collection;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.nifi.provenance.serialization.CompressableRecordWriter;
 import org.apache.nifi.provenance.serialization.RecordWriter;
 import org.apache.nifi.provenance.toc.TocWriter;
-import org.apache.nifi.stream.io.DataOutputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,15 +46,16 @@ public class StandardRecordWriter extends 
CompressableRecordWriter implements Re
     private final File file;
 
 
-    public StandardRecordWriter(final File file, final TocWriter writer, final 
boolean compressed, final int uncompressedBlockSize) throws IOException {
-        super(file, writer, compressed, uncompressedBlockSize);
+    public StandardRecordWriter(final File file, final AtomicLong idGenerator, 
final TocWriter writer, final boolean compressed, final int 
uncompressedBlockSize) throws IOException {
+        super(file, idGenerator, writer, compressed, uncompressedBlockSize);
         logger.trace("Creating Record Writer for {}", file.getName());
 
         this.file = file;
     }
 
-    public StandardRecordWriter(final OutputStream out, final TocWriter 
tocWriter, final boolean compressed, final int uncompressedBlockSize) throws 
IOException {
-        super(out, tocWriter, compressed, uncompressedBlockSize);
+    public StandardRecordWriter(final OutputStream out, final String 
storageLocation, final AtomicLong idGenerator, final TocWriter tocWriter,
+        final boolean compressed, final int uncompressedBlockSize) throws 
IOException {
+        super(out, storageLocation, idGenerator, tocWriter, compressed, 
uncompressedBlockSize);
         this.file = null;
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java
new file mode 100644
index 0000000..229a96d
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.authorization.Authorizer;
+import org.apache.nifi.authorization.RequestAction;
+import org.apache.nifi.authorization.resource.Authorizable;
+import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.provenance.authorization.EventAuthorizer;
+import org.apache.nifi.provenance.authorization.UserEventAuthorizer;
+import org.apache.nifi.provenance.index.EventIndex;
+import org.apache.nifi.provenance.index.lucene.LuceneEventIndex;
+import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
+import org.apache.nifi.provenance.lucene.IndexManager;
+import org.apache.nifi.provenance.lucene.SimpleIndexManager;
+import org.apache.nifi.provenance.search.Query;
+import org.apache.nifi.provenance.search.QuerySubmission;
+import org.apache.nifi.provenance.search.SearchableField;
+import org.apache.nifi.provenance.serialization.RecordReaders;
+import org.apache.nifi.provenance.serialization.StorageSummary;
+import org.apache.nifi.provenance.store.EventFileManager;
+import org.apache.nifi.provenance.store.EventStore;
+import org.apache.nifi.provenance.store.PartitionedWriteAheadEventStore;
+import org.apache.nifi.provenance.store.RecordReaderFactory;
+import org.apache.nifi.provenance.store.RecordWriterFactory;
+import org.apache.nifi.provenance.store.StorageResult;
+import org.apache.nifi.provenance.toc.StandardTocWriter;
+import org.apache.nifi.provenance.toc.TocUtil;
+import org.apache.nifi.provenance.toc.TocWriter;
+import org.apache.nifi.provenance.util.CloseableUtil;
+import org.apache.nifi.reporting.Severity;
+import org.apache.nifi.util.NiFiProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * <p>
+ * A Provenance Repository that is made up of two distinct concepts: An {@link 
EventStore Event Store} that is responsible
+ * for storing and accessing the events (this repository makes use of an Event 
Store that uses a backing Write-Ahead Log, hence the name
+ * WriteAheadProvenanceRepository) and an {@link EventIndex Event Index} that 
is responsible for indexing and searching those
+ * events.
+ * </p>
+ *
+ * <p>
+ * When a Provenance Event is added to the repository, it is first stored in 
the Event Store. The Event Store reports the location (namely, the
+ * Event Identifier) that it used to store the event. The stored event is then 
given to the Event Index along with its storage location. The index
+ * is then responsible for indexing the event in real-time. Once this has 
completed, the method returns.
+ * </p>
+ *
+ * <p>
+ * The Event Index that is used by this implementation currently is the {@link 
LuceneEventIndex}, which is powered by Apache Lucene. This index provides
+ * very high throughput. However, this high throughput is gained by avoiding 
continual 'commits' of the Index Writer. As a result, on restart, this 
Repository
+ * may take a minute or two to re-index some of the Provenance Events, as some 
of the Events may have been added to the index without committing the Index 
Writer.
+ * Given the substantial performance improvement gained by committing the 
Index Writer only periodically, this trade-off is generally well accepted.
+ * </p>
+ *
+ * <p>
+ * This Repositories supports the notion of 'partitions'. The repository can 
be configured to store data to one or more partitions. Each partition is 
typically
+ * stored on a separate physical partition on disk. As a result, this allows 
striping of data across multiple partitions in order to achieve linear 
scalability
+ * across disks for far greater performance.
+ * </p>
+ */
+public class WriteAheadProvenanceRepository implements ProvenanceRepository {
+    private static final Logger logger = 
LoggerFactory.getLogger(WriteAheadProvenanceRepository.class);
+    private static final int BLOCK_SIZE = 1024 * 32;
+    public static final String EVENT_CATEGORY = "Provenance Repository";
+
+    private final RepositoryConfiguration config;
+
+    // effectively final
+    private EventStore eventStore;
+    private EventIndex eventIndex;
+    private EventReporter eventReporter;
+    private Authorizer authorizer;
+    private ProvenanceAuthorizableFactory resourceFactory;
+
+    /**
+     * This constructor exists solely for the use of the Java Service Loader 
mechanism and should not be used.
+     */
+    public WriteAheadProvenanceRepository() {
+        config = null;
+    }
+
+    public WriteAheadProvenanceRepository(final NiFiProperties nifiProperties) 
{
+        this(RepositoryConfiguration.create(nifiProperties));
+    }
+
+    public WriteAheadProvenanceRepository(final RepositoryConfiguration 
config) {
+        this.config = config;
+    }
+
+    @Override
+    public synchronized void initialize(final EventReporter eventReporter, 
final Authorizer authorizer, final ProvenanceAuthorizableFactory 
resourceFactory,
+        final IdentifierLookup idLookup) throws IOException {
+        final RecordWriterFactory recordWriterFactory = (file, idGenerator, 
compressed, createToc) -> {
+            final TocWriter tocWriter = createToc ? new 
StandardTocWriter(TocUtil.getTocFile(file), false, false) : null;
+            return new EventIdFirstSchemaRecordWriter(file, idGenerator, 
tocWriter, compressed, BLOCK_SIZE, idLookup);
+        };
+
+        final EventFileManager fileManager = new EventFileManager();
+        final RecordReaderFactory recordReaderFactory = (file, logs, maxChars) 
-> {
+            fileManager.obtainReadLock(file);
+            try {
+                return RecordReaders.newRecordReader(file, logs, maxChars);
+            } finally {
+                fileManager.releaseReadLock(file);
+            }
+        };
+
+        eventStore = new PartitionedWriteAheadEventStore(config, 
recordWriterFactory, recordReaderFactory, eventReporter, fileManager);
+
+        final IndexManager indexManager = new SimpleIndexManager(config);
+        eventIndex = new LuceneEventIndex(config, indexManager, eventReporter);
+
+        this.eventReporter = eventReporter;
+        this.authorizer = authorizer;
+        this.resourceFactory = resourceFactory;
+
+        eventStore.initialize();
+        eventIndex.initialize(eventStore);
+
+        eventStore.reindexLatestEvents(eventIndex);
+    }
+
+    @Override
+    public ProvenanceEventBuilder eventBuilder() {
+        return new StandardProvenanceEventRecord.Builder();
+    }
+
+    @Override
+    public void registerEvent(final ProvenanceEventRecord event) {
+        registerEvents(Collections.singleton(event));
+    }
+
+    @Override
+    public void registerEvents(final Iterable<ProvenanceEventRecord> events) {
+        final StorageResult storageResult;
+
+        try {
+            storageResult = eventStore.addEvents(events);
+        } catch (final IOException e) {
+            logger.error("Failed to write events to the Event Store", e);
+            eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed 
to write Provenance Events to the repository. See logs for more details.");
+            return;
+        }
+
+        final Map<ProvenanceEventRecord, StorageSummary> locationMap = 
storageResult.getStorageLocations();
+        if (!locationMap.isEmpty()) {
+            eventIndex.addEvents(locationMap);
+        }
+    }
+
+    @Override
+    public List<ProvenanceEventRecord> getEvents(final long firstRecordId, 
final int maxRecords) throws IOException {
+        return eventStore.getEvents(firstRecordId, maxRecords);
+    }
+
+    @Override
+    public ProvenanceEventRecord getEvent(final long id) throws IOException {
+        return eventStore.getEvent(id).orElse(null);
+    }
+
+    @Override
+    public Long getMaxEventId() {
+        return eventStore.getMaxEventId();
+    }
+
+    @Override
+    public void close() {
+        CloseableUtil.closeQuietly(eventStore, eventIndex);
+    }
+
+    @Override
+    public ProvenanceEventRecord getEvent(final long id, final NiFiUser user) 
throws IOException {
+        final ProvenanceEventRecord event = getEvent(id);
+        if (event == null) {
+            return null;
+        }
+
+        authorize(event, user);
+        return event;
+    }
+
+    private void authorize(final ProvenanceEventRecord event, final NiFiUser 
user) {
+        if (authorizer == null) {
+            return;
+        }
+
+        final Authorizable eventAuthorizable;
+        if (event.isRemotePortType()) {
+            eventAuthorizable = 
resourceFactory.createRemoteDataAuthorizable(event.getComponentId());
+        } else {
+            eventAuthorizable = 
resourceFactory.createLocalDataAuthorizable(event.getComponentId());
+        }
+        eventAuthorizable.authorize(authorizer, RequestAction.READ, user, 
event.getAttributes());
+    }
+
+
+    @Override
+    public List<ProvenanceEventRecord> getEvents(final long firstRecordId, 
final int maxRecords, final NiFiUser user) throws IOException {
+        final List<ProvenanceEventRecord> events = getEvents(firstRecordId, 
maxRecords);
+        return createEventAuthorizer(user).filterUnauthorizedEvents(events);
+    }
+
+    private EventAuthorizer createEventAuthorizer(final NiFiUser user) {
+        return new UserEventAuthorizer(authorizer, resourceFactory, user);
+    }
+
+    @Override
+    public ProvenanceEventRepository getProvenanceEventRepository() {
+        return this;
+    }
+
+    @Override
+    public QuerySubmission submitQuery(final Query query, final NiFiUser user) 
{
+        return eventIndex.submitQuery(query, createEventAuthorizer(user), 
user.getIdentity());
+    }
+
+    @Override
+    public QuerySubmission retrieveQuerySubmission(final String 
queryIdentifier, final NiFiUser user) {
+        return eventIndex.retrieveQuerySubmission(queryIdentifier, user);
+    }
+
+    @Override
+    public ComputeLineageSubmission submitLineageComputation(final String 
flowFileUuid, final NiFiUser user) {
+        return eventIndex.submitLineageComputation(flowFileUuid, user, 
createEventAuthorizer(user));
+    }
+
+    @Override
+    public ComputeLineageSubmission submitLineageComputation(final long 
eventId, final NiFiUser user) {
+        return eventIndex.submitLineageComputation(eventId, user, 
createEventAuthorizer(user));
+    }
+
+    @Override
+    public ComputeLineageSubmission retrieveLineageSubmission(final String 
lineageIdentifier, final NiFiUser user) {
+        return eventIndex.retrieveLineageSubmission(lineageIdentifier, user);
+    }
+
+    @Override
+    public ComputeLineageSubmission submitExpandParents(final long eventId, 
final NiFiUser user) {
+        return eventIndex.submitExpandParents(eventId, user, 
createEventAuthorizer(user));
+    }
+
+    @Override
+    public ComputeLineageSubmission submitExpandChildren(final long eventId, 
final NiFiUser user) {
+        return eventIndex.submitExpandChildren(eventId, user, 
createEventAuthorizer(user));
+    }
+
+    @Override
+    public List<SearchableField> getSearchableFields() {
+        return Collections.unmodifiableList(config.getSearchableFields());
+    }
+
+    @Override
+    public List<SearchableField> getSearchableAttributes() {
+        return Collections.unmodifiableList(config.getSearchableAttributes());
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/authorization/EventAuthorizer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/authorization/EventAuthorizer.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/authorization/EventAuthorizer.java
new file mode 100644
index 0000000..ab193e4
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/authorization/EventAuthorizer.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.authorization;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.nifi.authorization.AccessDeniedException;
+import org.apache.nifi.provenance.PlaceholderProvenanceEvent;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+
+public interface EventAuthorizer {
+
+    /**
+     * Determines whether or not the has access to the given Provenance Event.
+     * This method does not imply the user is directly attempting to access 
the specified resource. If the user is
+     * attempting a direct access use Authorizable.authorize().
+     *
+     * @param event the event to authorize
+     * @return is authorized
+     */
+    boolean isAuthorized(ProvenanceEventRecord event);
+
+    /**
+     * Authorizes the current user for the specified action on the specified 
resource. This method does
+     * imply the user is directly accessing the specified resource.
+     *
+     * @param event the event to authorize
+     * @throws AccessDeniedException if the user is not authorized
+     */
+    void authorize(ProvenanceEventRecord event) throws AccessDeniedException;
+
+    /**
+     * Filters out any events that the user is not authorized to access
+     *
+     * @param events the events to filtered
+     * @return a List that contains only events from the original, for which 
the user has access
+     */
+    default List<ProvenanceEventRecord> 
filterUnauthorizedEvents(List<ProvenanceEventRecord> events) {
+        return events.stream()
+            .filter(event -> isAuthorized(event))
+            .collect(Collectors.toList());
+    }
+
+    /**
+     * Returns a Set of provenance events for which any of the given events 
that the user does not
+     * have access to has been replaced by a placeholder event
+     *
+     * @param events the events to filter
+     * @return a Set containing only provenance events that the user has 
access to
+     */
+    default Set<ProvenanceEventRecord> 
replaceUnauthorizedWithPlaceholders(Set<ProvenanceEventRecord> events) {
+        return events.stream()
+            .map(event -> isAuthorized(event) ? event : new 
PlaceholderProvenanceEvent(event))
+            .collect(Collectors.toSet());
+    }
+
+    public static final EventAuthorizer GRANT_ALL = new EventAuthorizer() {
+        @Override
+        public boolean isAuthorized(ProvenanceEventRecord event) {
+            return true;
+        }
+
+        @Override
+        public void authorize(ProvenanceEventRecord event) throws 
AccessDeniedException {
+        }
+
+        @Override
+        public List<ProvenanceEventRecord> 
filterUnauthorizedEvents(List<ProvenanceEventRecord> events) {
+            return events;
+        }
+
+        @Override
+        public Set<ProvenanceEventRecord> 
replaceUnauthorizedWithPlaceholders(Set<ProvenanceEventRecord> events) {
+            return events;
+        }
+    };
+
+    public static final EventAuthorizer DENY_ALL = new EventAuthorizer() {
+        @Override
+        public boolean isAuthorized(ProvenanceEventRecord event) {
+            return false;
+        }
+
+        @Override
+        public void authorize(ProvenanceEventRecord event) throws 
AccessDeniedException {
+            throw new AccessDeniedException();
+        }
+
+        @Override
+        public List<ProvenanceEventRecord> 
filterUnauthorizedEvents(List<ProvenanceEventRecord> events) {
+            return Collections.emptyList();
+        }
+
+        @Override
+        public Set<ProvenanceEventRecord> 
replaceUnauthorizedWithPlaceholders(Set<ProvenanceEventRecord> events) {
+            return events.stream()
+                .map(event -> new PlaceholderProvenanceEvent(event))
+                .collect(Collectors.toSet());
+        }
+    };
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/authorization/EventTransformer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/authorization/EventTransformer.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/authorization/EventTransformer.java
new file mode 100644
index 0000000..1c48aad
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/authorization/EventTransformer.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.authorization;
+
+import java.util.Optional;
+
+import org.apache.nifi.provenance.PlaceholderProvenanceEvent;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+
+/**
+ * An interface for transforming a Provenance Event for which the user is not 
authorized to access
+ */
+public interface EventTransformer {
+
+    Optional<ProvenanceEventRecord> transform(ProvenanceEventRecord 
unauthorizedEvent);
+
+    /**
+     * An EventTransformer that transforms any event into an Empty Optional
+     */
+    public static final EventTransformer EMPTY_TRANSFORMER = event -> 
Optional.empty();
+
+    /**
+     * An EventTransformer that transforms any event into a Placeholder event
+     */
+    public static final EventTransformer PLACEHOLDER_TRANSFORMER = event -> 
Optional.of(new PlaceholderProvenanceEvent(event));
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/authorization/UserEventAuthorizer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/authorization/UserEventAuthorizer.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/authorization/UserEventAuthorizer.java
new file mode 100644
index 0000000..5126b7e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/authorization/UserEventAuthorizer.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.authorization;
+
+import org.apache.nifi.authorization.AuthorizationResult;
+import org.apache.nifi.authorization.AuthorizationResult.Result;
+import org.apache.nifi.authorization.Authorizer;
+import org.apache.nifi.authorization.RequestAction;
+import org.apache.nifi.authorization.resource.Authorizable;
+import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.provenance.ProvenanceAuthorizableFactory;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.web.ResourceNotFoundException;
+
+public class UserEventAuthorizer implements EventAuthorizer {
+    private final Authorizer authorizer;
+    private final ProvenanceAuthorizableFactory resourceFactory;
+    private final NiFiUser user;
+
+    public UserEventAuthorizer(final Authorizer authorizer, final 
ProvenanceAuthorizableFactory authorizableFactory, final NiFiUser user) {
+        this.authorizer = authorizer;
+        this.resourceFactory = authorizableFactory;
+        this.user = user;
+    }
+
+    @Override
+    public boolean isAuthorized(final ProvenanceEventRecord event) {
+        if (authorizer == null || user == null) {
+            return true;
+        }
+
+        final Authorizable eventAuthorizable;
+        try {
+            if (event.isRemotePortType()) {
+                eventAuthorizable = 
resourceFactory.createRemoteDataAuthorizable(event.getComponentId());
+            } else {
+                eventAuthorizable = 
resourceFactory.createLocalDataAuthorizable(event.getComponentId());
+            }
+        } catch (final ResourceNotFoundException rnfe) {
+            return false;
+        }
+
+        final AuthorizationResult result = 
eventAuthorizable.checkAuthorization(authorizer, RequestAction.READ, user, 
event.getAttributes());
+        return Result.Approved.equals(result.getResult());
+    }
+
+    @Override
+    public void authorize(final ProvenanceEventRecord event) {
+        if (authorizer == null) {
+            return;
+        }
+
+        final Authorizable eventAuthorizable;
+        if (event.isRemotePortType()) {
+            eventAuthorizable = 
resourceFactory.createRemoteDataAuthorizable(event.getComponentId());
+        } else {
+            eventAuthorizable = 
resourceFactory.createLocalDataAuthorizable(event.getComponentId());
+        }
+        eventAuthorizable.authorize(authorizer, RequestAction.READ, user, 
event.getAttributes());
+    }
+}

Reply via email to