Repository: nifi
Updated Branches:
  refs/heads/master 8d467f3d1 -> 96ed405d7


http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/index/lucene/TestLuceneEventIndex.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/index/lucene/TestLuceneEventIndex.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/index/lucene/TestLuceneEventIndex.java
new file mode 100644
index 0000000..c892376
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/index/lucene/TestLuceneEventIndex.java
@@ -0,0 +1,538 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.index.lucene;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+import org.apache.nifi.authorization.AccessDeniedException;
+import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.provenance.RepositoryConfiguration;
+import org.apache.nifi.provenance.SearchableFields;
+import org.apache.nifi.provenance.StandardProvenanceEventRecord;
+import org.apache.nifi.provenance.authorization.EventAuthorizer;
+import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
+import org.apache.nifi.provenance.lineage.LineageNode;
+import org.apache.nifi.provenance.lineage.LineageNodeType;
+import org.apache.nifi.provenance.lineage.ProvenanceEventLineageNode;
+import org.apache.nifi.provenance.lucene.IndexManager;
+import org.apache.nifi.provenance.lucene.SimpleIndexManager;
+import org.apache.nifi.provenance.search.Query;
+import org.apache.nifi.provenance.search.QueryResult;
+import org.apache.nifi.provenance.search.QuerySubmission;
+import org.apache.nifi.provenance.search.SearchTerms;
+import org.apache.nifi.provenance.serialization.StorageSummary;
+import org.apache.nifi.provenance.store.ArrayListEventStore;
+import org.apache.nifi.provenance.store.EventStore;
+import org.apache.nifi.provenance.store.StorageResult;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class TestLuceneEventIndex {
+
+    private final AtomicLong idGenerator = new AtomicLong(0L);
+
+    @Rule
+    public TestName testName = new TestName();
+
+    /**
+     * Sets the slf4j simple-logger system property for the {@code org.apache.nifi}
+     * logger to {@code DEBUG}; runs once before the tests in this class.
+     */
+    @BeforeClass
+    public static void setLogger() {
+        final String loggerProperty = "org.slf4j.simpleLogger.log.org.apache.nifi";
+        final String level = "DEBUG";
+        System.setProperty(loggerProperty, level);
+    }
+
+
+    /**
+     * Stores and indexes 50,000 events, polls {@code getMaxEventId("1")} until it reports
+     * at least 40,000, and then checks that {@code getMinimumEventIdToReindex("1")} returns
+     * a value of at least 30,000. The 5-second test timeout bounds the polling loop.
+     */
+    @Test(timeout = 5000)
+    public void testGetMinimumIdToReindex() throws InterruptedException {
+        final RepositoryConfiguration config = createConfig(1);
+        config.setDesiredIndexSize(1L);
+        final IndexManager manager = new SimpleIndexManager(config);
+
+        final ArrayListEventStore store = new ArrayListEventStore();
+        final LuceneEventIndex eventIndex =
+            new LuceneEventIndex(config, manager, 20_000, EventReporter.NO_OP);
+        eventIndex.initialize(store);
+
+        for (int added = 0; added < 50_000; added++) {
+            final ProvenanceEventRecord record = createEvent("1234");
+            final StorageResult stored = store.addEvent(record);
+            eventIndex.addEvents(stored.getStorageLocations());
+        }
+
+        // Keep polling until the index reports a high enough max event ID.
+        while (true) {
+            final long maxIndexedId = eventIndex.getMaxEventId("1");
+            if (maxIndexedId >= 40_000L) {
+                break;
+            }
+            Thread.sleep(25);
+        }
+
+        final long minimumId = eventIndex.getMinimumEventIdToReindex("1");
+        assertTrue(minimumId >= 30000L);
+    }
+
+    /**
+     * Indexes three events for FlowFile "1234" and repeatedly submits a lineage computation
+     * with {@link EventAuthorizer#DENY_ALL} until three nodes come back. Every node is
+     * expected to be a provenance-event node whose event type is
+     * {@link ProvenanceEventType#UNKNOWN}.
+     */
+    @Test(timeout = 5000)
+    public void testUnauthorizedEventsGetPlaceholdersForLineage()
+            throws InterruptedException {
+        final RepositoryConfiguration config = createConfig(1);
+        config.setDesiredIndexSize(1L);
+        final IndexManager manager = new SimpleIndexManager(config);
+
+        final ArrayListEventStore store = new ArrayListEventStore();
+        final LuceneEventIndex eventIndex =
+            new LuceneEventIndex(config, manager, 3, EventReporter.NO_OP);
+        eventIndex.initialize(store);
+
+        for (int added = 0; added < 3; added++) {
+            final ProvenanceEventRecord record = createEvent("1234");
+            final StorageResult stored = store.addEvent(record);
+            eventIndex.addEvents(stored.getStorageLocations());
+        }
+
+        final NiFiUser requester = createUser();
+
+        // Re-submit the lineage request until all three events show up in the result.
+        List<LineageNode> lineageNodes;
+        do {
+            final ComputeLineageSubmission lineage =
+                eventIndex.submitLineageComputation(1L, requester, EventAuthorizer.DENY_ALL);
+            final boolean completed = lineage.getResult().awaitCompletion(5, TimeUnit.SECONDS);
+            assertTrue(completed);
+
+            lineageNodes = lineage.getResult().getNodes();
+            Thread.sleep(25L);
+        } while (lineageNodes.size() < 3);
+
+        assertEquals(3, lineageNodes.size());
+
+        lineageNodes.forEach(lineageNode -> {
+            assertEquals(LineageNodeType.PROVENANCE_EVENT_NODE, lineageNode.getNodeType());
+            final ProvenanceEventLineageNode placeholder = (ProvenanceEventLineageNode) lineageNode;
+            assertEquals(ProvenanceEventType.UNKNOWN, placeholder.getEventType());
+        });
+    }
+
+    /**
+     * Indexes one event for FlowFile "4444", a FORK event on "4444" with child "1234", and
+     * three more events for "1234". It then calls {@code submitExpandChildren(1L, ...)} with
+     * an authorizer whose {@code isAuthorized} accepts only FORK events, repeating until
+     * five nodes are returned. Expects one FlowFile node and four event nodes: one FORK and
+     * three reported as {@link ProvenanceEventType#UNKNOWN}.
+     */
+    @Test(timeout = 5000)
+    public void testUnauthorizedEventsGetPlaceholdersForExpandChildren()
+            throws InterruptedException {
+        final RepositoryConfiguration config = createConfig(1);
+        config.setDesiredIndexSize(1L);
+        final IndexManager manager = new SimpleIndexManager(config);
+
+        final ArrayListEventStore store = new ArrayListEventStore();
+        final LuceneEventIndex eventIndex =
+            new LuceneEventIndex(config, manager, 3, EventReporter.NO_OP);
+        eventIndex.initialize(store);
+
+        final ProvenanceEventRecord parentEvent = createEvent("4444");
+
+        final Map<String, String> attributesBefore = new HashMap<>();
+        attributesBefore.put("uuid", "4444");
+        final Map<String, String> attributesAfter = new HashMap<>();
+        attributesAfter.put("updated", "true");
+        final ProvenanceEventRecord forkEvent = new StandardProvenanceEventRecord.Builder()
+            .setEventType(ProvenanceEventType.FORK)
+            .setAttributes(attributesBefore, attributesAfter)
+            .addChildFlowFile("1234")
+            .setComponentId("component-1")
+            .setComponentType("unit test")
+            .setEventId(idGenerator.getAndIncrement())
+            .setEventTime(System.currentTimeMillis())
+            .setFlowFileEntryDate(System.currentTimeMillis())
+            .setFlowFileUUID("4444")
+            .setLineageStartDate(System.currentTimeMillis())
+            .setCurrentContentClaim("container", "section", "unit-test-id", 0L, 1024L)
+            .build();
+
+        final StorageResult parentStored = store.addEvent(parentEvent);
+        eventIndex.addEvents(parentStored.getStorageLocations());
+        final StorageResult forkStored = store.addEvent(forkEvent);
+        eventIndex.addEvents(forkStored.getStorageLocations());
+
+        for (int added = 0; added < 3; added++) {
+            final ProvenanceEventRecord childEvent = createEvent("1234");
+            final StorageResult childStored = store.addEvent(childEvent);
+            eventIndex.addEvents(childStored.getStorageLocations());
+        }
+
+        final NiFiUser requester = createUser();
+
+        // isAuthorized() admits only FORK events; authorize() never rejects.
+        final EventAuthorizer forkOnlyAuthorizer = new EventAuthorizer() {
+            @Override
+            public boolean isAuthorized(final ProvenanceEventRecord record) {
+                return record.getEventType() == ProvenanceEventType.FORK;
+            }
+
+            @Override
+            public void authorize(final ProvenanceEventRecord record)
+                    throws AccessDeniedException {
+            }
+        };
+
+        // Re-submit the request until all five expected nodes are present.
+        List<LineageNode> lineageNodes;
+        do {
+            final ComputeLineageSubmission expansion =
+                eventIndex.submitExpandChildren(1L, requester, forkOnlyAuthorizer);
+            final boolean completed = expansion.getResult().awaitCompletion(5, TimeUnit.SECONDS);
+            assertTrue(completed);
+
+            lineageNodes = expansion.getResult().getNodes();
+            Thread.sleep(25L);
+        } while (lineageNodes.size() < 5);
+
+        assertEquals(5, lineageNodes.size());
+
+        final long flowFileNodeCount = lineageNodes.stream()
+            .filter(node -> node.getNodeType() == LineageNodeType.FLOWFILE_NODE)
+            .count();
+        assertEquals(1L, flowFileNodeCount);
+
+        final long eventNodeCount = lineageNodes.stream()
+            .filter(node -> node.getNodeType() == LineageNodeType.PROVENANCE_EVENT_NODE)
+            .count();
+        assertEquals(4L, eventNodeCount);
+
+        final Map<ProvenanceEventType, List<LineageNode>> nodesByEventType = lineageNodes.stream()
+            .filter(node -> node.getNodeType() == LineageNodeType.PROVENANCE_EVENT_NODE)
+            .collect(Collectors.groupingBy(
+                node -> ((ProvenanceEventLineageNode) node).getEventType()));
+
+        assertEquals(2, nodesByEventType.size());
+        assertEquals(1, nodesByEventType.get(ProvenanceEventType.FORK).size());
+        assertEquals(3, nodesByEventType.get(ProvenanceEventType.UNKNOWN).size());
+    }
+
+    /**
+     * Indexes one event for FlowFile "4444", a JOIN event with parent "4444" and child
+     * "1234", and three more events for "1234". It then calls
+     * {@code submitExpandParents(1L, ...)} with an authorizer whose {@code isAuthorized}
+     * accepts only JOIN events, repeating until two nodes are returned. Expects one JOIN
+     * node and one {@link ProvenanceEventType#UNKNOWN} node whose FlowFile UUID is "4444".
+     */
+    @Test(timeout = 5000)
+    public void testUnauthorizedEventsGetPlaceholdersForFindParents()
+            throws InterruptedException {
+        final RepositoryConfiguration config = createConfig(1);
+        config.setDesiredIndexSize(1L);
+        final IndexManager manager = new SimpleIndexManager(config);
+
+        final ArrayListEventStore store = new ArrayListEventStore();
+        final LuceneEventIndex eventIndex =
+            new LuceneEventIndex(config, manager, 3, EventReporter.NO_OP);
+        eventIndex.initialize(store);
+
+        final ProvenanceEventRecord parentEvent = createEvent("4444");
+
+        final Map<String, String> attributesBefore = new HashMap<>();
+        attributesBefore.put("uuid", "4444");
+        final Map<String, String> attributesAfter = new HashMap<>();
+        attributesAfter.put("updated", "true");
+        final ProvenanceEventRecord joinEvent = new StandardProvenanceEventRecord.Builder()
+            .setEventType(ProvenanceEventType.JOIN)
+            .setAttributes(attributesBefore, attributesAfter)
+            .addParentUuid("4444")
+            .addChildFlowFile("1234")
+            .setComponentId("component-1")
+            .setComponentType("unit test")
+            .setEventId(idGenerator.getAndIncrement())
+            .setEventTime(System.currentTimeMillis())
+            .setFlowFileEntryDate(System.currentTimeMillis())
+            .setFlowFileUUID("1234")
+            .setLineageStartDate(System.currentTimeMillis())
+            .setCurrentContentClaim("container", "section", "unit-test-id", 0L, 1024L)
+            .build();
+
+        final StorageResult parentStored = store.addEvent(parentEvent);
+        eventIndex.addEvents(parentStored.getStorageLocations());
+        final StorageResult joinStored = store.addEvent(joinEvent);
+        eventIndex.addEvents(joinStored.getStorageLocations());
+
+        for (int added = 0; added < 3; added++) {
+            final ProvenanceEventRecord childEvent = createEvent("1234");
+            final StorageResult childStored = store.addEvent(childEvent);
+            eventIndex.addEvents(childStored.getStorageLocations());
+        }
+
+        final NiFiUser requester = createUser();
+
+        // isAuthorized() admits only JOIN events; authorize() never rejects.
+        final EventAuthorizer joinOnlyAuthorizer = new EventAuthorizer() {
+            @Override
+            public boolean isAuthorized(final ProvenanceEventRecord record) {
+                return record.getEventType() == ProvenanceEventType.JOIN;
+            }
+
+            @Override
+            public void authorize(final ProvenanceEventRecord record)
+                    throws AccessDeniedException {
+            }
+        };
+
+        // Re-submit the request until both expected nodes are present.
+        List<LineageNode> lineageNodes;
+        do {
+            final ComputeLineageSubmission expansion =
+                eventIndex.submitExpandParents(1L, requester, joinOnlyAuthorizer);
+            final boolean completed = expansion.getResult().awaitCompletion(5, TimeUnit.SECONDS);
+            assertTrue(completed);
+
+            lineageNodes = expansion.getResult().getNodes();
+            Thread.sleep(25L);
+        } while (lineageNodes.size() < 2);
+
+        assertEquals(2, lineageNodes.size());
+
+        final Map<ProvenanceEventType, List<LineageNode>> nodesByEventType = lineageNodes.stream()
+            .filter(node -> node.getNodeType() == LineageNodeType.PROVENANCE_EVENT_NODE)
+            .collect(Collectors.groupingBy(
+                node -> ((ProvenanceEventLineageNode) node).getEventType()));
+
+        assertEquals(2, nodesByEventType.size());
+        assertEquals(1, nodesByEventType.get(ProvenanceEventType.JOIN).size());
+
+        final List<LineageNode> placeholders = nodesByEventType.get(ProvenanceEventType.UNKNOWN);
+        assertEquals(1, placeholders.size());
+
+        assertEquals("4444", placeholders.get(0).getFlowFileUuid());
+    }
+
+    /**
+     * Indexes three events for FlowFile "1234" and queries them with an authorizer whose
+     * {@code isAuthorized} accepts only events with an even event ID (its {@code authorize}
+     * always throws). The query is repeated until at least two events match; exactly two
+     * matching events are expected.
+     */
+    @Test(timeout = 5000)
+    public void testUnauthorizedEventsGetFilteredForQuery() throws InterruptedException {
+        final RepositoryConfiguration config = createConfig(1);
+        config.setDesiredIndexSize(1L);
+        final IndexManager manager = new SimpleIndexManager(config);
+
+        final ArrayListEventStore store = new ArrayListEventStore();
+        final LuceneEventIndex eventIndex =
+            new LuceneEventIndex(config, manager, 3, EventReporter.NO_OP);
+        eventIndex.initialize(store);
+
+        for (int added = 0; added < 3; added++) {
+            final ProvenanceEventRecord record = createEvent("1234");
+            final StorageResult stored = store.addEvent(record);
+            eventIndex.addEvents(stored.getStorageLocations());
+        }
+
+        final Query emptyQuery = new Query(UUID.randomUUID().toString());
+        final EventAuthorizer evenIdAuthorizer = new EventAuthorizer() {
+            @Override
+            public boolean isAuthorized(final ProvenanceEventRecord record) {
+                final boolean evenId = (record.getEventId() % 2) == 0;
+                return evenId;
+            }
+
+            @Override
+            public void authorize(final ProvenanceEventRecord record)
+                    throws AccessDeniedException {
+                throw new AccessDeniedException();
+            }
+        };
+
+        // Re-submit the query until both authorized events are returned.
+        List<ProvenanceEventRecord> matches;
+        do {
+            final QuerySubmission submitted =
+                eventIndex.submitQuery(emptyQuery, evenIdAuthorizer, "unit test");
+            final boolean completed = submitted.getResult().awaitCompletion(5, TimeUnit.SECONDS);
+            assertTrue(completed);
+            matches = submitted.getResult().getMatchingEvents();
+            Thread.sleep(25L);
+        } while (matches.size() < 2);
+
+        assertEquals(2, matches.size());
+    }
+
+
+    /**
+     * Builds a stub {@link NiFiUser} for the tests: identity "unit test", client address
+     * "127.0.0.1", no chain, and not anonymous.
+     *
+     * @return the stub user
+     */
+    private NiFiUser createUser() {
+        final NiFiUser stubUser = new NiFiUser() {
+            @Override
+            public boolean isAnonymous() {
+                return false;
+            }
+
+            @Override
+            public String getIdentity() {
+                return "unit test";
+            }
+
+            @Override
+            public String getClientAddress() {
+                return "127.0.0.1";
+            }
+
+            @Override
+            public NiFiUser getChain() {
+                return null;
+            }
+        };
+
+        return stubUser;
+    }
+
+
+    /**
+     * Indexes one event, waits for an index directory to appear, drops that event from the
+     * list backing the mocked {@link EventStore}, runs {@code performMaintenance()}, and
+     * expects exactly one index directory afterwards.
+     *
+     * <p>The mocked {@code getEvents} asserts that it is asked for event ID 0 with a limit
+     * of 1, and answers with whatever event is currently first in the list: the event
+     * timestamped 500000L before the removal, the current-time event after it.
+     */
+    @Test(timeout = 5000)
+    public void testExpiration() throws InterruptedException, IOException {
+        final RepositoryConfiguration repoConfig = createConfig(1);
+        repoConfig.setDesiredIndexSize(1L);
+        final IndexManager indexManager = new SimpleIndexManager(repoConfig);
+
+        final LuceneEventIndex index =
+            new LuceneEventIndex(repoConfig, indexManager, 1, EventReporter.NO_OP);
+
+        final List<ProvenanceEventRecord> events = new ArrayList<>();
+        events.add(createEvent(500000L));
+        events.add(createEvent());
+
+        final EventStore eventStore = Mockito.mock(EventStore.class);
+        Mockito.doAnswer(new Answer<List<ProvenanceEventRecord>>() {
+            @Override
+            public List<ProvenanceEventRecord> answer(final InvocationOnMock invocation)
+                    throws Throwable {
+                final Long eventId = invocation.getArgumentAt(0, Long.class);
+                assertEquals(0, eventId.longValue());
+                assertEquals(1, invocation.getArgumentAt(1, Integer.class).intValue());
+                return Collections.singletonList(events.get(0));
+            }
+        }).when(eventStore).getEvents(Mockito.anyLong(), Mockito.anyInt());
+
+        index.initialize(eventStore);
+        index.addEvent(events.get(0), createStorageSummary(events.get(0).getEventId()));
+
+        // Indexing is asynchronous, so wait for the first event's index directory to appear.
+        // Sleep between polls instead of spinning, so the indexing thread is not starved of
+        // CPU; the 5-second test timeout bounds the wait.
+        List<File> allDirectories = index.getDirectoryManager().getDirectories(null, null);
+        while (allDirectories.isEmpty()) {
+            Thread.sleep(25L);
+            allDirectories = index.getDirectoryManager().getDirectories(null, null);
+        }
+
+        events.remove(0); // Remove the first event from the store
+        index.performMaintenance();
+        assertEquals(1, index.getDirectoryManager().getDirectories(null, null).size());
+    }
+
+    /**
+     * Creates a {@link StorageSummary} for the given event ID, using the fixed values
+     * "1.prov", "1", 1, 2L and 2L for the remaining constructor arguments.
+     *
+     * @param eventId the event ID passed as the first constructor argument
+     * @return the new storage summary
+     */
+    private StorageSummary createStorageSummary(final long eventId) {
+        final StorageSummary summary = new StorageSummary(eventId, "1.prov", "1", 1, 2L, 2L);
+        return summary;
+    }
+
+
+    /**
+     * Adds a single event to the index, then submits a query with no search terms using
+     * {@link EventAuthorizer#GRANT_ALL} until a result comes back. Expects exactly that one
+     * event to be returned.
+     */
+    @Test(timeout = 5000)
+    public void addThenQueryWithEmptyQuery() throws InterruptedException {
+        final RepositoryConfiguration config = createConfig();
+        final IndexManager manager = new SimpleIndexManager(config);
+
+        final LuceneEventIndex eventIndex =
+            new LuceneEventIndex(config, manager, 1, EventReporter.NO_OP);
+
+        final ProvenanceEventRecord expected = createEvent();
+
+        final StorageSummary summary =
+            new StorageSummary(expected.getEventId(), "1.prov", "1", 1, 2L, 2L);
+        eventIndex.addEvent(expected, summary);
+
+        final Query emptyQuery = new Query(UUID.randomUUID().toString());
+
+        final ArrayListEventStore store = new ArrayListEventStore();
+        store.addEvent(expected);
+        eventIndex.initialize(store);
+
+        // The time needed to index the event is unknown, so query repeatedly until a match
+        // appears. The 5-second test timeout aborts the loop if that never happens.
+        List<ProvenanceEventRecord> matches;
+        do {
+            final QuerySubmission submitted =
+                eventIndex.submitQuery(emptyQuery, EventAuthorizer.GRANT_ALL, "unit test user");
+            assertNotNull(submitted);
+
+            final QueryResult outcome = submitted.getResult();
+            assertNotNull(outcome);
+            outcome.awaitCompletion(100, TimeUnit.MILLISECONDS);
+
+            assertTrue(outcome.isFinished());
+            assertNull(outcome.getError());
+
+            matches = outcome.getMatchingEvents();
+            assertNotNull(matches);
+            Thread.sleep(100L); // pause between polls to avoid pegging the CPU
+        } while (matches.isEmpty());
+
+        assertEquals(1, matches.size());
+        assertEquals(expected, matches.get(0));
+    }
+
+    /**
+     * Adds two events to the index and queries on the FlowFile UUID of the first one using
+     * {@link EventAuthorizer#GRANT_ALL}. Expects exactly that first event to be returned.
+     */
+    @Test(timeout = 50000)
+    public void testQuerySpecificField() throws InterruptedException {
+        final RepositoryConfiguration config = createConfig();
+        final IndexManager manager = new SimpleIndexManager(config);
+
+        final LuceneEventIndex eventIndex =
+            new LuceneEventIndex(config, manager, 2, EventReporter.NO_OP);
+
+        // Index two events; only the first one is the target of the query.
+        final ProvenanceEventRecord expected = createEvent();
+        final StorageSummary expectedSummary =
+            new StorageSummary(expected.getEventId(), "1.prov", "1", 1, 2L, 2L);
+        eventIndex.addEvent(expected, expectedSummary);
+        eventIndex.addEvent(createEvent(), new StorageSummary(2L, "1.prov", "1", 1, 2L, 2L));
+
+        // Search on the FlowFile UUID of the first event.
+        final Query uuidQuery = new Query(UUID.randomUUID().toString());
+        uuidQuery.addSearchTerm(
+            SearchTerms.newSearchTerm(SearchableFields.FlowFileUUID, expected.getFlowFileUuid()));
+
+        final ArrayListEventStore store = new ArrayListEventStore();
+        store.addEvent(expected);
+        eventIndex.initialize(store);
+
+        // The time needed to index the event is unknown, so query repeatedly until a match
+        // appears. The 50-second test timeout aborts the loop if that never happens.
+        List<ProvenanceEventRecord> matches;
+        do {
+            final QuerySubmission submitted =
+                eventIndex.submitQuery(uuidQuery, EventAuthorizer.GRANT_ALL, "unit test user");
+            assertNotNull(submitted);
+
+            final QueryResult outcome = submitted.getResult();
+            assertNotNull(outcome);
+            outcome.awaitCompletion(100, TimeUnit.MILLISECONDS);
+
+            assertTrue(outcome.isFinished());
+            assertNull(outcome.getError());
+
+            matches = outcome.getMatchingEvents();
+            assertNotNull(matches);
+            Thread.sleep(100L); // pause between polls to avoid pegging the CPU
+        } while (matches.isEmpty());
+
+        assertEquals(1, matches.size());
+        assertEquals(expected, matches.get(0));
+    }
+
+    /**
+     * Creates a repository configuration with a single storage directory.
+     *
+     * @return the result of {@code createConfig(1)}
+     */
+    private RepositoryConfiguration createConfig() {
+        final int singleStorageDirectory = 1;
+        return createConfig(singleStorageDirectory);
+    }
+
+    /**
+     * Creates a repository configuration whose storage directories live under
+     * {@code target/storage/<test method name>/<random UUID>}. Directory {@code i}
+     * (zero-based) is registered under the name {@code i + 1}. FlowFile UUID is the only
+     * searchable field and "updated" the only searchable attribute. Every storage directory
+     * is created on disk if it does not already exist.
+     *
+     * @param storageDirectoryCount number of storage directories to register
+     * @return the populated configuration
+     */
+    private RepositoryConfiguration createConfig(final int storageDirectoryCount) {
+        final RepositoryConfiguration repoConfig = new RepositoryConfiguration();
+        final String relativePath = "target/storage/" + testName.getMethodName()
+            + "/" + UUID.randomUUID().toString();
+        final File baseDir = new File(relativePath);
+
+        int dirIndex = 0;
+        while (dirIndex < storageDirectoryCount) {
+            final String dirName = String.valueOf(dirIndex + 1);
+            final File dir = new File(baseDir, String.valueOf(dirIndex));
+            repoConfig.addStorageDirectory(dirName, dir);
+            dirIndex++;
+        }
+
+        repoConfig.setSearchableFields(
+            Collections.singletonList(SearchableFields.FlowFileUUID));
+        repoConfig.setSearchableAttributes(
+            Collections.singletonList(SearchableFields.newSearchableAttribute("updated")));
+
+        // Fail the test right away if a storage directory cannot be created.
+        for (final File storageDir : repoConfig.getStorageDirectories().values()) {
+            if (!storageDir.exists()) {
+                assertTrue(storageDir.mkdirs());
+            }
+        }
+
+        return repoConfig;
+    }
+
+    /**
+     * Creates an event stamped with the current time and a random FlowFile UUID.
+     *
+     * @return the new event
+     */
+    private ProvenanceEventRecord createEvent() {
+        final long now = System.currentTimeMillis();
+        return createEvent(now);
+    }
+
+    /**
+     * Creates an event stamped with the current time for the given FlowFile UUID.
+     *
+     * @param uuid the FlowFile UUID for the event
+     * @return the new event
+     */
+    private ProvenanceEventRecord createEvent(final String uuid) {
+        final long now = System.currentTimeMillis();
+        return createEvent(now, uuid);
+    }
+
+    /**
+     * Creates an event with the given timestamp and a random FlowFile UUID.
+     *
+     * @param timestamp the time value used for the event
+     * @return the new event
+     */
+    private ProvenanceEventRecord createEvent(final long timestamp) {
+        final String randomUuid = UUID.randomUUID().toString();
+        return createEvent(timestamp, randomUuid);
+    }
+
+    /**
+     * Builds a CONTENT_MODIFIED event for component "component-1". The event takes the next
+     * ID from {@code idGenerator}, uses {@code timestamp} as its event time, FlowFile entry
+     * date and lineage start date, and carries a previous attribute {@code uuid=<uuid>} and
+     * an updated attribute {@code updated=true}.
+     *
+     * @param timestamp the time value applied to all three date fields
+     * @param uuid the FlowFile UUID for the event
+     * @return the built event
+     */
+    private ProvenanceEventRecord createEvent(final long timestamp, final String uuid) {
+        final Map<String, String> attributesBefore = new HashMap<>();
+        attributesBefore.put("uuid", uuid);
+        final Map<String, String> attributesAfter = new HashMap<>();
+        attributesAfter.put("updated", "true");
+
+        return new StandardProvenanceEventRecord.Builder()
+            .setEventType(ProvenanceEventType.CONTENT_MODIFIED)
+            .setAttributes(attributesBefore, attributesAfter)
+            .setComponentId("component-1")
+            .setComponentType("unit test")
+            .setEventId(idGenerator.getAndIncrement())
+            .setEventTime(timestamp)
+            .setFlowFileEntryDate(timestamp)
+            .setFlowFileUUID(uuid)
+            .setLineageStartDate(timestamp)
+            .setCurrentContentClaim("container", "section", "unit-test-id", 0L, 1024L)
+            .build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/lucene/TestCachingIndexManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/lucene/TestCachingIndexManager.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/lucene/TestCachingIndexManager.java
index 36f0b00..a42b73a 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/lucene/TestCachingIndexManager.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/lucene/TestCachingIndexManager.java
@@ -29,14 +29,14 @@ import java.util.UUID;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field.Store;
 import org.apache.lucene.document.StringField;
-import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.BooleanClause;
+import org.apache.lucene.search.BooleanClause.Occur;
 import org.apache.lucene.search.BooleanQuery;
-import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.TermQuery;
 import org.apache.lucene.search.TopDocs;
-import org.apache.lucene.search.BooleanClause.Occur;
+import org.apache.nifi.provenance.index.EventIndexSearcher;
+import org.apache.nifi.provenance.index.EventIndexWriter;
 import org.apache.nifi.util.file.FileUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -67,47 +67,47 @@ public class TestCachingIndexManager {
     public void test() throws IOException {
         // Create and IndexWriter and add a document to the index, then close 
the writer.
         // This gives us something that we can query.
-        final IndexWriter writer = manager.borrowIndexWriter(indexDir);
+        final EventIndexWriter writer = manager.borrowIndexWriter(indexDir);
         final Document doc = new Document();
         doc.add(new StringField("unit test", "true", Store.YES));
-        writer.addDocument(doc);
-        manager.returnIndexWriter(indexDir, writer);
+        writer.index(doc, 1000);
+        manager.returnIndexWriter(writer);
 
         // Get an Index Searcher that we can use to query the index.
-        final IndexSearcher cachedSearcher = 
manager.borrowIndexSearcher(indexDir);
+        final EventIndexSearcher cachedSearcher = 
manager.borrowIndexSearcher(indexDir);
 
         // Ensure that we get the expected results.
         assertCount(cachedSearcher, 1);
 
         // While we already have an Index Searcher, get a writer for the same 
index.
         // This will cause the Index Searcher to be marked as poisoned.
-        final IndexWriter writer2 = manager.borrowIndexWriter(indexDir);
+        final EventIndexWriter writer2 = manager.borrowIndexWriter(indexDir);
 
         // Obtain a new Index Searcher with the writer open. This Index 
Searcher should *NOT*
         // be the same as the previous searcher because the new one will be a 
Near-Real-Time Index Searcher
         // while the other is not.
-        final IndexSearcher nrtSearcher = 
manager.borrowIndexSearcher(indexDir);
+        final EventIndexSearcher nrtSearcher = 
manager.borrowIndexSearcher(indexDir);
         assertNotSame(cachedSearcher, nrtSearcher);
 
         // Ensure that we get the expected query results.
         assertCount(nrtSearcher, 1);
 
         // Return the writer, so that there is no longer an active writer for 
the index.
-        manager.returnIndexWriter(indexDir, writer2);
+        manager.returnIndexWriter(writer2);
 
         // Ensure that we still get the same result.
         assertCount(cachedSearcher, 1);
-        manager.returnIndexSearcher(indexDir, cachedSearcher);
+        manager.returnIndexSearcher(cachedSearcher);
 
         // Ensure that our near-real-time index searcher still gets the same 
result.
         assertCount(nrtSearcher, 1);
-        manager.returnIndexSearcher(indexDir, nrtSearcher);
+        manager.returnIndexSearcher(nrtSearcher);
     }
 
-    private void assertCount(final IndexSearcher searcher, final int count) 
throws IOException {
+    private void assertCount(final EventIndexSearcher searcher, final int 
count) throws IOException {
         final BooleanQuery query = new BooleanQuery();
         query.add(new BooleanClause(new TermQuery(new Term("unit test", 
"true")), Occur.MUST));
-        final TopDocs topDocs = searcher.search(query, count * 10);
+        final TopDocs topDocs = searcher.getIndexSearcher().search(query, 
count * 10);
         assertNotNull(topDocs);
         assertEquals(1, topDocs.totalHits);
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/lucene/TestSimpleIndexManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/lucene/TestSimpleIndexManager.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/lucene/TestSimpleIndexManager.java
index 834177f..05369ca 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/lucene/TestSimpleIndexManager.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/lucene/TestSimpleIndexManager.java
@@ -18,18 +18,21 @@
 package org.apache.nifi.provenance.lucene;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field.Store;
 import org.apache.lucene.document.StringField;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.search.TopDocs;
+import org.apache.nifi.provenance.RepositoryConfiguration;
+import org.apache.nifi.provenance.index.EventIndexSearcher;
+import org.apache.nifi.provenance.index.EventIndexWriter;
 import org.apache.nifi.util.file.FileUtils;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -40,14 +43,13 @@ public class TestSimpleIndexManager {
         
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", 
"DEBUG");
     }
 
-
     @Test
     public void testMultipleWritersSimultaneouslySameIndex() throws 
IOException {
-        final SimpleIndexManager mgr = new SimpleIndexManager();
+        final SimpleIndexManager mgr = new SimpleIndexManager(new 
RepositoryConfiguration());
         final File dir = new File("target/" + UUID.randomUUID().toString());
         try {
-            final IndexWriter writer1 = mgr.borrowIndexWriter(dir);
-            final IndexWriter writer2 = mgr.borrowIndexWriter(dir);
+            final EventIndexWriter writer1 = mgr.borrowIndexWriter(dir);
+            final EventIndexWriter writer2 = mgr.borrowIndexWriter(dir);
 
             final Document doc1 = new Document();
             doc1.add(new StringField("id", "1", Store.YES));
@@ -55,18 +57,94 @@ public class TestSimpleIndexManager {
             final Document doc2 = new Document();
             doc2.add(new StringField("id", "2", Store.YES));
 
-            writer1.addDocument(doc1);
-            writer2.addDocument(doc2);
-            mgr.returnIndexWriter(dir, writer2);
-            mgr.returnIndexWriter(dir, writer1);
+            writer1.index(doc1, 1000);
+            writer2.index(doc2, 1000);
+            mgr.returnIndexWriter(writer2);
+            mgr.returnIndexWriter(writer1);
 
-            final IndexSearcher searcher = mgr.borrowIndexSearcher(dir);
-            final TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), 
2);
+            final EventIndexSearcher searcher = mgr.borrowIndexSearcher(dir);
+            final TopDocs topDocs = searcher.getIndexSearcher().search(new 
MatchAllDocsQuery(), 2);
             assertEquals(2, topDocs.totalHits);
-            mgr.returnIndexSearcher(dir, searcher);
+            mgr.returnIndexSearcher(searcher);
         } finally {
             FileUtils.deleteFile(dir, true);
         }
     }
 
+    /**
+     * Verifies that a writer shared by several borrowers is closed exactly once, on the
+     * return of its last reference, when an earlier return flagged it as closeable.
+     *
+     * @throws IOException if the index manager fails to hand out or take back a writer
+     */
+    @Test
+    public void testWriterCloseIfPreviouslyMarkedCloseable() throws IOException {
+        final AtomicInteger closeCount = new AtomicInteger(0);
+
+        // Count close requests instead of performing them so the test can observe when they occur.
+        final SimpleIndexManager mgr = new SimpleIndexManager(new RepositoryConfiguration()) {
+            @Override
+            protected void close(IndexWriterCount count) throws IOException {
+                closeCount.incrementAndGet();
+            }
+        };
+
+        final File dir = new File("target/" + UUID.randomUUID().toString());
+        try {
+            final EventIndexWriter writer1 = mgr.borrowIndexWriter(dir);
+            final EventIndexWriter writer2 = mgr.borrowIndexWriter(dir);
+            assertTrue(writer1 == writer2);
+
+            // Flag the writer as closeable while writer2 still references it: nothing may be closed yet.
+            mgr.returnIndexWriter(writer1, true, true);
+            assertEquals(0, closeCount.get());
+
+            final EventIndexWriter[] writers = new EventIndexWriter[10];
+            for (int i = 0; i < writers.length; i++) {
+                writers[i] = mgr.borrowIndexWriter(dir);
+                assertTrue(writers[i] == writer1);
+            }
+
+            for (int i = 0; i < writers.length; i++) {
+                mgr.returnIndexWriter(writers[i], true, false);
+                assertEquals(0, closeCount.get());
+                assertEquals(1, mgr.getWriterCount());
+            }
+
+            // this should close the index writer even though 'false' is passed in
+            // because the previous call marked the writer as closeable and this is
+            // the last reference to the writer.
+            mgr.returnIndexWriter(writer2, false, false);
+            assertEquals(1, closeCount.get());
+            assertEquals(0, mgr.getWriterCount());
+        } finally {
+            // Remove the index directory, as testMultipleWritersSimultaneouslySameIndex does.
+            FileUtils.deleteFile(dir, true);
+        }
+    }
+
+    /**
+     * Verifies that returning a writer flagged as closeable triggers a close when the
+     * caller is the writer's only borrower.
+     *
+     * @throws IOException if the index manager fails to hand out or take back the writer
+     */
+    @Test
+    public void testWriterCloseIfOnlyUser() throws IOException {
+        final AtomicInteger closeCount = new AtomicInteger(0);
+
+        // Count close requests instead of performing them.
+        final SimpleIndexManager mgr = new SimpleIndexManager(new RepositoryConfiguration()) {
+            @Override
+            protected void close(IndexWriterCount count) throws IOException {
+                closeCount.incrementAndGet();
+            }
+        };
+
+        final File dir = new File("target/" + UUID.randomUUID().toString());
+        try {
+            final EventIndexWriter writer = mgr.borrowIndexWriter(dir);
+            mgr.returnIndexWriter(writer, true, true);
+            assertEquals(1, closeCount.get());
+        } finally {
+            // Remove the index directory, as testMultipleWritersSimultaneouslySameIndex does.
+            FileUtils.deleteFile(dir, true);
+        }
+    }
+
+    /**
+     * Verifies that returning a writer that is not flagged as closeable does not trigger
+     * a close, even when the caller is the writer's only borrower.
+     *
+     * @throws IOException if the index manager fails to hand out or take back the writer
+     */
+    @Test
+    public void testWriterLeftOpenIfNotCloseable() throws IOException {
+        final AtomicInteger closeCount = new AtomicInteger(0);
+
+        // Count close requests instead of performing them.
+        final SimpleIndexManager mgr = new SimpleIndexManager(new RepositoryConfiguration()) {
+            @Override
+            protected void close(IndexWriterCount count) throws IOException {
+                closeCount.incrementAndGet();
+            }
+        };
+
+        final File dir = new File("target/" + UUID.randomUUID().toString());
+        try {
+            final EventIndexWriter writer = mgr.borrowIndexWriter(dir);
+            mgr.returnIndexWriter(writer, true, false);
+            assertEquals(0, closeCount.get());
+        } finally {
+            // Remove the index directory, as testMultipleWritersSimultaneouslySameIndex does.
+            FileUtils.deleteFile(dir, true);
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/ArrayListEventStore.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/ArrayListEventStore.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/ArrayListEventStore.java
new file mode 100644
index 0000000..94a3699
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/ArrayListEventStore.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.store;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.authorization.EventAuthorizer;
+import org.apache.nifi.provenance.authorization.EventTransformer;
+import org.apache.nifi.provenance.index.EventIndex;
+import org.apache.nifi.provenance.serialization.StorageSummary;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * In-memory {@link EventStore} used by tests. Events are appended to an {@link ArrayList}
+ * and each one is assigned the next value of a counter that starts at 0, so an event's ID
+ * is also its index in the list.
+ */
+public class ArrayListEventStore implements EventStore {
+    private static final Logger logger = LoggerFactory.getLogger(ArrayListEventStore.class);
+
+    private final List<ProvenanceEventRecord> events = new ArrayList<>();
+    private final AtomicLong idGenerator = new AtomicLong(0L);
+
+    /** Does nothing; there are no resources to release. */
+    @Override
+    public void close() throws IOException {
+    }
+
+    /** Does nothing; the store is usable as soon as it is constructed. */
+    @Override
+    public void initialize() throws IOException {
+    }
+
+    /**
+     * Convenience wrapper that stores a single event.
+     *
+     * @param event the event to store
+     * @return the result of storing the event via {@link #addEvents(Iterable)}
+     */
+    public StorageResult addEvent(final ProvenanceEventRecord event) {
+        return addEvents(Collections.singleton(event));
+    }
+
+    /**
+     * Appends the given events and assigns each one the next sequential event ID.
+     *
+     * @param events the events to store
+     * @return a result mapping every stored event to its storage summary; it never reports a rollover
+     */
+    @Override
+    public synchronized StorageResult addEvents(Iterable<ProvenanceEventRecord> events) {
+        final Map<ProvenanceEventRecord, StorageSummary> storageLocations = new HashMap<>();
+
+        for (final ProvenanceEventRecord event : events) {
+            this.events.add(event);
+
+            // Only the event ID is meaningful; the remaining summary values are fixed placeholders.
+            final StorageSummary storageSummary = new StorageSummary(idGenerator.getAndIncrement(), "location", "1", 1, 0L, 0L);
+            storageLocations.put(event, storageSummary);
+        }
+
+        return new StorageResult() {
+            @Override
+            public Map<ProvenanceEventRecord, StorageSummary> getStorageLocations() {
+                return storageLocations;
+            }
+
+            @Override
+            public boolean triggeredRollover() {
+                return false;
+            }
+
+            @Override
+            public Integer getEventsRolledOver() {
+                return null;
+            }
+        };
+    }
+
+    /**
+     * @return always 0; this store does not track its size
+     */
+    @Override
+    public long getSize() throws IOException {
+        return 0;
+    }
+
+    /**
+     * @return the ID assigned to the most recently added event, or -1 if no event has been added
+     */
+    @Override
+    public long getMaxEventId() {
+        return idGenerator.get() - 1;
+    }
+
+    /**
+     * Looks up an event by its ID.
+     *
+     * @param id the event ID
+     * @return the event, or an empty Optional if the ID is negative or no event with that ID has been stored
+     */
+    @Override
+    public synchronized Optional<ProvenanceEventRecord> getEvent(long id) throws IOException {
+        // Reject both ends of the range: a negative ID would otherwise reach List.get()
+        // and fail with an IndexOutOfBoundsException instead of reporting "not found".
+        if (id < 0 || id >= events.size()) {
+            return Optional.empty();
+        }
+
+        return Optional.ofNullable(events.get((int) id));
+    }
+
+    /**
+     * Returns up to {@code maxResults} consecutive events starting at {@code firstRecordId}.
+     *
+     * @param firstRecordId the ID of the first event to return
+     * @param maxResults the maximum number of events to return
+     * @return the events found; the list ends at the first ID for which no event exists
+     */
+    @Override
+    public List<ProvenanceEventRecord> getEvents(long firstRecordId, int maxResults) throws IOException {
+        return getEvents(firstRecordId, maxResults, EventAuthorizer.GRANT_ALL, EventTransformer.EMPTY_TRANSFORMER);
+    }
+
+    /**
+     * Returns up to {@code maxResults} consecutive events starting at {@code firstRecordId}.
+     * NOTE: this implementation does not consult {@code authorizer} or {@code transformer};
+     * every event found is returned unchanged.
+     *
+     * @param firstRecordId the ID of the first event to return
+     * @param maxResults the maximum number of events to return
+     * @param authorizer not used by this implementation
+     * @param transformer not used by this implementation
+     * @return the events found; the list ends at the first ID for which no event exists
+     */
+    @Override
+    public List<ProvenanceEventRecord> getEvents(long firstRecordId, int maxResults, EventAuthorizer authorizer, EventTransformer transformer) throws IOException {
+        final List<ProvenanceEventRecord> results = new ArrayList<>();
+        for (int i = 0; i < maxResults; i++) {
+            final Optional<ProvenanceEventRecord> eventOption = getEvent(firstRecordId + i);
+            if (!eventOption.isPresent()) {
+                break;
+            }
+
+            results.add(eventOption.get());
+        }
+
+        return results;
+    }
+
+    /**
+     * Returns the events with the given IDs, skipping IDs that cannot be found or read.
+     * An authorized event is returned as-is; an unauthorized event is replaced by the
+     * transformer's output when the transformer produces one, and omitted otherwise.
+     *
+     * @param eventIds the IDs of the events to fetch
+     * @param authorizer decides whether an event may be returned unchanged
+     * @param transformer produces the replacement for an unauthorized event
+     * @return the matching events, in the order of {@code eventIds}
+     */
+    @Override
+    public List<ProvenanceEventRecord> getEvents(final List<Long> eventIds, final EventAuthorizer authorizer, final EventTransformer transformer) {
+        final List<ProvenanceEventRecord> results = new ArrayList<>();
+        for (final Long eventId : eventIds) {
+            final Optional<ProvenanceEventRecord> eventOption;
+            try {
+                eventOption = getEvent(eventId);
+            } catch (final Exception e) {
+                logger.warn("Failed to retrieve event with ID " + eventId, e);
+                continue;
+            }
+
+            if (!eventOption.isPresent()) {
+                continue;
+            }
+
+            if (authorizer.isAuthorized(eventOption.get())) {
+                results.add(eventOption.get());
+            } else {
+                final Optional<ProvenanceEventRecord> transformedOption = transformer.transform(eventOption.get());
+                if (transformedOption.isPresent()) {
+                    results.add(transformedOption.get());
+                }
+            }
+        }
+
+        return results;
+    }
+
+    /** Does nothing; this store never re-indexes events. */
+    @Override
+    public void reindexLatestEvents(EventIndex eventIndex) {
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/TestEventFileManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/TestEventFileManager.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/TestEventFileManager.java
new file mode 100644
index 0000000..42b8be2
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/TestEventFileManager.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.store;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.Test;
+
+public class TestEventFileManager {
+
+    @Test(timeout = 5000)
+    public void testTwoWriteLocks() throws InterruptedException {
+        final EventFileManager fileManager = new EventFileManager();
+        final File f1 = new File("1.prov");
+        final File gz = new File("1.prov.gz");
+
+        final AtomicBoolean obtained = new AtomicBoolean(false);
+
+        final Thread t1 = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                fileManager.obtainWriteLock(f1);
+
+                synchronized (obtained) {
+                    obtained.set(true);
+                    obtained.notify();
+                }
+
+                try {
+                    Thread.sleep(500L);
+                } catch (InterruptedException e) {
+                }
+                fileManager.releaseWriteLock(f1);
+            }
+        });
+
+        t1.start();
+
+        final Thread t2 = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                synchronized (obtained) {
+                    while (!obtained.get()) {
+                        try {
+                            obtained.wait();
+                        } catch (InterruptedException e) {
+                        }
+                    }
+                }
+
+                fileManager.obtainWriteLock(gz);
+                fileManager.releaseWriteLock(gz);
+            }
+        });
+
+        final long start = System.nanoTime();
+        t2.start();
+        t2.join();
+        final long nanos = System.nanoTime() - start;
+        assertTrue(nanos > TimeUnit.MILLISECONDS.toNanos(300L));
+    }
+
+
+    @Test(timeout = 5000)
+    public void testTwoReadLocks() throws InterruptedException {
+        final EventFileManager fileManager = new EventFileManager();
+        final File f1 = new File("1.prov");
+        final File gz = new File("1.prov.gz");
+
+        final AtomicBoolean obtained = new AtomicBoolean(false);
+
+        final Thread t1 = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                fileManager.obtainReadLock(f1);
+
+                synchronized (obtained) {
+                    obtained.set(true);
+                    obtained.notify();
+                }
+
+                try {
+                    Thread.sleep(100000L);
+                } catch (InterruptedException e) {
+                }
+                fileManager.releaseReadLock(f1);
+            }
+        });
+
+        t1.start();
+
+        final Thread t2 = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                synchronized (obtained) {
+                    while (!obtained.get()) {
+                        try {
+                            obtained.wait();
+                        } catch (InterruptedException e) {
+                        }
+                    }
+                }
+
+                fileManager.obtainReadLock(gz);
+                fileManager.releaseReadLock(gz);
+            }
+        });
+
+        final long start = System.nanoTime();
+        t2.start();
+        t2.join();
+        final long nanos = System.nanoTime() - start;
+        assertTrue(nanos < TimeUnit.MILLISECONDS.toNanos(500L));
+    }
+
+
+    @Test(timeout = 5000)
+    public void testWriteThenRead() throws InterruptedException {
+        final EventFileManager fileManager = new EventFileManager();
+        final File f1 = new File("1.prov");
+        final File gz = new File("1.prov.gz");
+
+        final AtomicBoolean obtained = new AtomicBoolean(false);
+
+        final Thread t1 = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                fileManager.obtainWriteLock(f1);
+
+                synchronized (obtained) {
+                    obtained.set(true);
+                    obtained.notify();
+                }
+
+                try {
+                    Thread.sleep(500L);
+                } catch (InterruptedException e) {
+                }
+                fileManager.releaseWriteLock(f1);
+            }
+        });
+
+        t1.start();
+
+        final Thread t2 = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                synchronized (obtained) {
+                    while (!obtained.get()) {
+                        try {
+                            obtained.wait();
+                        } catch (InterruptedException e) {
+                        }
+                    }
+                }
+
+                fileManager.obtainReadLock(gz);
+                fileManager.releaseReadLock(gz);
+            }
+        });
+
+        final long start = System.nanoTime();
+        t2.start();
+        t2.join();
+        final long nanos = System.nanoTime() - start;
+        assertTrue(nanos > TimeUnit.MILLISECONDS.toNanos(300L));
+    }
+
+
+    @Test(timeout = 5000)
+    public void testReadThenWrite() throws InterruptedException {
+        final EventFileManager fileManager = new EventFileManager();
+        final File f1 = new File("1.prov");
+        final File gz = new File("1.prov.gz");
+
+        final AtomicBoolean obtained = new AtomicBoolean(false);
+
+        final Thread t1 = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                fileManager.obtainReadLock(f1);
+
+                synchronized (obtained) {
+                    obtained.set(true);
+                    obtained.notify();
+                }
+
+                try {
+                    Thread.sleep(500L);
+                } catch (InterruptedException e) {
+                }
+                fileManager.releaseReadLock(f1);
+            }
+        });
+
+        t1.start();
+
+        final Thread t2 = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                synchronized (obtained) {
+                    while (!obtained.get()) {
+                        try {
+                            obtained.wait();
+                        } catch (InterruptedException e) {
+                        }
+                    }
+                }
+
+                fileManager.obtainWriteLock(gz);
+                fileManager.releaseWriteLock(gz);
+            }
+        });
+
+        final long start = System.nanoTime();
+        t2.start();
+        t2.join();
+        final long nanos = System.nanoTime() - start;
+        assertTrue(nanos > TimeUnit.MILLISECONDS.toNanos(300L));
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/TestPartitionedWriteAheadEventStore.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/TestPartitionedWriteAheadEventStore.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/TestPartitionedWriteAheadEventStore.java
new file mode 100644
index 0000000..7c5e43b
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/TestPartitionedWriteAheadEventStore.java
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.store;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.nifi.authorization.AccessDeniedException;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.provenance.EventIdFirstSchemaRecordWriter;
+import org.apache.nifi.provenance.IdentifierLookup;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.provenance.RepositoryConfiguration;
+import org.apache.nifi.provenance.StandardProvenanceEventRecord;
+import org.apache.nifi.provenance.authorization.EventAuthorizer;
+import org.apache.nifi.provenance.authorization.EventTransformer;
+import org.apache.nifi.provenance.serialization.RecordReaders;
+import org.apache.nifi.provenance.serialization.RecordWriters;
+import org.apache.nifi.provenance.serialization.StorageSummary;
+import org.apache.nifi.provenance.toc.StandardTocWriter;
+import org.apache.nifi.provenance.toc.TocUtil;
+import org.apache.nifi.provenance.toc.TocWriter;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+public class TestPartitionedWriteAheadEventStore {
+    private static final RecordWriterFactory writerFactory = (file, idGen, 
compress, createToc) -> RecordWriters.newSchemaRecordWriter(file, idGen, 
compress, createToc); // writer factory passed to the stores built by the tests below
+    private static final RecordReaderFactory readerFactory = (file, logs, 
maxChars) -> RecordReaders.newRecordReader(file, logs, maxChars); // reader factory passed to the stores built by the tests below
+
+    private final AtomicLong idGenerator = new AtomicLong(0L); // set back to 0 by resetIds() before every test
+
+    @Rule
+    public TestName testName = new TestName(); // JUnit rule that exposes the name of the running test method
+
+    @Before
+    public void resetIds() { // runs before every test: sets the shared idGenerator counter back to 0
+        idGenerator.set(0L);
+    }
+
+
+    @Test
+    @Ignore
+    public void testPerformanceOfAccessingEvents() throws Exception {
+        final RecordWriterFactory recordWriterFactory = (file, idGenerator, 
compressed, createToc) -> {
+            final TocWriter tocWriter = createToc ? new 
StandardTocWriter(TocUtil.getTocFile(file), false, false) : null;
+            return new EventIdFirstSchemaRecordWriter(file, idGenerator, 
tocWriter, compressed, 1024 * 1024, IdentifierLookup.EMPTY);
+        };
+
+        final RecordReaderFactory recordReaderFactory = (file, logs, maxChars) 
-> RecordReaders.newRecordReader(file, logs, maxChars);
+        final PartitionedWriteAheadEventStore store = new 
PartitionedWriteAheadEventStore(createConfig(),
+            recordWriterFactory, recordReaderFactory, EventReporter.NO_OP, new 
EventFileManager());
+        store.initialize();
+
+        assertEquals(-1, store.getMaxEventId());
+        for (int i = 0; i < 100_000; i++) {
+            final ProvenanceEventRecord event1 = createEvent();
+            store.addEvents(Collections.singleton(event1));
+        }
+
+        final List<Long> eventIdList = Arrays.asList(4L, 80L, 1024L, 40_000L, 
80_000L, 99_000L);
+
+        while (true) {
+            for (int i = 0; i < 100; i++) {
+                time(() -> store.getEvents(eventIdList, 
EventAuthorizer.GRANT_ALL, EventTransformer.EMPTY_TRANSFORMER), "Fetch Events");
+            }
+
+            Thread.sleep(1000L);
+        }
+    }
+
+    /**
+     * Runs the task once and prints how long it took, in milliseconds, to standard output.
+     *
+     * @param task the work to measure
+     * @param taskDescription text appended to the printed message to identify the task
+     * @throws Exception whatever the task throws
+     */
+    private void time(final Callable<?> task, final String taskDescription) throws Exception {
+        final long startNanos = System.nanoTime();
+        task.call();
+        final long elapsedNanos = System.nanoTime() - startNanos;
+
+        final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(elapsedNanos);
+        System.out.println("Took " + elapsedMillis + " ms to " + taskDescription);
+    }
+
+    /**
+     * Writes one event to an empty store and verifies that it can be read back by the ID
+     * reported in its storage summary.
+     */
+    @Test
+    public void testSingleWriteThenRead() throws IOException {
+        final PartitionedWriteAheadEventStore store = new PartitionedWriteAheadEventStore(createConfig(), writerFactory, readerFactory,
+            EventReporter.NO_OP, new EventFileManager());
+        try {
+            store.initialize();
+
+            assertEquals(-1, store.getMaxEventId());
+            final ProvenanceEventRecord event1 = createEvent();
+            final StorageResult result = store.addEvents(Collections.singleton(event1));
+
+            final StorageSummary summary = result.getStorageLocations().values().iterator().next();
+            final long eventId = summary.getEventId();
+            final ProvenanceEventRecord eventWithId = addId(event1, eventId);
+
+            assertEquals(0, store.getMaxEventId());
+
+            final ProvenanceEventRecord read = store.getEvent(eventId).get();
+            assertEquals(eventWithId, read);
+        } finally {
+            // Release the store's resources even when an assertion fails.
+            store.close();
+        }
+    }
+
+    /**
+     * Writes 20 events one at a time, checking the max event ID after each write, then
+     * reads every event back by ID.
+     */
+    @Test
+    public void testMultipleWritesThenReads() throws IOException {
+        final PartitionedWriteAheadEventStore store = new PartitionedWriteAheadEventStore(createConfig(), writerFactory, readerFactory,
+            EventReporter.NO_OP, new EventFileManager());
+        try {
+            store.initialize();
+            assertEquals(-1, store.getMaxEventId());
+
+            final int numEvents = 20;
+            final List<ProvenanceEventRecord> events = new ArrayList<>(numEvents);
+            for (int i = 0; i < numEvents; i++) {
+                final ProvenanceEventRecord event = createEvent();
+                store.addEvents(Collections.singleton(event));
+                assertEquals(i, store.getMaxEventId());
+
+                events.add(event);
+            }
+
+            for (int i = 0; i < numEvents; i++) {
+                final ProvenanceEventRecord read = store.getEvent(i).get();
+                assertEquals(events.get(i), read);
+            }
+        } finally {
+            // Release the store's resources even when an assertion fails.
+            store.close();
+        }
+    }
+
+
+    /**
+     * Writes 20 events, then exercises the range query: the full range, a range that lies
+     * entirely before the first event, a zero-length range, a single event, and a range
+     * that starts just past the last event.
+     */
+    @Test
+    public void testMultipleWritesThenGetAllInSingleRead() throws IOException {
+        final PartitionedWriteAheadEventStore store = new PartitionedWriteAheadEventStore(createConfig(), writerFactory, readerFactory,
+            EventReporter.NO_OP, new EventFileManager());
+        try {
+            store.initialize();
+            assertEquals(-1, store.getMaxEventId());
+
+            final int numEvents = 20;
+            final List<ProvenanceEventRecord> events = new ArrayList<>(numEvents);
+            for (int i = 0; i < numEvents; i++) {
+                final ProvenanceEventRecord event = createEvent();
+                store.addEvents(Collections.singleton(event));
+                assertEquals(i, store.getMaxEventId());
+
+                events.add(event);
+            }
+
+            List<ProvenanceEventRecord> eventsRead = store.getEvents(0L, numEvents, null, EventTransformer.EMPTY_TRANSFORMER);
+            assertNotNull(eventsRead);
+
+            assertEquals(numEvents, eventsRead.size());
+            for (int i = 0; i < numEvents; i++) {
+                final ProvenanceEventRecord read = eventsRead.get(i);
+                assertEquals(events.get(i), read);
+            }
+
+            // IDs -1000 through -1: nothing stored there.
+            eventsRead = store.getEvents(-1000, 1000, null, EventTransformer.EMPTY_TRANSFORMER);
+            assertNotNull(eventsRead);
+            assertTrue(eventsRead.isEmpty());
+
+            // Zero results requested.
+            eventsRead = store.getEvents(10, 0, null, EventTransformer.EMPTY_TRANSFORMER);
+            assertNotNull(eventsRead);
+            assertTrue(eventsRead.isEmpty());
+
+            eventsRead = store.getEvents(10, 1, null, EventTransformer.EMPTY_TRANSFORMER);
+            assertNotNull(eventsRead);
+            assertFalse(eventsRead.isEmpty());
+            assertEquals(1, eventsRead.size());
+            assertEquals(events.get(10), eventsRead.get(0));
+
+            // First ID past the last stored event.
+            eventsRead = store.getEvents(20, 1000, null, EventTransformer.EMPTY_TRANSFORMER);
+            assertNotNull(eventsRead);
+            assertTrue(eventsRead.isEmpty());
+        } finally {
+            // Release the store's resources even when an assertion fails.
+            store.close();
+        }
+    }
+
+    /**
+     * Verifies that the size reported by the store grows with every event written.
+     */
+    @Test
+    public void testGetSize() throws IOException {
+        final PartitionedWriteAheadEventStore store = new PartitionedWriteAheadEventStore(createConfig(), writerFactory, readerFactory,
+            EventReporter.NO_OP, new EventFileManager());
+        try {
+            store.initialize();
+
+            long storeSize = 0L;
+            final int numEvents = 20;
+            for (int i = 0; i < numEvents; i++) {
+                final ProvenanceEventRecord event = createEvent();
+                store.addEvents(Collections.singleton(event));
+                final long newSize = store.getSize();
+                assertTrue(newSize > storeSize);
+                storeSize = newSize;
+            }
+        } finally {
+            // Release the store's resources even when an assertion fails.
+            store.close();
+        }
+    }
+
+    /**
+     * Verifies that a second store created from the same configuration reports the same
+     * max event ID after the first store has been closed.
+     */
+    @Test
+    public void testMaxEventIdRestored() throws IOException {
+        final RepositoryConfiguration config = createConfig();
+        final PartitionedWriteAheadEventStore store = new PartitionedWriteAheadEventStore(config, writerFactory, readerFactory,
+            EventReporter.NO_OP, new EventFileManager());
+        try {
+            store.initialize();
+
+            final int numEvents = 20;
+            for (int i = 0; i < numEvents; i++) {
+                final ProvenanceEventRecord event = createEvent();
+                store.addEvents(Collections.singleton(event));
+            }
+
+            assertEquals(19, store.getMaxEventId());
+        } finally {
+            // The first store must be closed before the second one is opened on the same configuration.
+            store.close();
+        }
+
+        final PartitionedWriteAheadEventStore recoveredStore = new PartitionedWriteAheadEventStore(config, writerFactory, readerFactory,
+            EventReporter.NO_OP, new EventFileManager());
+        try {
+            recoveredStore.initialize();
+            assertEquals(19, recoveredStore.getMaxEventId());
+        } finally {
+            // Release the recovered store's resources as well.
+            recoveredStore.close();
+        }
+    }
+
+    /**
+     * Writes 20 events and verifies that each can be fetched by ID, while the IDs just
+     * outside the stored range (-1 and 20) yield no event.
+     */
+    @Test
+    public void testGetEvent() throws IOException {
+        final RepositoryConfiguration config = createConfig();
+        final PartitionedWriteAheadEventStore store = new PartitionedWriteAheadEventStore(config, writerFactory, readerFactory,
+            EventReporter.NO_OP, new EventFileManager());
+        try {
+            store.initialize();
+
+            final int numEvents = 20;
+            final List<ProvenanceEventRecord> events = new ArrayList<>(numEvents);
+            for (int i = 0; i < numEvents; i++) {
+                final ProvenanceEventRecord event = createEvent();
+                store.addEvents(Collections.singleton(event));
+                events.add(event);
+            }
+
+            // Ensure that each event is retrieved successfully.
+            for (int i = 0; i < numEvents; i++) {
+                final ProvenanceEventRecord event = store.getEvent(i).get();
+                assertEquals(events.get(i), event);
+            }
+
+            assertFalse(store.getEvent(-1L).isPresent());
+            assertFalse(store.getEvent(20L).isPresent());
+        } finally {
+            // Release the store's resources even when an assertion fails.
+            store.close();
+        }
+    }
+
+    /**
+     * Writes 50,000 events (remembering the first 1,000) and exercises {@code getEvents(minId, count)}
+     * with minimum IDs below, inside, and at the end of the written range.
+     *
+     * @throws IOException if the store cannot be initialized, written to, or read
+     */
+    @Test
+    public void testGetEventsWithMinIdAndCount() throws IOException {
+        final RepositoryConfiguration repoConfig = createConfig();
+        repoConfig.setMaxEventFileCount(100);
+        final PartitionedWriteAheadEventStore eventStore = new PartitionedWriteAheadEventStore(repoConfig, writerFactory, readerFactory, EventReporter.NO_OP, new EventFileManager());
+        eventStore.initialize();
+
+        final int totalEvents = 50_000;
+        final int rememberedEvents = 1000;
+        final List<ProvenanceEventRecord> firstEvents = new ArrayList<>(totalEvents);
+        for (int n = 0; n < totalEvents; n++) {
+            final ProvenanceEventRecord record = createEvent();
+            eventStore.addEvents(Collections.singleton(record));
+
+            // Only the leading events are kept for comparison.
+            if (n < rememberedEvents) {
+                firstEvents.add(record);
+            }
+        }
+
+        final int remembered = firstEvents.size();
+
+        // A minimum ID of -1000 with a count of 1000 is expected to yield nothing.
+        final List<ProvenanceEventRecord> beforeRange = eventStore.getEvents(-1000L, 1000);
+        assertTrue(beforeRange.isEmpty());
+
+        // Minimum IDs of 0 and -30 are both expected to return the remembered events.
+        assertEquals(firstEvents, eventStore.getEvents(0, remembered));
+        assertEquals(firstEvents, eventStore.getEvents(-30, remembered));
+
+        // Starting at ID 10 must skip the first ten remembered events.
+        final List<ProvenanceEventRecord> expectedTail = firstEvents.subList(10, remembered);
+        assertEquals(expectedTail, eventStore.getEvents(10L, remembered - 10));
+
+        // Nothing is expected at or after an ID equal to the number of events written.
+        final List<ProvenanceEventRecord> afterRange = eventStore.getEvents(totalEvents, 100);
+        assertTrue(afterRange.isEmpty());
+    }
+
+    /**
+     * Verifies that {@code getEvents(minId, count, authorizer, transformer)} only returns events the
+     * authorizer accepts: with an authorizer that accepts even event IDs only, half of the written
+     * events must come back, in the order they were written.
+     *
+     * @throws IOException if the store cannot be initialized, written to, or read
+     */
+    @Test
+    public void testGetEventsWithMinIdAndCountWithAuthorizer() throws IOException {
+        final RepositoryConfiguration config = createConfig();
+        final PartitionedWriteAheadEventStore store = new PartitionedWriteAheadEventStore(config, writerFactory, readerFactory, EventReporter.NO_OP, new EventFileManager());
+        store.initialize();
+
+        final int numEvents = 20;
+        final List<ProvenanceEventRecord> events = new ArrayList<>(numEvents);
+        for (int i = 0; i < numEvents; i++) {
+            final ProvenanceEventRecord event = createEvent();
+            store.addEvents(Collections.singleton(event));
+            events.add(event);
+        }
+
+        // Authorizes an event only when its event ID is even.
+        final EventAuthorizer allowEvenNumberedEventIds = new EventAuthorizer() {
+            @Override
+            public boolean isAuthorized(final ProvenanceEventRecord event) {
+                return event.getEventId() % 2 == 0L;
+            }
+
+            @Override
+            public void authorize(ProvenanceEventRecord event) throws AccessDeniedException {
+                if (!isAuthorized(event)) {
+                    throw new AccessDeniedException();
+                }
+            }
+        };
+
+        final List<ProvenanceEventRecord> storedEvents = store.getEvents(0, numEvents, allowEvenNumberedEventIds, EventTransformer.EMPTY_TRANSFORMER);
+        assertEquals(numEvents / 2, storedEvents.size());
+
+        // The i-th returned event is expected to be the event written at position 2 * i.
+        for (int i = 0; i < storedEvents.size(); i++) {
+            assertEquals(events.get(i * 2), storedEvents.get(i));
+        }
+    }
+
+
+    /**
+     * Verifies that {@code getEvents(minId, count, authorizer, transformer)} returns no events when the
+     * authorizer is {@code EventAuthorizer.DENY_ALL}, even though events have been written.
+     *
+     * @throws IOException if the store cannot be initialized, written to, or read
+     */
+    @Test
+    public void testGetEventsWithStartOffsetAndCountWithNothingAuthorized() throws IOException {
+        final RepositoryConfiguration config = createConfig();
+        final PartitionedWriteAheadEventStore store = new PartitionedWriteAheadEventStore(config, writerFactory, readerFactory, EventReporter.NO_OP, new EventFileManager());
+        store.initialize();
+
+        final int numEvents = 20;
+        for (int i = 0; i < numEvents; i++) {
+            final ProvenanceEventRecord event = createEvent();
+            store.addEvents(Collections.singleton(event));
+        }
+
+        final EventAuthorizer denyAll = EventAuthorizer.DENY_ALL;
+        final List<ProvenanceEventRecord> storedEvents = store.getEvents(0, numEvents, denyAll, EventTransformer.EMPTY_TRANSFORMER);
+        assertTrue(storedEvents.isEmpty());
+    }
+
+    @Test
+    public void testGetSpecificEventIds() throws IOException {
+        final RepositoryConfiguration config = createConfig();
+        final PartitionedWriteAheadEventStore store = new 
PartitionedWriteAheadEventStore(config, writerFactory, readerFactory, 
EventReporter.NO_OP, new EventFileManager());
+        store.initialize();
+
+        final int numEvents = 20;
+        final List<ProvenanceEventRecord> events = new ArrayList<>(numEvents);
+        for (int i = 0; i < numEvents; i++) {
+            final ProvenanceEventRecord event = createEvent();
+            store.addEvents(Collections.singleton(event));
+            events.add(event);
+        }
+
+        final EventAuthorizer allowEvenNumberedEventIds = new 
EventAuthorizer() {
+            @Override
+            public boolean isAuthorized(final ProvenanceEventRecord event) {
+                return event.getEventId() % 2 == 0L;
+            }
+
+            @Override
+            public void authorize(ProvenanceEventRecord event) throws 
AccessDeniedException {
+                if (!isAuthorized(event)) {
+                    throw new AccessDeniedException();
+                }
+            }
+        };
+
+        final List<Long> evenEventIds = new ArrayList<>();
+        final List<Long> oddEventIds = new ArrayList<>();
+        final List<Long> allEventIds = new ArrayList<>();
+        for (int i = 0; i < 20; i++) {
+            final Long id = Long.valueOf(i);
+            allEventIds.add(id);
+
+            if (i % 2 == 0) {
+                evenEventIds.add(id);
+            } else {
+                oddEventIds.add(id);
+            }
+        }
+
+        final List<ProvenanceEventRecord> storedEvents = 
store.getEvents(evenEventIds, allowEvenNumberedEventIds, 
EventTransformer.EMPTY_TRANSFORMER);
+        assertEquals(numEvents / 2, storedEvents.size());
+        for (int i = 0; i < storedEvents.size(); i++) {
+            assertEquals(events.get(i * 2), storedEvents.get(i));
+        }
+
+        assertTrue(store.getEvents(oddEventIds, allowEvenNumberedEventIds, 
EventTransformer.EMPTY_TRANSFORMER).isEmpty());
+
+        final List<ProvenanceEventRecord> allStoredEvents = 
store.getEvents(allEventIds, EventAuthorizer.GRANT_ALL, 
EventTransformer.EMPTY_TRANSFORMER);
+        assertEquals(events, allStoredEvents);
+    }
+
+
+    /**
+     * Verifies that a store recovered from a previously closed store still accepts writes: events
+     * written before the restart are readable afterwards, and events written after the restart
+     * continue the event ID sequence.
+     *
+     * @throws IOException if a store cannot be initialized, written to, read, or closed
+     */
+    @Test
+    public void testWriteAfterRecoveringRepo() throws IOException {
+        final RepositoryConfiguration config = createConfig();
+        final PartitionedWriteAheadEventStore store = new PartitionedWriteAheadEventStore(config, writerFactory, readerFactory, EventReporter.NO_OP, new EventFileManager());
+        store.initialize();
+
+        for (int i = 0; i < 4; i++) {
+            store.addEvents(Collections.singleton(createEvent()));
+        }
+
+        store.close();
+
+        final PartitionedWriteAheadEventStore recoveredStore = new PartitionedWriteAheadEventStore(config, writerFactory, readerFactory, EventReporter.NO_OP, new EventFileManager());
+        recoveredStore.initialize();
+
+        try {
+            List<ProvenanceEventRecord> recoveredEvents = recoveredStore.getEvents(0, 10);
+            assertEquals(4, recoveredEvents.size());
+
+            // ensure that we can still write to the store
+            for (int i = 0; i < 4; i++) {
+                recoveredStore.addEvents(Collections.singleton(createEvent()));
+            }
+
+            recoveredEvents = recoveredStore.getEvents(0, 10);
+            assertEquals(8, recoveredEvents.size());
+
+            // Event IDs are expected to run 0 .. 7 without a gap across the restart.
+            for (int i = 0; i < 8; i++) {
+                assertEquals(i, recoveredEvents.get(i).getEventId());
+            }
+        } finally {
+            // Release the recovered store as well so the test does not leave it open.
+            recoveredStore.close();
+        }
+    }
+
+
+    private RepositoryConfiguration createConfig() {
+        return createConfig(2);
+    }
+
+    /**
+     * Creates a repository configuration whose storage directories live under
+     * {@code target/storage/<test method name>/<random UUID>}. The directories are named
+     * {@code 1 .. numStorageDirs} and each is registered under its own name.
+     *
+     * @param numStorageDirs the number of storage directories to register
+     * @return the new configuration
+     */
+    private RepositoryConfiguration createConfig(final int numStorageDirs) {
+        final String methodName = testName.getMethodName();
+        final File baseDir = new File("target/storage/" + methodName + "/" + UUID.randomUUID().toString());
+
+        final RepositoryConfiguration repoConfig = new RepositoryConfiguration();
+        int dirNumber = 1;
+        while (dirNumber <= numStorageDirs) {
+            final String partitionName = String.valueOf(dirNumber);
+            repoConfig.addStorageDirectory(partitionName, new File(baseDir, partitionName));
+            dirNumber++;
+        }
+
+        return repoConfig;
+    }
+
+    private ProvenanceEventRecord addId(final ProvenanceEventRecord event, 
final long eventId) {
+        return new StandardProvenanceEventRecord.Builder()
+            .fromEvent(event)
+            .setEventId(eventId)
+            .build();
+    }
+
+
+    private ProvenanceEventRecord createEvent() {
+        final String uuid = UUID.randomUUID().toString();
+        final Map<String, String> previousAttributes = new HashMap<>();
+        previousAttributes.put("uuid", uuid);
+        final Map<String, String> updatedAttributes = new HashMap<>();
+        updatedAttributes.put("updated", "true");
+
+        return new StandardProvenanceEventRecord.Builder()
+            .setEventType(ProvenanceEventType.CONTENT_MODIFIED)
+            .setAttributes(previousAttributes, updatedAttributes)
+            .setComponentId("component-1")
+            .setComponentType("unit test")
+            .setEventTime(System.currentTimeMillis())
+            .setFlowFileEntryDate(System.currentTimeMillis())
+            .setFlowFileUUID(uuid)
+            .setLineageStartDate(System.currentTimeMillis())
+            .setCurrentContentClaim("container", "section", "unit-test-id", 
0L, 1024L)
+            .build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/TestWriteAheadStorePartition.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/TestWriteAheadStorePartition.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/TestWriteAheadStorePartition.java
new file mode 100644
index 0000000..3879411
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/TestWriteAheadStorePartition.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.store;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.provenance.EventIdFirstSchemaRecordWriter;
+import org.apache.nifi.provenance.IdentifierLookup;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.RepositoryConfiguration;
+import org.apache.nifi.provenance.TestUtil;
+import org.apache.nifi.provenance.index.EventIndex;
+import org.apache.nifi.provenance.serialization.RecordReaders;
+import org.apache.nifi.provenance.serialization.StorageSummary;
+import org.apache.nifi.provenance.toc.StandardTocWriter;
+import org.apache.nifi.provenance.toc.TocUtil;
+import org.apache.nifi.provenance.toc.TocWriter;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class TestWriteAheadStorePartition {
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testReindex() throws IOException {
+        final RepositoryConfiguration repoConfig = createConfig(1, 
"testReindex");
+        repoConfig.setMaxEventFileCount(5);
+
+        final String partitionName = 
repoConfig.getStorageDirectories().keySet().iterator().next();
+        final File storageDirectory = 
repoConfig.getStorageDirectories().values().iterator().next();
+
+        final RecordWriterFactory recordWriterFactory = (file, idGenerator, 
compressed, createToc) -> {
+            final TocWriter tocWriter = createToc ? new 
StandardTocWriter(TocUtil.getTocFile(file), false, false) : null;
+            return new EventIdFirstSchemaRecordWriter(file, idGenerator, 
tocWriter, compressed, 32 * 1024, IdentifierLookup.EMPTY);
+        };
+
+        final RecordReaderFactory recordReaderFactory = (file, logs, maxChars) 
-> RecordReaders.newRecordReader(file, logs, maxChars);
+
+        final WriteAheadStorePartition partition = new 
WriteAheadStorePartition(storageDirectory, partitionName, repoConfig, 
recordWriterFactory,
+            recordReaderFactory, new LinkedBlockingQueue<>(), new 
AtomicLong(0L), EventReporter.NO_OP);
+
+        for (int i = 0; i < 100; i++) {
+            partition.addEvents(Collections.singleton(TestUtil.createEvent()));
+        }
+
+        final Map<ProvenanceEventRecord, StorageSummary> reindexedEvents = new 
HashMap<>();
+        final EventIndex eventIndex = Mockito.mock(EventIndex.class);
+        Mockito.doAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(final InvocationOnMock invocation) throws 
Throwable {
+                final Map<ProvenanceEventRecord, StorageSummary> events = 
invocation.getArgumentAt(0, Map.class);
+                reindexedEvents.putAll(events);
+                return null;
+            }
+        }).when(eventIndex).reindexEvents(Mockito.anyMap());
+
+        Mockito.doReturn(18L).when(eventIndex).getMinimumEventIdToReindex("1");
+        partition.reindexLatestEvents(eventIndex);
+
+        final List<Long> eventIdsReindexed = reindexedEvents.values().stream()
+            .map(StorageSummary::getEventId)
+            .sorted()
+            .collect(Collectors.toList());
+
+        assertEquals(82, eventIdsReindexed.size());
+        for (int i = 0; i < eventIdsReindexed.size(); i++) {
+            assertEquals(18 + i, eventIdsReindexed.get(i).intValue());
+        }
+    }
+
+    private RepositoryConfiguration createConfig(final int numStorageDirs, 
final String testName) {
+        final RepositoryConfiguration config = new RepositoryConfiguration();
+        final File storageDir = new File("target/storage/" + testName + "/" + 
UUID.randomUUID().toString());
+
+        for (int i = 1; i <= numStorageDirs; i++) {
+            config.addStorageDirectory(String.valueOf(1), new File(storageDir, 
String.valueOf(i)));
+        }
+
+        config.setJournalCount(4);
+        return config;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/iterator/TestSelectiveRecordReaderEventIterator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/iterator/TestSelectiveRecordReaderEventIterator.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/iterator/TestSelectiveRecordReaderEventIterator.java
new file mode 100644
index 0000000..0089f61
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/iterator/TestSelectiveRecordReaderEventIterator.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.store.iterator;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.nifi.provenance.EventIdFirstSchemaRecordWriter;
+import org.apache.nifi.provenance.IdentifierLookup;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.TestUtil;
+import org.apache.nifi.provenance.serialization.RecordReaders;
+import org.apache.nifi.provenance.serialization.RecordWriter;
+import org.apache.nifi.provenance.store.RecordReaderFactory;
+import org.apache.nifi.provenance.toc.StandardTocWriter;
+import org.apache.nifi.provenance.toc.TocUtil;
+import org.apache.nifi.provenance.toc.TocWriter;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+
+public class TestSelectiveRecordReaderEventIterator {
+
+
+    private RecordWriter createWriter(final File file, final TocWriter 
tocWriter, final boolean compressed, final int uncompressedBlockSize) throws 
IOException {
+        return new EventIdFirstSchemaRecordWriter(file, new AtomicLong(0L), 
tocWriter, compressed, uncompressedBlockSize, IdentifierLookup.EMPTY);
+    }
+
+    @Test
+    public void testFilterUnneededFiles() {
+        final File file1 = new File("1.prov");
+        final File file1000 = new File("1000.prov");
+        final File file2000 = new File("2000.prov");
+        final File file3000 = new File("3000.prov");
+
+        // Filter out the first file.
+        final List<File> files = new ArrayList<>();
+        files.add(file1);
+        files.add(file1000);
+        files.add(file2000);
+        files.add(file3000);
+
+        List<Long> eventIds = new ArrayList<>();
+        eventIds.add(1048L);
+        eventIds.add(2048L);
+        eventIds.add(3048L);
+
+        List<File> filteredFiles = 
SelectiveRecordReaderEventIterator.filterUnneededFiles(files, eventIds);
+        assertEquals(Arrays.asList(new File[] {file1000, file2000, file3000}), 
filteredFiles);
+
+        // Filter out file at end
+        eventIds.clear();
+        eventIds.add(1L);
+        eventIds.add(1048L);
+
+        filteredFiles = 
SelectiveRecordReaderEventIterator.filterUnneededFiles(files, eventIds);
+        assertEquals(Arrays.asList(new File[] {file1, file1000}), 
filteredFiles);
+    }
+
+    @Test
+    @Ignore("For local testing only. Runs indefinitely")
+    public void testPerformanceOfRandomAccessReads() throws Exception {
+        final File dir = new File("target/storage/" + 
UUID.randomUUID().toString());
+        final File journalFile = new File(dir, "/4.prov.gz");
+        final File tocFile = TocUtil.getTocFile(journalFile);
+
+        final int blockSize = 1024 * 32;
+        try (final RecordWriter writer = createWriter(journalFile, new 
StandardTocWriter(tocFile, true, false), true, blockSize)) {
+            writer.writeHeader(0L);
+
+            for (int i = 0; i < 100_000; i++) {
+                writer.writeRecord(TestUtil.createEvent());
+            }
+        }
+
+        final Long[] eventIds = new Long[] {
+            4L, 80L, 1024L, 1025L, 1026L, 1027L, 1028L, 1029L, 1030L, 40_000L, 
80_000L, 99_000L
+        };
+
+        final RecordReaderFactory readerFactory = (file, logs, maxChars) -> 
RecordReaders.newRecordReader(file, logs, maxChars);
+
+        final List<File> files = new ArrayList<>();
+        files.add(new File(dir, "0.prov"));
+        files.add(new File(dir, "0.prov"));
+        files.add(new File(dir, "1.prov"));
+        files.add(new File(dir, "2.prov"));
+        files.add(new File(dir, "3.prov"));
+        files.add(journalFile);
+        files.add(new File(dir, "100000000.prov"));
+
+        boolean loopForever = true;
+        while (loopForever) {
+            final long start = System.nanoTime();
+            for (int i = 0; i < 1000; i++) {
+                final SelectiveRecordReaderEventIterator iterator = new 
SelectiveRecordReaderEventIterator(
+                    Collections.singletonList(journalFile), readerFactory, 
Arrays.asList(eventIds), 32 * 1024);
+
+                for (final long id : eventIds) {
+                    time(() -> {
+                        return iterator.nextEvent().orElse(null);
+                    }, id);
+                }
+            }
+
+            final long ms = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
start);
+            System.out.println(ms + " ms total");
+        }
+    }
+
+    private void time(final Callable<ProvenanceEventRecord> task, final long 
id) throws Exception {
+        final long start = System.nanoTime();
+        final ProvenanceEventRecord event = task.call();
+        Assert.assertNotNull(event);
+        Assert.assertEquals(id, event.getEventId());
+        //        System.out.println(event);
+        final long nanos = System.nanoTime() - start;
+        final long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
+        //        System.out.println("Took " + millis + " ms to " + 
taskDescription);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java
index e467676..f08fed4 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java
@@ -124,7 +124,8 @@ public class VolatileProvenanceRepository implements 
ProvenanceRepository {
     }
 
     @Override
-    public void initialize(final EventReporter eventReporter, final Authorizer 
authorizer, final ProvenanceAuthorizableFactory resourceFactory) throws 
IOException {
+    public void initialize(final EventReporter eventReporter, final Authorizer 
authorizer, final ProvenanceAuthorizableFactory resourceFactory,
+        final IdentifierLookup idLookup) throws IOException {
         if (initialized.getAndSet(true)) {
             return;
         }
@@ -542,7 +543,7 @@ public class VolatileProvenanceRepository implements 
ProvenanceRepository {
         if (event == null) {
             final AsyncLineageSubmission submission = new 
AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, 
Collections.<String>emptyList(), 1, userId);
             lineageSubmissionMap.put(submission.getLineageIdentifier(), 
submission);
-            
submission.getResult().update(Collections.<ProvenanceEventRecord>emptyList());
+            submission.getResult().update(Collections.<ProvenanceEventRecord> 
emptyList(), 0L);
             return submission;
         }
 
@@ -573,7 +574,7 @@ public class VolatileProvenanceRepository implements 
ProvenanceRepository {
         if (event == null) {
             final AsyncLineageSubmission submission = new 
AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, 
Collections.<String>emptyList(), 1, userId);
             lineageSubmissionMap.put(submission.getLineageIdentifier(), 
submission);
-            
submission.getResult().update(Collections.<ProvenanceEventRecord>emptyList());
+            submission.getResult().update(Collections.<ProvenanceEventRecord> 
emptyList(), 0L);
             return submission;
         }
 
@@ -681,7 +682,7 @@ public class VolatileProvenanceRepository implements 
ProvenanceRepository {
         @Override
         public void run() {
             final List<ProvenanceEventRecord> records = 
ringBuffer.getSelectedElements(filter);
-            submission.getResult().update(records);
+            submission.getResult().update(records, records.size());
         }
     }
 

Reply via email to