http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/EventStore.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/EventStore.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/EventStore.java
new file mode 100644
index 0000000..ba4acea
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/EventStore.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.store;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.authorization.EventAuthorizer;
+import org.apache.nifi.provenance.authorization.EventTransformer;
+import org.apache.nifi.provenance.index.EventIndex;
+import org.apache.nifi.provenance.serialization.StorageSummary;
+
+/**
+ * <p>
+ * An Event Store is responsible for storing Provenance Events and retrieving 
them at a later time.
+ * </p>
+ */
+public interface EventStore extends Closeable {
+
+    /**
+     * Performs any initialization routines that need to happen before the 
store is used
+     *
+     * @throws IOException if unable to perform initialization
+     */
+    void initialize() throws IOException;
+
+    /**
+     * Adds the given events to the store. All events will be written to the 
same Storage Location.
+     * I.e., all of the {@link StorageSummary} objects that are provided when 
calling the {@link StorageResult#getStorageLocations()}
+     * method will return the same value for the {@link 
StorageSummary#getStorageLocation()}. Each one, however, will
+     * have a different Event ID and potentially a different Block Index.
+     *
+     * @param events the events to add
+     * @return a mapping of event to the location where it was stored
+     * @throws IOException if unable to add the events
+     */
+    StorageResult addEvents(Iterable<ProvenanceEventRecord> events) throws 
IOException;
+
+    /**
+     * @return the number of bytes occupied by the events in the store
+     * @throws IOException if unable to determine the size of the store
+     */
+    long getSize() throws IOException;
+
+    /**
+     * @return the largest Event ID that has been written to this store, or -1 
if no events have yet been stored.
+     */
+    long getMaxEventId();
+
+    /**
+     * Retrieves the event with the given ID
+     *
+     * @param id the ID of the event to retrieve
+     * @return an Optional containing the Event with the given ID, or an empty 
optional if the event cannot be found
+     * @throws IOException if unable to read the event from storage
+     */
+    Optional<ProvenanceEventRecord> getEvent(long id) throws IOException;
+
+    /**
+     * Retrieves up to maxRecords events from the store, starting with the 
event whose ID is equal to firstRecordId. If that
+     * event cannot be found, then the first event will be the oldest event in 
the store whose ID is greater than firstRecordId.
+     * All events will be returned in the order that they were written to the 
store. I.e., all events will have monotonically
+     * increasing Event ID's. No events will be filtered out, since there is 
no EventAuthorizer provided.
+     *
+     * @param firstRecordId the ID of the first event to retrieve
+     * @param maxRecords the maximum number of records to retrieve. The actual 
number of results returned may be less than this.
+     * @return a List of ProvenanceEventRecord's
+     * @throws IOException if unable to retrieve records from the store
+     */
+    List<ProvenanceEventRecord> getEvents(long firstRecordId, int maxRecords) 
throws IOException;
+
+    /**
+     * Retrieves up to maxRecords events from the store, starting with the 
event whose ID is equal to firstRecordId. If that
+     * event cannot be found, then the first event will be the oldest event in 
the store whose ID is greater than firstRecordId.
+     * All events will be returned in the order that they were written to the 
store. I.e., all events will have monotonically
+     * increasing Event ID's.
+     *
+     * @param firstRecordId the ID of the first event to retrieve
+     * @param maxRecords the maximum number of records to retrieve. The actual 
number of results returned may be less than this.
+     * @param authorizer the authorizer that should be used to filter out any 
events that the user doesn't have access to
+     * @param unauthorizedTransformer the transformer to apply to unauthorized 
events
+     * @return a List of ProvenanceEventRecord's
+     * @throws IOException if unable to retrieve records from the store
+     */
+    List<ProvenanceEventRecord> getEvents(long firstRecordId, int maxRecords, 
EventAuthorizer authorizer, EventTransformer unauthorizedTransformer) throws 
IOException;
+
+    /**
+     * Given a List of Event ID's, returns a List of Provenance Events that 
contain the events that have those corresponding
+     * Event ID's. If any events cannot be found, a warning will be logged but 
no Exception will be thrown.
+     *
+     * @param eventIds a Stream of Event ID's
+     * @param authorizer the authorizer that should be used to filter out any 
events that the user doesn't have access to
+     * @param unauthorizedTransformer the transformer to apply to unauthorized 
events
+     * @return a List of events that correspond to the given ID's
+     * @throws IOException if unable to retrieve records from the store
+     */
+    List<ProvenanceEventRecord> getEvents(List<Long> eventIds, EventAuthorizer 
authorizer, EventTransformer unauthorizedTransformer) throws IOException;
+
+    /**
+     * Causes the latest events in this store to be re-indexed by the given 
Event Index
+     *
+     * @param eventIndex the EventIndex to use for indexing events
+     */
+    void reindexLatestEvents(EventIndex eventIndex);
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/EventStorePartition.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/EventStorePartition.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/EventStorePartition.java
new file mode 100644
index 0000000..ccb94f8
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/EventStorePartition.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.store;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.authorization.EventAuthorizer;
+import org.apache.nifi.provenance.store.iterator.EventIterator;
+
+public interface EventStorePartition extends Closeable {
+    /**
+     * Performs any initialization routines that need to happen before the 
store is used
+     *
+     * @throws IOException if unable to perform initialization
+     */
+    void initialize() throws IOException;
+
+    /**
+     * Adds the given events to the store
+     *
+     * @param events the events to add
+     * @return a mapping of event to the location where it was stored
+     * @throws IOException if unable to add the events
+     */
+    StorageResult addEvents(Iterable<ProvenanceEventRecord> events) throws 
IOException;
+
+    /**
+     * @return the number of bytes occupied by the events in the store
+     */
+    long getSize();
+
+    /**
+     * @return the largest Event ID that has been written to this store, or -1 
if no events have yet been stored.
+     */
+    long getMaxEventId();
+
+    /**
+     * Retrieves the event with the given ID
+     *
+     * @param id the ID of the event to retrieve
+     * @return the Event with the given ID, or <code>null</code> if the event 
cannot be found
+     * @throws IOException if unable to read the event from storage
+     */
+    Optional<ProvenanceEventRecord> getEvent(long id) throws IOException;
+
+    /**
+     * Retrieves up to maxRecords events from the store, starting with the 
event whose ID is equal to firstRecordId. If that
+     * event cannot be found, then the first event will be the oldest event in 
the store whose ID is greater than firstRecordId.
+     * All events will be returned in the order that they were written to the 
store. I.e., all events will have monotonically
+     * increasing Event ID's.
+     *
+     * @param firstRecordId the ID of the first event to retrieve
+     * @param maxEvents the maximum number of events to retrieve. The actual 
number of results returned may be less than this.
+     * @param authorizer the authorizer that should be used to filter out any 
events that the user doesn't have access to
+     * @return a List of ProvenanceEventRecord's
+     * @throws IOException if unable to retrieve records from the store
+     */
+    List<ProvenanceEventRecord> getEvents(long firstRecordId, int maxEvents, 
EventAuthorizer authorizer) throws IOException;
+
+    /**
+     * Returns an {@link EventIterator} that is capable of iterating over the 
events in the store beginning with the given
+     * record id. The events returned by the EventIterator will be provided in 
the order in which they were stored in the
+     * partition. All events retrieved from this EventIterator will have 
monotonically increasing Event ID's.
+     *
+     * @param minimumEventId the minimum value of any Event ID that should be 
returned
+     * @return an EventIterator that is capable of iterating over events in 
the store
+     */
+    EventIterator createEventIterator(long minimumEventId);
+
+    /**
+     * Returns an {@link EventIterator} that iterates over the given event 
ID's and returns one ProvenanceEventRecord for
+     * each given, if the ID given can be found. If a given ID cannot be 
found, it will be skipped and no error will be reported.
+     *
+     * @param eventIds the ID's of the events to retrieve
+     * @return an EventIterator that iterates over the given event ID's
+     */
+    EventIterator createEventIterator(List<Long> eventIds);
+
+    /**
+     * Purges any events from the partition that are older than the given 
amount of time
+     *
+     * @param olderThan the amount of time for which any event older than this 
should be removed
+     * @param timeUnit the unit of time that applies to the first argument
+     */
+    void purgeOldEvents(long olderThan, TimeUnit timeUnit);
+
+    /**
+     * Purges some number of events from the partition. The oldest events will 
be purged.
+     *
+     * @return the number of bytes purged from the partition
+     */
+    long purgeOldestEvents();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/PartitionedEventStore.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/PartitionedEventStore.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/PartitionedEventStore.java
new file mode 100644
index 0000000..5f922dd
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/PartitionedEventStore.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.store;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+
+import org.apache.lucene.util.NamedThreadFactory;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.RepositoryConfiguration;
+import org.apache.nifi.provenance.authorization.EventAuthorizer;
+import org.apache.nifi.provenance.authorization.EventTransformer;
+import org.apache.nifi.provenance.store.iterator.AuthorizingEventIterator;
+import org.apache.nifi.provenance.store.iterator.EventIterator;
+import org.apache.nifi.provenance.util.DirectoryUtils;
+import org.apache.nifi.reporting.Severity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class PartitionedEventStore implements EventStore {
+    private static final Logger logger = 
LoggerFactory.getLogger(PartitionedEventStore.class);
+    private static final String EVENT_CATEGORY = "Provenance Repository";
+
+    private final AtomicLong partitionIndex = new AtomicLong(0L);
+    private final RepositoryConfiguration repoConfig;
+    private final EventReporter eventReporter;
+    private ScheduledExecutorService maintenanceExecutor;
+
+    public PartitionedEventStore(final RepositoryConfiguration config, final 
EventReporter eventReporter) {
+        this.repoConfig = config;
+        this.eventReporter = eventReporter;
+    }
+
+
+    @Override
+    public void initialize() throws IOException {
+        maintenanceExecutor = Executors.newScheduledThreadPool(1, new 
NamedThreadFactory("Provenance Repository Maintenance"));
+        maintenanceExecutor.scheduleWithFixedDelay(() -> performMaintenance(), 
1, 1, TimeUnit.MINUTES);
+
+        for (final EventStorePartition partition : getPartitions()) {
+            partition.initialize();
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (maintenanceExecutor != null) {
+            maintenanceExecutor.shutdownNow();
+        }
+
+        IOException thrown = null;
+
+        for (final EventStorePartition partition : getPartitions()) {
+            try {
+               partition.close();
+            } catch (final IOException ioe) {
+                if (thrown == null) {
+                    thrown = ioe;
+                } else {
+                    thrown.addSuppressed(ioe);
+                }
+            }
+        }
+
+        if (thrown != null) {
+            throw thrown;
+        }
+    }
+
+
+    @Override
+    public StorageResult addEvents(final Iterable<ProvenanceEventRecord> 
events) throws IOException {
+        final List<? extends EventStorePartition> partitions = getPartitions();
+        final int index = (int) (partitionIndex.getAndIncrement() % 
partitions.size());
+        final EventStorePartition partition = partitions.get(index);
+        return partition.addEvents(events);
+    }
+
+    @Override
+    public long getSize() {
+        long size = 0;
+        for (final EventStorePartition partition : getPartitions()) {
+            size += partition.getSize();
+        }
+
+        return size;
+    }
+
+    private long getRepoSize() {
+        long total = 0L;
+
+        for (final File storageDir : 
repoConfig.getStorageDirectories().values()) {
+            total += DirectoryUtils.getSize(storageDir);
+        }
+
+        return total;
+    }
+
+    @Override
+    public long getMaxEventId() {
+        return getPartitions().stream()
+            .mapToLong(part -> part.getMaxEventId())
+            .max()
+            .orElse(-1L);
+    }
+
+    @Override
+    public Optional<ProvenanceEventRecord> getEvent(final long id) throws 
IOException {
+        for (final EventStorePartition partition : getPartitions()) {
+            final Optional<ProvenanceEventRecord> option = 
partition.getEvent(id);
+            if (option.isPresent()) {
+                return option;
+            }
+        }
+
+        return Optional.empty();
+    }
+
+    @Override
+    public List<ProvenanceEventRecord> getEvents(final long firstRecordId, 
final int maxRecords) throws IOException {
+        return getEvents(firstRecordId, maxRecords, EventAuthorizer.GRANT_ALL, 
EventTransformer.EMPTY_TRANSFORMER);
+    }
+
+    @Override
+    public List<ProvenanceEventRecord> getEvents(final long firstRecordId, 
final int maxRecords, final EventAuthorizer authorizer,
+        final EventTransformer transformer) throws IOException {
+        if (firstRecordId + maxRecords < 1 || maxRecords < 1 || firstRecordId 
> getMaxEventId()) {
+            return Collections.emptyList();
+        }
+
+        return getEvents(maxRecords, authorizer, part -> 
part.createEventIterator(firstRecordId), transformer);
+    }
+
+    @Override
+    public List<ProvenanceEventRecord> getEvents(final List<Long> eventIds, 
final EventAuthorizer authorizer, final EventTransformer transformer) throws 
IOException {
+        if (eventIds == null || eventIds.isEmpty()) {
+            return Collections.emptyList();
+        }
+
+        return getEvents(eventIds.size(), authorizer, part -> 
part.createEventIterator(eventIds), transformer);
+    }
+
+    private List<ProvenanceEventRecord> getEvents(final int maxRecords, final 
EventAuthorizer authorizer,
+        final Function<EventStorePartition, EventIterator> 
eventIteratorFactory, final EventTransformer transformer) throws IOException {
+
+        if (maxRecords < 1) {
+            return Collections.emptyList();
+        }
+
+        final List<ProvenanceEventRecord> selectedEvents = new ArrayList<>();
+
+        // Create a Map so that the key is the next record available from a 
partition and the value is the EventIterator from which
+        // the record came. This sorted map is then used so that we are able 
to always get the first entry, which is the next
+        // lowest record id among all partitions.
+        final SortedMap<ProvenanceEventRecord, EventIterator> 
recordToIteratorMap = new TreeMap<>(
+            (o1, o2) -> Long.compare(o1.getEventId(), o2.getEventId()));
+
+        try {
+            // Seed our map with the first event in each Partition.
+            for (final EventStorePartition partition : getPartitions()) {
+                final EventAuthorizer nonNullAuthorizer = authorizer == null ? 
EventAuthorizer.GRANT_ALL : authorizer;
+                final EventIterator partitionIterator = 
eventIteratorFactory.apply(partition);
+                final EventIterator iterator = new 
AuthorizingEventIterator(partitionIterator, nonNullAuthorizer, transformer);
+
+                final Optional<ProvenanceEventRecord> option = 
iterator.nextEvent();
+                if (option.isPresent()) {
+                    recordToIteratorMap.put(option.get(), iterator);
+                }
+            }
+
+            // If no records found, just return the empty list.
+            if (recordToIteratorMap.isEmpty()) {
+                return selectedEvents;
+            }
+
+            // Get the event with the next-lowest ID. Add it to the list of 
selected events,
+            // then read the next event from the same EventIterator that this 
event came from.
+            // This ensures that our map is always populated with the next 
event for each
+            // EventIterator, which also ensures that the first key in our map 
is the event
+            // with the lowest ID (since all events from a given EventIterator 
have monotonically
+            // increasing Event ID's).
+            ProvenanceEventRecord nextEvent = recordToIteratorMap.firstKey();
+            while (nextEvent != null && selectedEvents.size() < maxRecords) {
+                selectedEvents.add(nextEvent);
+
+                final EventIterator iterator = 
recordToIteratorMap.remove(nextEvent);
+                final Optional<ProvenanceEventRecord> nextRecordFromIterator = 
iterator.nextEvent();
+                if (nextRecordFromIterator.isPresent()) {
+                    recordToIteratorMap.put(nextRecordFromIterator.get(), 
iterator);
+                }
+
+                nextEvent = recordToIteratorMap.isEmpty() ? null : 
recordToIteratorMap.firstKey();
+            }
+
+            return selectedEvents;
+        } finally {
+            // Ensure that we close all record readers that have been created
+            for (final EventIterator iterator : recordToIteratorMap.values()) {
+                try {
+                    iterator.close();
+                } catch (final Exception e) {
+                    if (logger.isDebugEnabled()) {
+                        logger.warn("Failed to close Record Reader {}", 
iterator, e);
+                    } else {
+                        logger.warn("Failed to close Record Reader {}", 
iterator);
+                    }
+                }
+            }
+        }
+    }
+
+
+    void performMaintenance() {
+        try {
+            final long maxFileLife = 
repoConfig.getMaxRecordLife(TimeUnit.MILLISECONDS);
+            for (final EventStorePartition partition : getPartitions()) {
+                try {
+                    partition.purgeOldEvents(maxFileLife, 
TimeUnit.MILLISECONDS);
+                } catch (final Exception e) {
+                    logger.error("Failed to purge expired events from " + 
partition, e);
+                    eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY,
+                        "Failed to purge expired events from Provenance 
Repository. See logs for more information.");
+                }
+            }
+
+            final long maxStorageCapacity = repoConfig.getMaxStorageCapacity();
+            long currentSize;
+            try {
+                currentSize = getRepoSize();
+            } catch (final Exception e) {
+                logger.error("Could not determine size of Provenance 
Repository. Will not expire any data due to storage limits", e);
+                eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, 
"Failed to determine size of Provenance Repository. "
+                    + "No data will be expired due to storage limits at this 
time. See logs for more information.");
+                return;
+            }
+
+            while (currentSize > maxStorageCapacity) {
+                for (final EventStorePartition partition : getPartitions()) {
+                    try {
+                        final long removed = partition.purgeOldestEvents();
+                        currentSize -= removed;
+                    } catch (final Exception e) {
+                        logger.error("Failed to purge oldest events from " + 
partition, e);
+                        eventReporter.reportEvent(Severity.WARNING, 
EVENT_CATEGORY,
+                            "Failed to purge oldest events from Provenance 
Repository. See logs for more information.");
+                    }
+                }
+            }
+        } catch (final Exception e) {
+            logger.error("Failed to perform periodic maintenance", e);
+            eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY,
+                "Failed to perform periodic maintenace for Provenance 
Repository. See logs for more information.");
+        }
+    }
+
+    protected abstract List<? extends EventStorePartition> getPartitions();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/PartitionedWriteAheadEventStore.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/PartitionedWriteAheadEventStore.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/PartitionedWriteAheadEventStore.java
new file mode 100644
index 0000000..14de80e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/PartitionedWriteAheadEventStore.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.store;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.lucene.util.NamedThreadFactory;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.provenance.RepositoryConfiguration;
+import org.apache.nifi.provenance.index.EventIndex;
+import org.apache.nifi.provenance.serialization.EventFileCompressor;
+
+public class PartitionedWriteAheadEventStore extends PartitionedEventStore {
+    private final BlockingQueue<File> filesToCompress;
+    private final List<WriteAheadStorePartition> partitions;
+    private final RepositoryConfiguration repoConfig;
+
+    private final ExecutorService compressionExecutor;
+    private final List<EventFileCompressor> fileCompressors = 
Collections.synchronizedList(new ArrayList<>());
+    private final EventReporter eventReporter;
+    private final EventFileManager fileManager;
+
+    public PartitionedWriteAheadEventStore(final RepositoryConfiguration 
repoConfig, final RecordWriterFactory recordWriterFactory,
+        final RecordReaderFactory recordReaderFactory, final EventReporter 
eventReporter, final EventFileManager fileManager) {
+        super(repoConfig, eventReporter);
+        this.repoConfig = repoConfig;
+        this.eventReporter = eventReporter;
+        this.filesToCompress = new LinkedBlockingQueue<>(100);
+        final AtomicLong idGenerator = new AtomicLong(0L);
+        this.partitions = createPartitions(repoConfig, recordWriterFactory, 
recordReaderFactory, idGenerator);
+        this.fileManager = fileManager;
+
+        // Creates tasks to compress data on rollover
+        if (repoConfig.isCompressOnRollover()) {
+            compressionExecutor = 
Executors.newFixedThreadPool(repoConfig.getIndexThreadPoolSize(), new 
NamedThreadFactory("Compress Provenance Logs"));
+        } else {
+            compressionExecutor = null;
+        }
+    }
+
+    private List<WriteAheadStorePartition> createPartitions(final 
RepositoryConfiguration repoConfig, final RecordWriterFactory 
recordWriterFactory,
+        final RecordReaderFactory recordReaderFactory, final AtomicLong 
idGenerator) {
+        final Map<String, File> storageDirectories = 
repoConfig.getStorageDirectories();
+        final List<WriteAheadStorePartition> partitions = new 
ArrayList<>(storageDirectories.size());
+
+        for (final Map.Entry<String, File> entry : 
storageDirectories.entrySet()) {
+            // Need to ensure that the same partition directory always gets 
the same partition index.
+            // If we don't, then we will end up re-indexing the events from 1 
index into another index, and
+            // this will result in a lot of duplicates (up to a million per 
index per restart). This is the reason
+            // that we use a partition name here based on the properties file.
+            final String partitionName = entry.getKey();
+            final File storageDirectory = entry.getValue();
+            partitions.add(new WriteAheadStorePartition(storageDirectory, 
partitionName, repoConfig,
+                recordWriterFactory, recordReaderFactory, filesToCompress, 
idGenerator, eventReporter));
+        }
+
+        return partitions;
+    }
+
+    @Override
+    public void initialize() throws IOException {
+        if (repoConfig.isCompressOnRollover()) {
+            for (int i = 0; i < repoConfig.getIndexThreadPoolSize(); i++) {
+                final EventFileCompressor compressor = new 
EventFileCompressor(filesToCompress, fileManager);
+                compressionExecutor.submit(compressor);
+                fileCompressors.add(compressor);
+            }
+        }
+
+        super.initialize();
+    }
+
+    @Override
+    public void close() throws IOException {
+        super.close();
+
+        for (final EventFileCompressor compressor : fileCompressors) {
+            compressor.shutdown();
+        }
+
+        if (compressionExecutor != null) {
+            compressionExecutor.shutdown();
+        }
+    }
+
+    @Override
+    public void reindexLatestEvents(final EventIndex eventIndex) {
+        final List<WriteAheadStorePartition> partitions = getPartitions();
+        final int numPartitions = partitions.size();
+
+        final List<Future<?>> futures = new ArrayList<>(numPartitions);
+        final ExecutorService executor = 
Executors.newFixedThreadPool(numPartitions);
+
+        for (final WriteAheadStorePartition partition : partitions) {
+            futures.add(executor.submit(() -> 
partition.reindexLatestEvents(eventIndex)));
+        }
+
+        executor.shutdown();
+        for (final Future<?> future : futures) {
+            try {
+                future.get();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException("Failed to re-index events because 
Thread was interrupted", e);
+            } catch (ExecutionException e) {
+                throw new RuntimeException("Failed to re-index events", e);
+            }
+        }
+    }
+
+    @Override
+    protected List<WriteAheadStorePartition> getPartitions() {
+        return partitions;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/RecordReaderFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/RecordReaderFactory.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/RecordReaderFactory.java
new file mode 100644
index 0000000..ddb8165
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/RecordReaderFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.store;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Collection;
+
+import org.apache.nifi.provenance.serialization.RecordReader;
+
+public interface RecordReaderFactory {
+    RecordReader newRecordReader(File file, Collection<Path> 
provenanceLogFiles, int maxAttributeChars) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/RecordWriterFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/RecordWriterFactory.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/RecordWriterFactory.java
new file mode 100644
index 0000000..89da603
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/RecordWriterFactory.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.store;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.nifi.provenance.serialization.RecordWriter;
+
+public interface RecordWriterFactory {
+    RecordWriter createWriter(final File file, final AtomicLong idGenerator, 
final boolean compressed, final boolean createToc) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/RecordWriterLease.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/RecordWriterLease.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/RecordWriterLease.java
new file mode 100644
index 0000000..8543d2b
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/RecordWriterLease.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.store;
+
+import org.apache.nifi.provenance.serialization.RecordWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A reference-counted lease on a {@link RecordWriter}.
+ * <p>
+ * Threads call {@link #tryClaim()} before writing and {@link #relinquishClaim()} afterwards. Once the lease has
+ * been {@link #close() closed}, the underlying writer is closed as soon as no claims remain outstanding. All
+ * mutable state is guarded by this object's monitor.
+ * </p>
+ */
+public class RecordWriterLease {
+    private static final Logger logger = LoggerFactory.getLogger(RecordWriterLease.class);
+
+    private final RecordWriter writer;
+    private final long maxBytes;
+    private final int maxEvents;
+    private long usageCounter;
+    private boolean markedRollable = false;
+    private boolean closed = false;
+
+    /**
+     * Creates a lease whose writer is considered full once {@code maxBytes} bytes have been written.
+     */
+    public RecordWriterLease(final RecordWriter writer, final long maxBytes) {
+        this(writer, maxBytes, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Creates a lease whose writer is considered full once {@code maxBytes} bytes or {@code maxEvents}
+     * records have been written, whichever comes first.
+     */
+    public RecordWriterLease(final RecordWriter writer, final long maxBytes, final int maxEvents) {
+        this.writer = writer;
+        this.maxBytes = maxBytes;
+        this.maxEvents = maxEvents;
+    }
+
+    public RecordWriter getWriter() {
+        return writer;
+    }
+
+    /**
+     * Attempts to claim the writer for a write.
+     *
+     * @return {@code true} if the claim was granted; {@code false} if the lease has been marked for rollover or
+     *         the writer can no longer accept records
+     */
+    public synchronized boolean tryClaim() {
+        if (markedRollable || isWriterExhausted()) {
+            return false;
+        }
+
+        usageCounter++;
+        return true;
+    }
+
+    /**
+     * Releases a claim obtained from {@link #tryClaim()}. If the lease has been closed and this was the last
+     * outstanding claim, the writer is closed.
+     */
+    public synchronized void relinquishClaim() {
+        usageCounter--;
+
+        if (closed && usageCounter < 1) {
+            closeWriterQuietly();
+        }
+    }
+
+    /**
+     * Determines whether the writer should be rolled over. Once this returns {@code true}, the lease is marked
+     * rollable: it keeps returning {@code true}, and no further claims are granted.
+     *
+     * @return {@code true} if the lease has been (or is now) marked for rollover
+     */
+    public synchronized boolean shouldRoll() {
+        if (markedRollable) {
+            return true;
+        }
+
+        // Only an unclaimed writer may be marked, so no in-flight write is cut off.
+        if (usageCounter < 1 && isWriterExhausted()) {
+            markedRollable = true;
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Closes the lease. The writer is closed immediately if no claims are outstanding; otherwise it is closed when
+     * the last claim is relinquished.
+     */
+    public synchronized void close() {
+        closed = true;
+
+        if (usageCounter < 1) {
+            closeWriterQuietly();
+        }
+    }
+
+    // True when the writer is closed, dirty (a previous write failed part-way), or has hit a size/count limit.
+    private boolean isWriterExhausted() {
+        return writer.isClosed() || writer.isDirty() || writer.getBytesWritten() >= maxBytes
+            || writer.getRecordsWritten() >= maxEvents;
+    }
+
+    // Closes the writer, logging rather than propagating any failure.
+    private void closeWriterQuietly() {
+        try {
+            writer.close();
+        } catch (final Exception e) {
+            logger.warn("Failed to close {}", writer, e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/StorageResult.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/StorageResult.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/StorageResult.java
new file mode 100644
index 0000000..94b1ece
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/StorageResult.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.store;
+
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.serialization.StorageSummary;
+
+/**
+ * Describes the outcome of persisting a batch of Provenance Events.
+ */
+public interface StorageResult {
+
+    /**
+     * A result representing that nothing was stored: no storage locations, no rollover.
+     * (Interface fields are implicitly {@code public static final}.)
+     */
+    StorageResult EMPTY = new StorageResult() {
+        @Override
+        public Map<ProvenanceEventRecord, StorageSummary> getStorageLocations() {
+            return Collections.emptyMap();
+        }
+
+        @Override
+        public boolean triggeredRollover() {
+            return false;
+        }
+
+        @Override
+        public Integer getEventsRolledOver() {
+            return null;
+        }
+
+        @Override
+        public String toString() {
+            return "StorageResult.EMPTY";
+        }
+    };
+
+    /**
+     * @return each stored Provenance Event Record mapped to the location at which it was stored
+     */
+    Map<ProvenanceEventRecord, StorageSummary> getStorageLocations();
+
+    /**
+     * Reports whether storing the events caused the store to roll over to a new storage location.
+     *
+     * @return <code>true</code> if a rollover to a new storage location occurred, <code>false</code> otherwise
+     */
+    boolean triggeredRollover();
+
+    /**
+     * @return how many events had been stored in the storage location that was rolled over, or
+     *         <code>null</code> when no rollover took place
+     */
+    Integer getEventsRolledOver();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java
new file mode 100644
index 0000000..a25043a
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java
@@ -0,0 +1,637 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.store;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.RepositoryConfiguration;
+import org.apache.nifi.provenance.StandardProvenanceEventRecord;
+import org.apache.nifi.provenance.authorization.EventAuthorizer;
+import org.apache.nifi.provenance.index.EventIndex;
+import org.apache.nifi.provenance.serialization.RecordReader;
+import org.apache.nifi.provenance.serialization.RecordWriter;
+import org.apache.nifi.provenance.serialization.StorageSummary;
+import org.apache.nifi.provenance.store.iterator.EventIterator;
+import 
org.apache.nifi.provenance.store.iterator.SelectiveRecordReaderEventIterator;
+import 
org.apache.nifi.provenance.store.iterator.SequentialRecordReaderEventIterator;
+import org.apache.nifi.provenance.toc.TocUtil;
+import org.apache.nifi.provenance.util.DirectoryUtils;
+import org.apache.nifi.provenance.util.NamedThreadFactory;
+import org.apache.nifi.util.FormatUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class WriteAheadStorePartition implements EventStorePartition {
+    private static final Logger logger = LoggerFactory.getLogger(WriteAheadStorePartition.class);
+
+    private final RepositoryConfiguration config;
+    private final File partitionDirectory;
+    private final String partitionName;
+    private final RecordWriterFactory recordWriterFactory;
+    private final RecordReaderFactory recordReaderFactory;
+    // Completed event files are offered to this queue on rollover when compress-on-rollover is enabled.
+    private final BlockingQueue<File> filesToCompress;
+    // Generator of Event IDs; advanced past this partition's largest recovered ID during initialize().
+    private final AtomicLong idGenerator;
+    // Largest Event ID written to this partition, or -1 if none is known.
+    private final AtomicLong maxEventId = new AtomicLong(-1L);
+    private volatile boolean closed = false;
+
+    // Lease on the writer of the active event file. The holder itself never changes (hence final); its
+    // content is swapped via compareAndSet in tryRollover.
+    private final AtomicReference<RecordWriterLease> eventWriterLeaseRef = new AtomicReference<>();
+
+    // Minimum Event ID of each event file -> that file. Guarded by synchronizing on this map.
+    private final SortedMap<Long, File> minEventIdToPathMap = new TreeMap<>();
+
+    /**
+     * Creates a partition whose event files live in {@code storageDirectory}. The constructor only stores its
+     * arguments; existing files on disk are recovered by {@link #initialize()}.
+     *
+     * @param storageDirectory directory holding this partition's event files
+     * @param partitionName name recorded in every {@link StorageSummary} this partition produces
+     * @param repoConfig repository configuration (limits, compression and sync settings)
+     * @param recordWriterFactory factory used to create writers for new event files
+     * @param recordReaderFactory factory used to create readers over existing event files
+     * @param filesToCompress queue that rolled-over event files are offered to when compression is enabled
+     * @param idGenerator generator of Event IDs
+     * @param eventReporter accepted but not currently retained by this class
+     */
+    public WriteAheadStorePartition(final File storageDirectory, final String partitionName, final RepositoryConfiguration repoConfig, final RecordWriterFactory recordWriterFactory,
+        final RecordReaderFactory recordReaderFactory, final BlockingQueue<File> filesToCompress, final AtomicLong idGenerator, final EventReporter eventReporter) {
+
+        this.partitionDirectory = storageDirectory;
+        this.partitionName = partitionName;
+        this.config = repoConfig;
+
+        this.recordWriterFactory = recordWriterFactory;
+        this.recordReaderFactory = recordReaderFactory;
+
+        this.filesToCompress = filesToCompress;
+        this.idGenerator = idGenerator;
+    }
+
+    /**
+     * Marks this partition closed, so that subsequent {@code addEvents} calls fail, and closes the lease on the
+     * active writer if there is one.
+     */
+    @Override
+    public void close() throws IOException {
+        closed = true;
+
+        final RecordWriterLease activeLease = eventWriterLeaseRef.get();
+        if (activeLease == null) {
+            return;
+        }
+
+        activeLease.close();
+    }
+
+    /**
+     * Recovers this partition's state from disk.
+     * <p>
+     * The method does the following, in order:
+     * <ul>
+     * <li>creates the partition directory if it does not exist;</li>
+     * <li>removes partial {@code .gz} files left behind by an interrupted compression, when compress-on-rollover
+     * is enabled;</li>
+     * <li>determines the largest Event ID stored in the partition;</li>
+     * <li>registers every event file under its minimum Event ID;</li>
+     * <li>advances the shared ID generator so that new IDs do not collide with recovered ones.</li>
+     * </ul>
+     * </p>
+     *
+     * @throws IOException if the partition directory cannot be created or its contents cannot be listed
+     */
+    @Override
+    public synchronized void initialize() throws IOException {
+        if (!partitionDirectory.exists()) {
+            Files.createDirectories(partitionDirectory.toPath());
+        }
+
+        // Remove partial .gz files BEFORE listing the event files. Done afterwards (and assuming
+        // DirectoryUtils.EVENT_FILE_FILTER also accepts .gz files -- TODO confirm), a partial .gz could be read for
+        // its max Event ID and registered in minEventIdToPathMap, only to be deleted immediately.
+        if (config.isCompressOnRollover()) {
+            deletePartiallyCompressedFiles();
+        }
+
+        final File[] files = partitionDirectory.listFiles(DirectoryUtils.EVENT_FILE_FILTER);
+        if (files == null) {
+            throw new IOException("Could not access files in the " + partitionDirectory + " directory");
+        }
+
+        final List<File> fileList = Arrays.asList(files);
+        fileList.sort(DirectoryUtils.LARGEST_ID_FIRST);
+
+        final long recoveredMaxEventId = findMaxEventId(fileList);
+
+        synchronized (minEventIdToPathMap) {
+            for (final File file : fileList) {
+                minEventIdToPathMap.put(DirectoryUtils.getMinId(file), file);
+            }
+        }
+
+        this.maxEventId.set(recoveredMaxEventId);
+
+        // Update the ID Generator to the max of the ID Generator or the recovered max Event ID
+        final long nextPartitionId = recoveredMaxEventId + 1;
+        final long updatedId = idGenerator.updateAndGet(curVal -> Math.max(curVal, nextPartitionId));
+        logger.info("After recovering {}, next Event ID to be generated will be {}", partitionDirectory, updatedId);
+    }
+
+    /**
+     * Determines the largest Event ID stored in the given files.
+     * <p>
+     * The files are expected to be ordered largest-ID-first. The first file whose reader reports a max Event ID
+     * greater than -1 therefore holds the answer, and the remaining files are not opened. Unreadable files are
+     * logged and skipped. Every reader opened here is closed again.
+     * </p>
+     *
+     * @param filesLargestIdFirst event files sorted with {@link DirectoryUtils#LARGEST_ID_FIRST}
+     * @return the largest Event ID found, or -1 if no file yielded one
+     */
+    private long findMaxEventId(final List<File> filesLargestIdFirst) {
+        for (final File file : filesLargestIdFirst) {
+            long eventId = -1L;
+            boolean readSucceeded = false;
+
+            try (final RecordReader reader = recordReaderFactory.newRecordReader(file, Collections.emptyList(), Integer.MAX_VALUE)) {
+                eventId = reader.getMaxEventId();
+                readSucceeded = true;
+            } catch (final IOException ioe) {
+                if (readSucceeded) {
+                    // The max Event ID was obtained; only closing the reader failed, so the result is still used.
+                    logger.warn("Failed to close reader for {}", file, ioe);
+                } else {
+                    logger.warn("Could not read file {}; if this file contains Provenance Events, new events may be created with the same event identifiers", file, ioe);
+                }
+            }
+
+            if (eventId > -1L) {
+                return eventId;
+            }
+        }
+
+        return -1L;
+    }
+
+    /**
+     * Deletes every {@code <name>.prov.gz} file that sits next to a {@code <name>.prov} file. If both exist,
+     * compression of the {@code .prov} file was in progress when NiFi stopped, so the {@code .gz} is incomplete.
+     * NOTE(review): the remaining {@code .prov} files are not re-enqueued for compression here; confirm whether
+     * that happens elsewhere.
+     */
+    private void deletePartiallyCompressedFiles() {
+        final File[] uncompressedFiles = partitionDirectory.listFiles(f -> f.getName().endsWith(".prov"));
+        if (uncompressedFiles == null) {
+            return;
+        }
+
+        for (final File file : uncompressedFiles) {
+            final File compressed = new File(file.getParentFile(), file.getName() + ".gz");
+            if (compressed.exists() && !compressed.delete()) {
+                logger.warn("Failed to delete partially compressed file {}; this file should be cleaned up manually", compressed);
+            }
+        }
+    }
+
+
+    /**
+     * Persists the given events to the active event file, rolling over to a new file afterwards if the current
+     * one should be rolled.
+     *
+     * @param events the events to persist
+     * @return the storage location of each event and whether a rollover was triggered
+     * @throws IOException if this partition is closed or the events cannot be written
+     */
+    @Override
+    public StorageResult addEvents(final Iterable<ProvenanceEventRecord> events) throws IOException {
+        if (closed) {
+            throw new IOException(this + " is closed");
+        }
+
+        final RecordWriterLease lease = claimWriterLease();
+        final RecordWriter writer = lease.getWriter();
+
+        // The claim must be released no matter how the write ends.
+        final Map<ProvenanceEventRecord, StorageSummary> storageMap;
+        try {
+            storageMap = addEvents(events, writer);
+        } finally {
+            lease.relinquishClaim();
+        }
+
+        final Integer rolloverCount = rolloverIfNecessary(lease, writer);
+
+        return new StorageResult() {
+            @Override
+            public Map<ProvenanceEventRecord, StorageSummary> getStorageLocations() {
+                return storageMap;
+            }
+
+            @Override
+            public boolean triggeredRollover() {
+                return rolloverCount != null;
+            }
+
+            @Override
+            public Integer getEventsRolledOver() {
+                return rolloverCount;
+            }
+
+            @Override
+            public String toString() {
+                return getStorageLocations().toString();
+            }
+        };
+    }
+
+    // Spins until a claim on the current writer lease is obtained, rolling the lease over when it is exhausted.
+    private RecordWriterLease claimWriterLease() throws IOException {
+        for (;;) {
+            final RecordWriterLease candidate = getLease();
+            if (candidate.tryClaim()) {
+                return candidate;
+            }
+
+            if (candidate.shouldRoll()) {
+                tryRollover(candidate);
+            }
+        }
+    }
+
+    // Rolls the lease over if needed; returns the number of records in the rolled file, or null if no rollover.
+    private Integer rolloverIfNecessary(final RecordWriterLease lease, final RecordWriter writer) {
+        final boolean rollNeeded = lease.shouldRoll();
+        try {
+            if (rollNeeded && tryRollover(lease)) {
+                return writer.getRecordsWritten();
+            }
+        } catch (final IOException ioe) {
+            logger.error("Updated {} but failed to rollover to a new Event File", this, ioe);
+        }
+
+        return null;
+    }
+
+    /**
+     * Returns the current writer lease, creating the first one via {@code tryRollover(null)} if none exists yet.
+     */
+    private RecordWriterLease getLease() throws IOException {
+        RecordWriterLease current;
+        do {
+            current = eventWriterLeaseRef.get();
+            if (current != null) {
+                return current;
+            }
+        } while (!tryRollover(null));
+
+        return eventWriterLeaseRef.get();
+    }
+
+    /**
+     * Attempts to replace {@code lease} with a lease on a newly created event file.
+     * <p>
+     * On success, the new file is registered in {@code minEventIdToPathMap}, and the replaced lease is closed.
+     * When compress-on-rollover is enabled, the replaced file is also offered to the compression queue.
+     * </p>
+     *
+     * @param lease the lease the caller believes is current, or {@code null} if no lease exists yet
+     * @return {@code true} if this call installed a new lease; {@code false} if {@code lease} is no longer current
+     * @throws IOException if the new writer cannot be created or initialized, or if interrupted while enqueuing
+     *             the old file for compression
+     */
+    private synchronized boolean tryRollover(final RecordWriterLease lease) throws IOException {
+        if (!Objects.equals(lease, eventWriterLeaseRef.get())) {
+            return false;
+        }
+
+        final long nextEventId = idGenerator.get();
+        final File updatedEventFile = new File(partitionDirectory, nextEventId + ".prov");
+        final RecordWriter updatedWriter = recordWriterFactory.createWriter(updatedEventFile, idGenerator, false, true);
+
+        // Write the header BEFORE publishing the lease. getLease() and RecordWriterLease.tryClaim() do not
+        // synchronize on this partition, so once the lease is visible another thread may claim it and write records.
+        try {
+            updatedWriter.writeHeader(nextEventId);
+        } catch (final IOException | RuntimeException e) {
+            discardWriter(updatedWriter, updatedEventFile);
+            throw e;
+        }
+
+        final RecordWriterLease updatedLease = new RecordWriterLease(updatedWriter, config.getMaxEventFileCapacity(), config.getMaxEventFileCount());
+        if (!eventWriterLeaseRef.compareAndSet(lease, updatedLease)) {
+            discardWriter(updatedWriter, updatedEventFile);
+            return false;
+        }
+
+        synchronized (minEventIdToPathMap) {
+            minEventIdToPathMap.put(nextEventId, updatedEventFile);
+        }
+
+        if (lease != null) {
+            // The old lease is no longer reachable through eventWriterLeaseRef, so nothing else would close it.
+            // RecordWriterLease.close() defers closing the writer until outstanding claims are relinquished.
+            lease.close();
+
+            if (config.isCompressOnRollover() && lease.getWriter() != null) {
+                boolean offered = false;
+                while (!offered && !closed) {
+                    try {
+                        offered = filesToCompress.offer(lease.getWriter().getFile(), 1, TimeUnit.SECONDS);
+                    } catch (final InterruptedException ie) {
+                        Thread.currentThread().interrupt();
+                        throw new IOException("Interrupted while waiting to enqueue " + lease.getWriter().getFile() + " for compression");
+                    }
+                }
+            }
+        }
+
+        return true;
+    }
+
+    // Closes a writer that will not be used and removes its event file and Table-of-Contents file, best-effort.
+    private void discardWriter(final RecordWriter writer, final File eventFile) {
+        try {
+            writer.close();
+        } catch (final Exception e) {
+            logger.warn("Failed to close Record Writer {}; some resources may not be cleaned up properly.", writer, e);
+        }
+
+        if (eventFile.exists() && !eventFile.delete()) {
+            logger.warn("Failed to remove unused Provenance Event file {}; this file should be cleaned up manually", eventFile);
+        }
+
+        final File tocFile = TocUtil.getTocFile(eventFile);
+        if (tocFile.exists() && !tocFile.delete()) {
+            logger.warn("Failed to remove Provenance Table-of-Contents file {}; this file should be cleaned up manually", tocFile);
+        }
+    }
+
+    /**
+     * Writes each event with {@code writer} and records where it was stored, tagging each location with this
+     * partition's name. If at least one event was written, the writer is flushed, the partition's max Event ID
+     * is raised to the last ID written, and the writer is synced when always-sync is configured.
+     *
+     * @return each written event mapped to its storage location
+     * @throws IOException if writing fails; the writer is marked dirty before the exception propagates
+     */
+    private Map<ProvenanceEventRecord, StorageSummary> addEvents(final Iterable<ProvenanceEventRecord> events, final RecordWriter writer) throws IOException {
+        final Map<ProvenanceEventRecord, StorageSummary> locationMap = new HashMap<>();
+
+        try {
+            long lastEventId = -1L;
+            for (final ProvenanceEventRecord event : events) {
+                final StorageSummary written = writer.writeRecord(event);
+                final StorageSummary located = new StorageSummary(written.getEventId(), written.getStorageLocation(), this.partitionName,
+                    written.getBlockIndex(), written.getSerializedLength(), written.getBytesWritten());
+                locationMap.put(event, located);
+                lastEventId = located.getEventId();
+            }
+
+            if (!locationMap.isEmpty()) {
+                writer.flush();
+
+                final long highestWritten = lastEventId;
+                this.maxEventId.getAndUpdate(current -> Math.max(current, highestWritten));
+
+                if (config.isAlwaysSync()) {
+                    writer.sync();
+                }
+            }
+        } catch (final Exception e) {
+            // The file may now end with a partial record. Marking the writer dirty makes RecordWriterLease refuse
+            // further claims on it, so nobody appends after the partial record once our claim is relinquished.
+            writer.markDirty();
+            throw e;
+        }
+
+        return locationMap;
+    }
+
+
+    /**
+     * @return the combined length, in bytes, of the event files currently on disk in this partition
+     */
+    @Override
+    public long getSize() {
+        return getEventFilesFromDisk().mapToLong(File::length).sum();
+    }
+
+    // Lists the partition directory's event files; an unreadable directory yields an empty stream.
+    private Stream<File> getEventFilesFromDisk() {
+        final File[] eventFiles = partitionDirectory.listFiles(DirectoryUtils.EVENT_FILE_FILTER);
+        if (eventFiles == null) {
+            return Stream.empty();
+        }
+
+        return Stream.of(eventFiles);
+    }
+
+    /**
+     * @return the largest Event ID known to be stored in this partition, or -1 if none is known
+     */
+    @Override
+    public long getMaxEventId() {
+        final long highest = maxEventId.get();
+        return highest;
+    }
+
+    /**
+     * Looks up a single event by ID.
+     *
+     * @param id the Event ID to find
+     * @return the event, or an empty Optional if no registered file contains it
+     * @throws IOException if the event file cannot be read
+     */
+    @Override
+    public Optional<ProvenanceEventRecord> getEvent(final long id) throws IOException {
+        final Optional<File> eventFile = getPathForEventId(id);
+        if (eventFile.isPresent()) {
+            try (final RecordReader reader = recordReaderFactory.newRecordReader(eventFile.get(), Collections.emptyList(), config.getMaxAttributeChars())) {
+                // When the requested ID is not in the file, skipToEvent may land on a later event,
+                // so keep the result only if its ID matches exactly.
+                return reader.skipToEvent(id).filter(event -> event.getEventId() == id);
+            }
+        }
+
+        return Optional.empty();
+    }
+
+    /**
+     * Returns up to {@code maxEvents} events, starting at {@code firstRecordId}, that {@code authorizer} permits.
+     * Events the authorizer rejects are skipped and do not count toward the limit.
+     *
+     * @param firstRecordId the smallest Event ID of interest
+     * @param maxEvents the maximum number of events to return
+     * @param authorizer decides which events may be returned
+     * @return a mutable list of authorized events; empty when {@code maxEvents} is less than 1
+     * @throws IOException if the event files cannot be read
+     */
+    @Override
+    public List<ProvenanceEventRecord> getEvents(final long firstRecordId, final int maxEvents, final EventAuthorizer authorizer) throws IOException {
+        if (maxEvents < 1) {
+            // Nothing can be returned; this also keeps a negative value away from the ArrayList constructor.
+            return new ArrayList<>();
+        }
+
+        final List<ProvenanceEventRecord> events = new ArrayList<>(Math.min(maxEvents, 1000));
+        try (final EventIterator iterator = createEventIterator(firstRecordId)) {
+            // Check the limit BEFORE pulling the next event, so no event is read only to be thrown away.
+            while (events.size() < maxEvents) {
+                final Optional<ProvenanceEventRecord> eventOption = iterator.nextEvent();
+                if (!eventOption.isPresent()) {
+                    break;
+                }
+
+                final ProvenanceEventRecord event = eventOption.get();
+                if (authorizer.isAuthorized(event)) {
+                    events.add(event);
+                }
+            }
+        }
+
+        return events;
+    }
+
+    /**
+     * Creates an iterator over all events whose ID is at least {@code minDesiredId}.
+     * <p>
+     * The file whose minimum ID is the largest one not exceeding {@code minDesiredId} may hold that event (and
+     * later ones), so it is read first. Every file whose minimum ID exceeds {@code minDesiredId} follows, in
+     * ascending order.
+     * </p>
+     */
+    @Override
+    public EventIterator createEventIterator(final long minDesiredId) {
+        File floorFile = null;
+        final List<File> laterFiles = new ArrayList<>();
+
+        synchronized (minEventIdToPathMap) {
+            // The map is sorted by ascending key, so the last entry with key <= minDesiredId is the floor file.
+            for (final Map.Entry<Long, File> entry : minEventIdToPathMap.entrySet()) {
+                if (entry.getKey() > minDesiredId) {
+                    laterFiles.add(entry.getValue());
+                } else {
+                    floorFile = entry.getValue();
+                }
+            }
+        }
+
+        final List<File> filesOfInterest = new ArrayList<>(laterFiles.size() + 1);
+        if (floorFile != null) {
+            filesOfInterest.add(floorFile);
+        }
+        filesOfInterest.addAll(laterFiles);
+
+        if (filesOfInterest.isEmpty()) {
+            return EventIterator.EMPTY;
+        }
+
+        return new SequentialRecordReaderEventIterator(filesOfInterest, recordReaderFactory, minDesiredId, config.getMaxAttributeChars());
+    }
+
+
+    /**
+     * Creates an iterator over the events with the given IDs. It searches a snapshot of every registered event
+     * file taken at call time.
+     */
+    @Override
+    public EventIterator createEventIterator(final List<Long> eventIds) {
+        final List<File> snapshot = new ArrayList<>();
+        synchronized (minEventIdToPathMap) {
+            snapshot.addAll(minEventIdToPathMap.values());
+        }
+
+        return snapshot.isEmpty()
+            ? EventIterator.EMPTY
+            : new SelectiveRecordReaderEventIterator(snapshot, recordReaderFactory, eventIds, config.getMaxAttributeChars());
+    }
+
+    // Returns the registered file with the largest minimum Event ID that is <= id, i.e. the only file that could
+    // contain that event; empty if every file starts after id.
+    private Optional<File> getPathForEventId(final long id) {
+        File candidate = null;
+
+        synchronized (minEventIdToPathMap) {
+            for (final Map.Entry<Long, File> entry : minEventIdToPathMap.entrySet()) {
+                if (entry.getKey() <= id) {
+                    candidate = entry.getValue();
+                } else {
+                    break;
+                }
+            }
+        }
+
+        return Optional.ofNullable(candidate);
+    }
+
+
+    /**
+     * Deletes, smallest Event ID first, every event file last modified before the given age.
+     */
+    @Override
+    public void purgeOldEvents(final long olderThan, final TimeUnit unit) {
+        final long cutoffMillis = System.currentTimeMillis() - unit.toMillis(olderThan);
+
+        final List<File> expiredFiles = getEventFilesFromDisk()
+            .filter(file -> file.lastModified() < cutoffMillis)
+            .sorted(DirectoryUtils.SMALLEST_ID_FIRST)
+            .collect(Collectors.toList());
+
+        for (final File expired : expiredFiles) {
+            delete(expired);
+        }
+    }
+
+
+    /**
+     * Deletes the oldest event file that can be deleted, trying files in ascending order of Event ID.
+     *
+     * @return the size in bytes of the file that was deleted, or 0 if no file was deleted
+     */
+    @Override
+    public long purgeOldestEvents() {
+        final List<File> oldestFirst = getEventFilesFromDisk()
+            .sorted(DirectoryUtils.SMALLEST_ID_FIRST)
+            .collect(Collectors.toList());
+
+        for (final File candidate : oldestFirst) {
+            final long candidateSize = candidate.length();
+
+            if (!delete(candidate)) {
+                logger.warn("{} Failed to delete oldest event file {}. This file should be cleaned up manually.", this, candidate);
+                continue;
+            }
+
+            logger.debug("{} Deleted {} event file ({}) due to storage limits", this, candidate, FormatUtils.formatDataSize(candidateSize));
+            return candidateSize;
+        }
+
+        return 0L;
+    }
+
+    /**
+     * Removes an event file from the in-memory ID-to-file map and then from disk, along with its Table-of-Contents
+     * file if one exists. The map entry is removed even when the file itself cannot be deleted.
+     *
+     * @param file the event file to delete
+     * @return true if the event file was deleted, false otherwise (a TOC deletion failure is only logged)
+     */
+    private boolean delete(final File file) {
+        final long minEventIdInFile = DirectoryUtils.getMinId(file);
+        synchronized (minEventIdToPathMap) {
+            minEventIdToPathMap.remove(minEventIdInFile);
+        }
+
+        final boolean eventFileRemoved = file.delete();
+        if (eventFileRemoved) {
+            final File tocFile = TocUtil.getTocFile(file);
+            final boolean tocLeftBehind = tocFile.exists() && !tocFile.delete();
+            if (tocLeftBehind) {
+                logger.warn("Failed to remove Provenance Table-of-Contents file {}; this file should be cleaned up manually", tocFile);
+            }
+        } else {
+            logger.warn("Failed to remove Provenance Event file {}; this file should be cleaned up manually", file);
+        }
+
+        return eventFileRemoved;
+    }
+
+    void reindexLatestEvents(final EventIndex eventIndex) {
+        final List<File> eventFiles = 
getEventFilesFromDisk().sorted(DirectoryUtils.SMALLEST_ID_FIRST).collect(Collectors.toList());
+        if (eventFiles.isEmpty()) {
+            return;
+        }
+
+        final long minEventIdToReindex = 
eventIndex.getMinimumEventIdToReindex(partitionName);
+        final long maxEventId = getMaxEventId();
+        final long eventsToReindex = maxEventId - minEventIdToReindex;
+
+        logger.info("The last Provenance Event indexed for partition {} is {}, 
but the last event written to partition has ID {}. "
+            + "Re-indexing up to the last {} events to ensure that the Event 
Index is accurate and up-to-date",
+            partitionName, minEventIdToReindex, maxEventId, eventsToReindex, 
partitionDirectory);
+
+        // Find the first event file that we care about.
+        int firstEventFileIndex = 0;
+        for (int i = eventFiles.size() - 1; i >= 0; i--) {
+            final File eventFile = eventFiles.get(i);
+            final long minIdInFile = DirectoryUtils.getMinId(eventFile);
+            if (minIdInFile <= minEventIdToReindex) {
+                firstEventFileIndex = i;
+                break;
+            }
+        }
+
+        // Create a subList that contains the files of interest
+        final List<File> eventFilesToReindex = 
eventFiles.subList(firstEventFileIndex, eventFiles.size());
+
+        final ExecutorService executor = 
Executors.newFixedThreadPool(Math.min(4, eventFilesToReindex.size()), new 
NamedThreadFactory("Re-Index Provenance Events", true));
+        final List<Future<?>> futures = new 
ArrayList<>(eventFilesToReindex.size());
+        final AtomicLong reindexedCount = new AtomicLong(0L);
+
+        // Re-Index the last bunch of events.
+        // We don't use an Event Iterator here because it's possible that one 
of the event files could be corrupt (for example, if NiFi does while
+        // writing to the file, a record may be incomplete). We don't want to 
prevent us from moving on and continuing to index the rest of the
+        // un-indexed events. So we just use a List of files and create a 
reader for each one.
+        final long start = System.nanoTime();
+        int fileCount = 0;
+        for (final File eventFile : eventFilesToReindex) {
+            final boolean skipToEvent;
+            if (fileCount++ == 0) {
+                skipToEvent = true;
+            } else {
+                skipToEvent = false;
+            }
+
+            final Runnable reindexTask = new Runnable() {
+                @Override
+                public void run() {
+                    final Map<ProvenanceEventRecord, StorageSummary> 
storageMap = new HashMap<>(1000);
+
+                    try (final RecordReader recordReader = 
recordReaderFactory.newRecordReader(eventFile, Collections.emptyList(), 
Integer.MAX_VALUE)) {
+                        if (skipToEvent) {
+                            final Optional<ProvenanceEventRecord> eventOption 
= recordReader.skipToEvent(minEventIdToReindex);
+                            if (!eventOption.isPresent()) {
+                                return;
+                            }
+                        }
+
+                        StandardProvenanceEventRecord event = null;
+                        while (true) {
+                            final long startBytesConsumed = 
recordReader.getBytesConsumed();
+
+                            event = recordReader.nextRecord();
+                            if (event == null) {
+                                eventIndex.reindexEvents(storageMap);
+                                reindexedCount.addAndGet(storageMap.size());
+                                storageMap.clear();
+                                break; // stop reading from this file
+                            } else {
+                                final long eventSize = 
recordReader.getBytesConsumed() - startBytesConsumed;
+                                storageMap.put(event, new 
StorageSummary(event.getEventId(), eventFile.getName(), partitionName, 
recordReader.getBlockIndex(), eventSize, 0L));
+
+                                if (storageMap.size() == 1000) {
+                                    eventIndex.reindexEvents(storageMap);
+                                    
reindexedCount.addAndGet(storageMap.size());
+                                    storageMap.clear();
+                                }
+                            }
+                        }
+                    } catch (final EOFException eof) {
+                        // Ran out of data. Continue on.
+                        logger.warn("Failed to find event with ID {} in Event 
File {} due to {}", minEventIdToReindex, eventFile, eof.toString());
+                    } catch (final Exception e) {
+                        logger.error("Failed to index Provenance Events found 
in {}", eventFile, e);
+                    }
+                }
+            };
+
+            futures.add(executor.submit(reindexTask));
+        }
+
+        for (final Future<?> future : futures) {
+            try {
+                future.get();
+            } catch (final ExecutionException ee) {
+                logger.error("Failed to re-index some Provenance events. These 
events may not be query-able via the Provenance interface", ee.getCause());
+            } catch (final InterruptedException e) {
+                Thread.currentThread().interrupt();
+                logger.error("Interrupted while waiting for Provenance events 
to be re-indexed", e);
+                break;
+            }
+        }
+
+        try {
+            eventIndex.commitChanges(partitionName);
+        } catch (final IOException e) {
+            logger.error("Failed to re-index Provenance Events for partition " 
+ partitionName, e);
+        }
+
+        final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
start);
+        final long seconds = millis / 1000L;
+        final long millisRemainder = millis % 1000L;
+        logger.info("Finished re-indexing {} events across {} files for {} in 
{}.{} seconds",
+            reindexedCount.get(), eventFilesToReindex.size(), 
partitionDirectory, seconds, millisRemainder);
+    }
+
+    /**
+     * @return a description of this partition that identifies its directory
+     */
+    @Override
+    public String toString() {
+        final String directoryDescription = String.valueOf(partitionDirectory);
+        return "Provenance Event Store Partition[directory=" + directoryDescription + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/iterator/AuthorizingEventIterator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/iterator/AuthorizingEventIterator.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/iterator/AuthorizingEventIterator.java
new file mode 100644
index 0000000..7ff2be7
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/iterator/AuthorizingEventIterator.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.store.iterator;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.authorization.EventAuthorizer;
+import org.apache.nifi.provenance.authorization.EventTransformer;
+
+/**
+ * An {@link EventIterator} decorator that filters the events of another iterator through an {@link EventAuthorizer}.
+ * Authorized events are returned unchanged. Unauthorized events are passed to a transformer: if the transformer
+ * produces an event, that event is returned; otherwise the event is skipped.
+ */
+public class AuthorizingEventIterator implements EventIterator {
+    private final EventIterator iterator;
+    private final EventAuthorizer authorizer;
+    private final EventTransformer transformer;
+
+    /**
+     * @param iterator the underlying source of events; closed when this iterator is closed
+     * @param authorizer decides whether an event may be returned as-is
+     * @param unauthorizedTransformer applied to events the authorizer rejects
+     */
+    public AuthorizingEventIterator(final EventIterator iterator, final EventAuthorizer authorizer,
+        final EventTransformer unauthorizedTransformer) {
+        this.iterator = iterator;
+        this.authorizer = authorizer;
+        this.transformer = unauthorizedTransformer;
+    }
+
+    @Override
+    public void close() throws IOException {
+        iterator.close();
+    }
+
+    @Override
+    public Optional<ProvenanceEventRecord> nextEvent() throws IOException {
+        Optional<ProvenanceEventRecord> candidate = iterator.nextEvent();
+
+        while (candidate.isPresent()) {
+            final ProvenanceEventRecord record = candidate.get();
+            if (authorizer.isAuthorized(record)) {
+                return candidate;
+            }
+
+            final Optional<ProvenanceEventRecord> transformed = transformer.transform(record);
+            if (transformed.isPresent()) {
+                return transformed;
+            }
+
+            // The transformer dropped this event entirely; move on to the next one
+            candidate = iterator.nextEvent();
+        }
+
+        // Underlying iterator is exhausted; hand back its empty result
+        return candidate;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/iterator/EventIterator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/iterator/EventIterator.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/iterator/EventIterator.java
new file mode 100644
index 0000000..79acda3
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/iterator/EventIterator.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.store.iterator;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Optional;
+
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+
+/**
+ * A closeable, pull-style iterator over Provenance events.
+ */
+public interface EventIterator extends Closeable {
+
+    /**
+     * Returns the next event, or an empty Optional once no events remain.
+     *
+     * @return the next event, if any
+     * @throws IOException if the next event cannot be read
+     */
+    Optional<ProvenanceEventRecord> nextEvent() throws IOException;
+
+    /**
+     * An iterator that never returns any events; closing it is a no-op.
+     */
+    public static EventIterator EMPTY = new EventIterator() {
+        @Override
+        public void close() throws IOException {
+        }
+
+        @Override
+        public Optional<ProvenanceEventRecord> nextEvent() {
+            return Optional.empty();
+        }
+    };
+
+    /**
+     * Creates an iterator that returns the given events in array order, then empty results. Closing it is a no-op.
+     *
+     * @param events the events to iterate over
+     * @return an iterator over {@code events}
+     */
+    public static EventIterator of(final ProvenanceEventRecord... events) {
+        final Iterator<ProvenanceEventRecord> itr = Arrays.asList(events).iterator();
+        return new EventIterator() {
+            @Override
+            public void close() throws IOException {
+            }
+
+            @Override
+            public Optional<ProvenanceEventRecord> nextEvent() {
+                // Hand out elements while any remain; only report "no more events" once the backing iterator is
+                // exhausted (calling itr.next() at that point would throw NoSuchElementException).
+                return itr.hasNext() ? Optional.of(itr.next()) : Optional.empty();
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/iterator/SelectiveRecordReaderEventIterator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/iterator/SelectiveRecordReaderEventIterator.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/iterator/SelectiveRecordReaderEventIterator.java
new file mode 100644
index 0000000..c4a130b
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/iterator/SelectiveRecordReaderEventIterator.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.store.iterator;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.serialization.RecordReader;
+import org.apache.nifi.provenance.store.RecordReaderFactory;
+import org.apache.nifi.provenance.util.DirectoryUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An {@link EventIterator} that returns only the events with a specific set of IDs, in ascending ID order, opening
+ * a {@link RecordReader} for each event file as needed and seeking directly to each requested event.
+ */
+public class SelectiveRecordReaderEventIterator implements EventIterator {
+    private static final Logger logger = LoggerFactory.getLogger(SelectiveRecordReaderEventIterator.class);
+    private final List<File> files;
+    private final RecordReaderFactory readerFactory;
+    private final List<Long> eventIds;
+    private final Iterator<Long> idIterator;
+    private final int maxAttributeChars;
+
+    private boolean closed = false;
+    private RecordReader reader;
+    private File currentFile;
+
+    /**
+     * @param filesToRead the event files that may contain the requested events, in any order
+     * @param readerFactory used to open a RecordReader for an event file
+     * @param eventIds the IDs of the events to return; a sorted copy is taken, so events come back in ascending ID order
+     * @param maxAttributeChars passed to the reader factory whenever a file is opened
+     */
+    public SelectiveRecordReaderEventIterator(final List<File> filesToRead, final RecordReaderFactory readerFactory, final List<Long> eventIds, final int maxAttributeChars) {
+        this.readerFactory = readerFactory;
+
+        this.eventIds = new ArrayList<>(eventIds);
+        Collections.sort(this.eventIds);
+        idIterator = this.eventIds.iterator();
+
+        // Make a copy of the list of files and prune out any Files that are not relevant to the Event ID's that we were given.
+        if (eventIds.isEmpty() || filesToRead.isEmpty()) {
+            this.files = Collections.emptyList();
+        } else {
+            this.files = filterUnneededFiles(filesToRead, this.eventIds);
+        }
+
+        this.maxAttributeChars = maxAttributeChars;
+    }
+
+    /**
+     * Returns only those files that may contain at least one of the given event IDs, sorted smallest-ID-first.
+     * Each file's starting event ID is taken from {@link DirectoryUtils#getMinId(File)}, so the file holding the
+     * first requested ID is the last file whose starting ID is <= that ID.
+     *
+     * @param filesToRead candidate event files, in any order
+     * @param eventIds the requested event IDs; must be non-empty and sorted in ascending order
+     * @return the relevant files, sorted smallest-ID-first
+     */
+    protected static List<File> filterUnneededFiles(final List<File> filesToRead, final List<Long> eventIds) {
+        final List<File> files = new ArrayList<>();
+        final long firstEventId = eventIds.get(0);
+        final long lastEventId = eventIds.get(eventIds.size() - 1);
+
+        final List<File> sortedFileList = new ArrayList<>(filesToRead);
+        Collections.sort(sortedFileList, DirectoryUtils.SMALLEST_ID_FIRST);
+
+        // Iterate the sorted copy: both this selection and getFileForEventId() rely on ascending ID order
+        File lastFile = null;
+        for (final File file : sortedFileList) {
+            final long firstIdInFile = DirectoryUtils.getMinId(file);
+            if (firstIdInFile > lastEventId) {
+                break; // files are sorted, so no later file can hold a requested event
+            }
+
+            if (firstIdInFile > firstEventId) {
+                // The file before this one holds the first requested event; make sure it is included
+                if (files.isEmpty() && lastFile != null) {
+                    files.add(lastFile);
+                }
+
+                files.add(file);
+            }
+
+            lastFile = file;
+        }
+
+        // Every requested ID falls within the last file whose starting ID is <= firstEventId
+        if (files.isEmpty() && lastFile != null) {
+            files.add(lastFile);
+        }
+
+        return files;
+    }
+
+    @Override
+    public void close() throws IOException {
+        closed = true;
+
+        if (reader != null) {
+            reader.close();
+        }
+    }
+
+
+    @Override
+    public Optional<ProvenanceEventRecord> nextEvent() throws IOException {
+        if (closed) {
+            throw new IOException("EventIterator is already closed");
+        }
+
+        final long start = System.nanoTime();
+        try {
+            while (idIterator.hasNext()) {
+                // Determine the next event ID to fetch
+                final long eventId = idIterator.next();
+
+                // Determine which file the event should be in.
+                final File fileForEvent = getFileForEventId(eventId);
+                if (fileForEvent == null) {
+                    continue;
+                }
+
+                // If we determined which file the event should be in, and that's not the file that
+                // we are currently reading from, rotate the reader to the appropriate one.
+                if (!fileForEvent.equals(currentFile)) {
+                    if (reader != null) {
+                        try {
+                            reader.close();
+                        } catch (final Exception e) {
+                            logger.warn("Failed to close {}; some resources may not be cleaned up appropriately", reader, e);
+                        }
+                        reader = null;
+                    }
+
+                    // Forget the old file before opening the new one, so that a failure to open does not leave a
+                    // closed reader paired with a file name that a later call would treat as still open.
+                    this.currentFile = null;
+                    reader = readerFactory.newRecordReader(fileForEvent, Collections.emptyList(), maxAttributeChars);
+                    this.currentFile = fileForEvent;
+                }
+
+                final Optional<ProvenanceEventRecord> eventOption = reader.skipToEvent(eventId);
+                if (eventOption.isPresent() && eventOption.get().getEventId() == eventId) {
+                    reader.nextRecord();    // consume the event from the stream.
+                    return eventOption;
+                }
+
+                // The requested event is not in the file (e.g., it was never written or has aged off); try the next ID
+            }
+
+            return Optional.empty();
+        } finally {
+            final long ms = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+            logger.trace("Took {} ms to read next event", ms);
+        }
+    }
+
+    /**
+     * Returns the file that should contain the given event: the last file (in the smallest-ID-first order of
+     * {@code files}) whose starting event ID is <= {@code eventId}, or null if every file starts after it.
+     */
+    private File getFileForEventId(final long eventId) {
+        File lastFile = null;
+        for (final File file : files) {
+            final long firstEventId = DirectoryUtils.getMinId(file);
+            if (firstEventId == eventId) {
+                return file;
+            }
+
+            if (firstEventId > eventId) {
+                return lastFile;
+            }
+
+            lastFile = file;
+        }
+
+        return lastFile;
+    }
+}

Reply via email to