http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
index 00f4617..48d8e09 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileFilter;
 import java.io.FileInputStream;
@@ -65,6 +66,8 @@ import org.apache.nifi.authorization.AccessDeniedException;
 import org.apache.nifi.authorization.user.NiFiUser;
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.provenance.index.EventIndexSearcher;
+import org.apache.nifi.provenance.index.EventIndexWriter;
 import org.apache.nifi.provenance.lineage.EventNode;
 import org.apache.nifi.provenance.lineage.Lineage;
 import org.apache.nifi.provenance.lineage.LineageEdge;
@@ -83,7 +86,6 @@ import org.apache.nifi.provenance.serialization.RecordReaders;
 import org.apache.nifi.provenance.serialization.RecordWriter;
 import org.apache.nifi.provenance.serialization.RecordWriters;
 import org.apache.nifi.reporting.Severity;
-import org.apache.nifi.stream.io.DataOutputStream;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.file.FileUtils;
 import org.junit.After;
@@ -120,7 +122,7 @@ public class TestPersistentProvenanceRepository {
 
     private static RepositoryConfiguration createConfiguration() {
         config = new RepositoryConfiguration();
-        config.addStorageDirectory(new File("target/storage/" + UUID.randomUUID().toString()));
+        config.addStorageDirectory("1", new File("target/storage/" + UUID.randomUUID().toString()));
         config.setCompressOnRollover(true);
         config.setMaxEventFileLife(2000L, TimeUnit.SECONDS);
         config.setCompressionBlockBytes(100);
@@ -152,14 +154,15 @@ public class TestPersistentProvenanceRepository {
         final File tempRecordFile = tempFolder.newFile("record.tmp");
         System.out.println("findJournalSizes position 0 = " + 
tempRecordFile.length());
 
-        final RecordWriter writer = RecordWriters.newSchemaRecordWriter(tempRecordFile, false, false);
+        final AtomicLong idGenerator = new AtomicLong(0L);
+        final RecordWriter writer = RecordWriters.newSchemaRecordWriter(tempRecordFile, idGenerator, false, false);
         writer.writeHeader(12345L);
         writer.flush();
         headerSize = Long.valueOf(tempRecordFile.length()).intValue();
-        writer.writeRecord(record, 12345L);
+        writer.writeRecord(record);
         writer.flush();
         recordSize = Long.valueOf(tempRecordFile.length()).intValue() - headerSize;
-        writer.writeRecord(record2, 23456L);
+        writer.writeRecord(record2);
         writer.flush();
         recordSize2 = Long.valueOf(tempRecordFile.length()).intValue() - headerSize - recordSize;
         writer.close();
@@ -187,34 +190,45 @@ public class TestPersistentProvenanceRepository {
 
     @After
     public void closeRepo() throws IOException {
-        if (repo != null) {
-            try {
-                repo.close();
-            } catch (final IOException ioe) {
-            }
+        if (repo == null) {
+            return;
         }
 
+        try {
+            repo.close();
+        } catch (final IOException ioe) {
+        }
+
+        // Delete all of the storage files. We do this in order to clean up the tons of files that
+        // we create but also to ensure that we have closed all of the file handles. If we leave any
+        // streams open, for instance, this will throw an IOException, causing our unit test to fail.
         if (config != null) {
-            // Delete all of the storage files. We do this in order to clean up the tons of files that
-            // we create but also to ensure that we have closed all of the file handles. If we leave any
-            // streams open, for instance, this will throw an IOException, causing our unit test to fail.
-            for (final File storageDir : config.getStorageDirectories()) {
-                if (storageDir.exists()) {
-                    int i;
-                    for (i = 0; i < 3; i++) {
-                        try {
-                            System.out.println("file: " + 
storageDir.toString() + " exists=" + storageDir.exists());
-                            FileUtils.deleteFile(storageDir, true);
-                            break;
-                        } catch (final IOException ioe) {
-                            // if there is a virus scanner, etc. running in the background we may not be able to
-                            // delete the file. Wait a sec and try again.
-                            if (i == 2) {
-                                throw ioe;
-                            } else {
-                                try {
-                                    Thread.sleep(1000L);
-                                } catch (final InterruptedException ie) {
+            for (final File storageDir : config.getStorageDirectories().values()) {
+                int i;
+                for (i = 0; i < 3; i++) {
+                    try {
+                        FileUtils.deleteFile(storageDir, true);
+                        break;
+                    } catch (final IOException ioe) {
+                    // if there is a virus scanner, etc. running in the background we may not be able to
+                        // delete the file. Wait a sec and try again.
+                        if (i == 2) {
+                            throw ioe;
+                        } else {
+                            try {
+                                System.out.println("file: " + 
storageDir.toString() + " exists=" + storageDir.exists());
+                                FileUtils.deleteFile(storageDir, true);
+                                break;
+                            } catch (final IOException ioe2) {
+                                // if there is a virus scanner, etc. running in the background we may not be able to
+                                // delete the file. Wait a sec and try again.
+                                if (i == 2) {
+                                    throw ioe2;
+                                } else {
+                                    try {
+                                        Thread.sleep(1000L);
+                                    } catch (final InterruptedException ie) {
+                                    }
                                 }
                             }
                         }
@@ -240,7 +254,7 @@ public class TestPersistentProvenanceRepository {
         config.setJournalCount(10);
         config.setQueryThreadPoolSize(10);
         repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
-        repo.initialize(getEventReporter(), null, null);
+        repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
 
         final Map<String, String> attributes = new HashMap<>();
         attributes.put("abc", "xyz");
@@ -288,7 +302,7 @@ public class TestPersistentProvenanceRepository {
         System.out.println("Closing and re-initializing");
         repo.close();
         repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
-        repo.initialize(getEventReporter(), null, null);
+        repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
         System.out.println("Re-initialized");
 
         final long fetchStart = System.nanoTime();
@@ -311,7 +325,7 @@ public class TestPersistentProvenanceRepository {
                 return "2000 millis";
             } else if (key.equals(NiFiProperties.PROVENANCE_REPO_DIRECTORY_PREFIX + ".default")) {
                 createConfiguration();
-                return config.getStorageDirectories().get(0).getAbsolutePath();
+                return config.getStorageDirectories().values().iterator().next().getAbsolutePath();
             } else {
                 return null;
             }
@@ -340,8 +354,8 @@ public class TestPersistentProvenanceRepository {
 
     @Test
     public void constructorConfig() throws IOException {
-        RepositoryConfiguration configuration = createTestableRepositoryConfiguration(properties);
-        TestablePersistentProvenanceRepository tppr = new TestablePersistentProvenanceRepository(configuration, 20000);
+        RepositoryConfiguration configuration = RepositoryConfiguration.create(properties);
+        new TestablePersistentProvenanceRepository(configuration, 20000);
     }
 
     @Test
@@ -350,7 +364,7 @@ public class TestPersistentProvenanceRepository {
         config.setMaxEventFileCapacity(1L);
         config.setMaxEventFileLife(1, TimeUnit.SECONDS);
         repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
-        repo.initialize(getEventReporter(), null, null);
+        repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
 
         final Map<String, String> attributes = new HashMap<>();
         attributes.put("abc", "xyz");
@@ -376,7 +390,7 @@ public class TestPersistentProvenanceRepository {
         Thread.sleep(500L); // Give the repo time to shutdown (i.e., close all file handles, etc.)
 
         repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
-        repo.initialize(getEventReporter(), null, null);
+        repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
         final List<ProvenanceEventRecord> recoveredRecords = repo.getEvents(0L, 12);
 
         assertEquals(10, recoveredRecords.size());
@@ -399,7 +413,7 @@ public class TestPersistentProvenanceRepository {
         config.setMaxEventFileLife(2, TimeUnit.SECONDS);
         config.setSearchableFields(searchableFields);
         repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
-        repo.initialize(getEventReporter(), null, null);
+        repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
 
         final Map<String, String> attributes = new HashMap<>();
         attributes.put("abc", "xyz");
@@ -454,7 +468,7 @@ public class TestPersistentProvenanceRepository {
         config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
         config.setSearchableAttributes(SearchableFieldParser.extractSearchableFields("immense", false));
         repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
-        repo.initialize(getEventReporter(), null, null);
+        repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
 
         int immenseAttrSize = 33000; // must be greater than 32766 for a meaningful test
         StringBuilder immenseBldr = new StringBuilder(immenseAttrSize);
@@ -498,7 +512,7 @@ public class TestPersistentProvenanceRepository {
         config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
         config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
         repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
-        repo.initialize(getEventReporter(), null, null);
+        repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
 
         final String uuid = "00000000-0000-0000-0000-000000000000";
         final Map<String, String> attributes = new HashMap<>();
@@ -542,7 +556,7 @@ public class TestPersistentProvenanceRepository {
         config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
         config.setCompressOnRollover(true);
         repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
-        repo.initialize(getEventReporter(), null, null);
+        repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
 
         final String uuid = "00000000-0000-0000-0000-000000000000";
         final Map<String, String> attributes = new HashMap<>();
@@ -565,7 +579,7 @@ public class TestPersistentProvenanceRepository {
         }
 
         repo.waitForRollover();
-        final File storageDir = config.getStorageDirectories().get(0);
+        final File storageDir = config.getStorageDirectories().values().iterator().next();
         final File compressedLogFile = new File(storageDir, "0.prov.gz");
         assertTrue(compressedLogFile.exists());
     }
@@ -580,7 +594,7 @@ public class TestPersistentProvenanceRepository {
         config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
 
         repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
-        repo.initialize(getEventReporter(), null, null);
+        repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
 
         final String uuid = "10000000-0000-0000-0000-000000000000";
         final Map<String, String> attributes = new HashMap<>();
@@ -653,8 +667,8 @@ public class TestPersistentProvenanceRepository {
                         final AtomicInteger indexSearcherCount = new AtomicInteger(0);
 
                         @Override
-                        public IndexSearcher borrowIndexSearcher(File indexDir) throws IOException {
-                            final IndexSearcher searcher = mgr.borrowIndexSearcher(indexDir);
+                        public EventIndexSearcher borrowIndexSearcher(File indexDir) throws IOException {
+                            final EventIndexSearcher searcher = mgr.borrowIndexSearcher(indexDir);
                             final int idx = indexSearcherCount.incrementAndGet();
                             obtainIndexSearcherLatch.countDown();
 
@@ -677,7 +691,7 @@ public class TestPersistentProvenanceRepository {
                         }
 
                         @Override
-                        public IndexWriter borrowIndexWriter(File indexingDirectory) throws IOException {
+                        public EventIndexWriter borrowIndexWriter(File indexingDirectory) throws IOException {
                             return mgr.borrowIndexWriter(indexingDirectory);
                         }
 
@@ -687,18 +701,19 @@ public class TestPersistentProvenanceRepository {
                         }
 
                         @Override
-                        public void removeIndex(File indexDirectory) {
+                        public boolean removeIndex(File indexDirectory) {
                             mgr.removeIndex(indexDirectory);
+                            return true;
                         }
 
                         @Override
-                        public void returnIndexSearcher(File indexDirectory, IndexSearcher searcher) {
-                            mgr.returnIndexSearcher(indexDirectory, searcher);
+                        public void returnIndexSearcher(EventIndexSearcher searcher) {
+                            mgr.returnIndexSearcher(searcher);
                         }
 
                         @Override
-                        public void returnIndexWriter(File indexingDirectory, IndexWriter writer) {
-                            mgr.returnIndexWriter(indexingDirectory, writer);
+                        public void returnIndexWriter(EventIndexWriter writer) {
+                            mgr.returnIndexWriter(writer);
                         }
                     };
                 }
@@ -707,7 +722,7 @@ public class TestPersistentProvenanceRepository {
             }
         };
 
-        repo.initialize(getEventReporter(), null, null);
+        repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
 
         final String uuid = "10000000-0000-0000-0000-000000000000";
         final Map<String, String> attributes = new HashMap<>();
@@ -790,7 +805,7 @@ public class TestPersistentProvenanceRepository {
     @Test
     public void testIndexAndCompressOnRolloverAndSubsequentSearchMultipleStorageDirs() throws IOException, InterruptedException, ParseException {
         final RepositoryConfiguration config = createConfiguration();
-        config.addStorageDirectory(new File("target/storage/" + UUID.randomUUID().toString()));
+        config.addStorageDirectory("2", new File("target/storage/" + UUID.randomUUID().toString()));
         config.setMaxRecordLife(30, TimeUnit.SECONDS);
         config.setMaxStorageCapacity(1024L * 1024L);
         config.setMaxEventFileLife(1, TimeUnit.SECONDS);
@@ -798,7 +813,7 @@ public class TestPersistentProvenanceRepository {
         config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
 
         repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
-        repo.initialize(getEventReporter(), null, null);
+        repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
 
         final String uuid = "00000000-0000-0000-0000-000000000000";
         final Map<String, String> attributes = new HashMap<>();
@@ -885,7 +900,7 @@ public class TestPersistentProvenanceRepository {
         config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
 
         repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
-        repo.initialize(getEventReporter(), null, null);
+        repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
 
         final String uuid = "00000000-0000-0000-0000-000000000000";
         final Map<String, String> attributes = new HashMap<>();
@@ -941,7 +956,7 @@ public class TestPersistentProvenanceRepository {
         config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
 
         repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
-        repo.initialize(getEventReporter(), null, null);
+        repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
 
         final String uuid = "00000000-0000-0000-0000-000000000001";
         final Map<String, String> attributes = new HashMap<>();
@@ -996,7 +1011,7 @@ public class TestPersistentProvenanceRepository {
         config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
 
         repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
-        repo.initialize(getEventReporter(), null, null);
+        repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
 
         final String uuid = "00000000-0000-0000-0000-000000000001";
         final Map<String, String> attributes = new HashMap<>();
@@ -1055,7 +1070,7 @@ public class TestPersistentProvenanceRepository {
         config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
 
         repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
-        repo.initialize(getEventReporter(), null, null);
+        repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
 
         final String childId = "00000000-0000-0000-0000-000000000000";
 
@@ -1105,7 +1120,7 @@ public class TestPersistentProvenanceRepository {
         config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
 
         repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
-        repo.initialize(getEventReporter(), null, null);
+        repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
 
         final String childId = "00000000-0000-0000-0000-000000000000";
 
@@ -1152,7 +1167,7 @@ public class TestPersistentProvenanceRepository {
         final RepositoryConfiguration config = createConfiguration();
         config.setMaxEventFileLife(1, TimeUnit.SECONDS);
         repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
-        repo.initialize(getEventReporter(), null, null);
+        repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
 
         final String uuid = "00000000-0000-0000-0000-000000000000";
         final Map<String, String> attributes = new HashMap<>();
@@ -1178,7 +1193,7 @@ public class TestPersistentProvenanceRepository {
         repo.close();
 
         final PersistentProvenanceRepository secondRepo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
-        secondRepo.initialize(getEventReporter(), null, null);
+        secondRepo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
 
         try {
             final ProvenanceEventRecord event11 = builder.build();
@@ -1239,7 +1254,7 @@ public class TestPersistentProvenanceRepository {
         config.setDesiredIndexSize(10);
 
         repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
-        repo.initialize(getEventReporter(), null, null);
+        repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
 
         String uuid = UUID.randomUUID().toString();
         for (int i = 0; i < 20; i++) {
@@ -1253,7 +1268,7 @@ public class TestPersistentProvenanceRepository {
             }
         }
         repo.waitForRollover();
-        File eventFile = new File(config.getStorageDirectories().get(0), "10.prov.gz");
+        File eventFile = new File(config.getStorageDirectories().values().iterator().next(), "10.prov.gz");
         assertTrue(eventFile.delete());
         return eventFile;
     }
@@ -1270,7 +1285,7 @@ public class TestPersistentProvenanceRepository {
         config.setDesiredIndexSize(10); // force new index to be created for each rollover
 
         repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
-        repo.initialize(getEventReporter(), null, null);
+        repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
 
         final String uuid = "00000000-0000-0000-0000-000000000000";
         final Map<String, String> attributes = new HashMap<>();
@@ -1298,7 +1313,7 @@ public class TestPersistentProvenanceRepository {
         Thread.sleep(2000L);
 
         final FileFilter indexFileFilter = file -> file.getName().startsWith("index");
-        final int numIndexDirs = config.getStorageDirectories().get(0).listFiles(indexFileFilter).length;
+        final int numIndexDirs = config.getStorageDirectories().values().iterator().next().listFiles(indexFileFilter).length;
         assertEquals(1, numIndexDirs);
 
         // add more records so that we will create a new index
@@ -1324,7 +1339,7 @@ public class TestPersistentProvenanceRepository {
         assertEquals(20, result.getMatchingEvents().size());
 
         // Ensure index directories exists
-        File[] indexDirs = config.getStorageDirectories().get(0).listFiles(indexFileFilter);
+        File[] indexDirs = config.getStorageDirectories().values().iterator().next().listFiles(indexFileFilter);
         assertEquals(2, indexDirs.length);
 
         // expire old events and indexes
@@ -1337,7 +1352,7 @@ public class TestPersistentProvenanceRepository {
         assertEquals(10, newRecordSet.getMatchingEvents().size());
 
         // Ensure that one index directory is gone
-        indexDirs = config.getStorageDirectories().get(0).listFiles(indexFileFilter);
+        indexDirs = config.getStorageDirectories().values().iterator().next().listFiles(indexFileFilter);
         assertEquals(1, indexDirs.length);
     }
 
@@ -1354,12 +1369,12 @@ public class TestPersistentProvenanceRepository {
         final AccessDeniedException expectedException = new AccessDeniedException("Unit Test - Intentionally Thrown");
         repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) {
             @Override
-            protected void authorize(ProvenanceEventRecord event, NiFiUser user) {
+            public void authorize(ProvenanceEventRecord event, NiFiUser user) {
                 throw expectedException;
             }
         };
 
-        repo.initialize(getEventReporter(), null, null);
+        repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
 
         final String uuid = "00000000-0000-0000-0000-000000000000";
         final Map<String, String> attributes = new HashMap<>();
@@ -1409,7 +1424,7 @@ public class TestPersistentProvenanceRepository {
             }
         };
 
-        repo.initialize(getEventReporter(), null, null);
+        repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
 
         final String uuid = "00000000-0000-0000-0000-000000000000";
         final Map<String, String> attributes = new HashMap<>();
@@ -1461,7 +1476,7 @@ public class TestPersistentProvenanceRepository {
             }
         };
 
-        repo.initialize(getEventReporter(), null, null);
+        repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
 
         final String uuid = "00000000-0000-0000-0000-000000000000";
         final Map<String, String> attributes = new HashMap<>();
@@ -1522,7 +1537,7 @@ public class TestPersistentProvenanceRepository {
             }
         };
 
-        repo.initialize(getEventReporter(), null, null);
+        repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
 
         final String uuid = "00000000-0000-0000-0000-000000000000";
         final Map<String, String> attributes = new HashMap<>();
@@ -1641,7 +1656,7 @@ public class TestPersistentProvenanceRepository {
                 return journalCountRef.get();
             }
         };
-        repo.initialize(getEventReporter(), null, null);
+        repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
 
         final Map<String, String> attributes = new HashMap<>();
         final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
@@ -1697,7 +1712,7 @@ public class TestPersistentProvenanceRepository {
         config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
         config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
         repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
-        repo.initialize(getEventReporter(), null, null);
+        repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
 
         final String uuid = "00000000-0000-0000-0000-000000000000";
         final Map<String, String> attributes = new HashMap<>();
@@ -1732,7 +1747,7 @@ public class TestPersistentProvenanceRepository {
         final List<File> indexDirs = indexConfig.getIndexDirectories();
 
         final String query = "uuid:00000000-0000-0000-0000-0000000000* AND NOT filename:file-?";
-        final List<Document> results = runQuery(indexDirs.get(0), config.getStorageDirectories(), query);
+        final List<Document> results = runQuery(indexDirs.get(0), new ArrayList<>(config.getStorageDirectories().values()), query);
 
         assertEquals(6, results.size());
     }
@@ -1786,7 +1801,7 @@ public class TestPersistentProvenanceRepository {
         final RepositoryConfiguration config = createConfiguration();
         config.setMaxEventFileLife(3, TimeUnit.SECONDS);
         repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
-        repo.initialize(getEventReporter(), null, null);
+        repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
 
         final Map<String, String> attributes = new HashMap<>();
 
@@ -1813,7 +1828,7 @@ public class TestPersistentProvenanceRepository {
 
         repo.waitForRollover();
 
-        final File storageDir = config.getStorageDirectories().get(0);
+        final File storageDir = config.getStorageDirectories().values().iterator().next();
         long counter = 0;
         for (final File file : storageDir.listFiles()) {
             if (file.isFile()) {
@@ -1853,7 +1868,7 @@ public class TestPersistentProvenanceRepository {
         final RepositoryConfiguration config = createConfiguration();
         config.setMaxEventFileLife(3, TimeUnit.SECONDS);
         TestablePersistentProvenanceRepository testRepo = new TestablePersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
-        testRepo.initialize(getEventReporter(), null, null);
+        testRepo.initialize(getEventReporter(), null, null, null);
 
         final Map<String, String> attributes = new HashMap<>();
 
@@ -1897,7 +1912,7 @@ public class TestPersistentProvenanceRepository {
                         + "that the record wasn't completely written to the 
file. This journal will be skipped.",
                 reportedEvents.get(reportedEvents.size() - 1).getMessage());
 
-        final File storageDir = config.getStorageDirectories().get(0);
+        final File storageDir = config.getStorageDirectories().values().iterator().next();
         assertTrue(checkJournalRecords(storageDir, false) < 10000);
     }
 
@@ -1906,7 +1921,7 @@ public class TestPersistentProvenanceRepository {
         final RepositoryConfiguration config = createConfiguration();
         config.setMaxEventFileLife(3, TimeUnit.SECONDS);
         TestablePersistentProvenanceRepository testRepo = new TestablePersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
-        testRepo.initialize(getEventReporter(), null, null);
+        testRepo.initialize(getEventReporter(), null, null, null);
 
         final Map<String, String> attributes = new HashMap<>();
 
@@ -1951,7 +1966,7 @@ public class TestPersistentProvenanceRepository {
                         + "be skipped.",
                 reportedEvents.get(reportedEvents.size() - 1).getMessage());
 
-        final File storageDir = config.getStorageDirectories().get(0);
+        final File storageDir = config.getStorageDirectories().values().iterator().next();
         assertTrue(checkJournalRecords(storageDir, false) < 10000);
     }
 
@@ -1960,7 +1975,7 @@ public class TestPersistentProvenanceRepository {
         final RepositoryConfiguration config = createConfiguration();
         config.setMaxEventFileLife(3, TimeUnit.SECONDS);
         TestablePersistentProvenanceRepository testRepo = new TestablePersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
-        testRepo.initialize(getEventReporter(), null, null);
+        testRepo.initialize(getEventReporter(), null, null, null);
 
         final Map<String, String> attributes = new HashMap<>();
 
@@ -1997,7 +2012,7 @@ public class TestPersistentProvenanceRepository {
 
         assertEquals("mergeJournals() should not error on empty journal", 0, 
reportedEvents.size());
 
-        final File storageDir = config.getStorageDirectories().get(0);
+        final File storageDir = config.getStorageDirectories().values().iterator().next();
         assertEquals(config.getJournalCount() - 1, checkJournalRecords(storageDir, true));
     }
 
@@ -2025,7 +2040,7 @@ public class TestPersistentProvenanceRepository {
                 return 10L; // retry quickly.
             }
         };
-        repo.initialize(getEventReporter(), null, null);
+        repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
 
         final Map<String, String> attributes = new HashMap<>();
 
@@ -2062,7 +2077,7 @@ public class TestPersistentProvenanceRepository {
         config.setMaxAttributeChars(50);
         config.setMaxEventFileLife(3, TimeUnit.SECONDS);
         repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
-        repo.initialize(getEventReporter(), null, null);
+        repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
 
         final String maxLengthChars = "12345678901234567890123456789012345678901234567890";
         final Map<String, String> attributes = new HashMap<>();
@@ -2108,7 +2123,7 @@ public class TestPersistentProvenanceRepository {
         repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) {
             @Override
             protected synchronized IndexingAction createIndexingAction() {
-                return new IndexingAction(repo) {
+                return new IndexingAction(config.getSearchableFields(), config.getSearchableAttributes()) {
                     @Override
                     public void index(StandardProvenanceEventRecord record, IndexWriter indexWriter, Integer blockIndex) throws IOException {
                         final int count = indexedEventCount.incrementAndGet();
@@ -2121,7 +2136,7 @@ public class TestPersistentProvenanceRepository {
                 };
             }
         };
-        repo.initialize(getEventReporter(), null, null);
+        repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
 
         final Map<String, String> attributes = new HashMap<>();
         attributes.put("uuid", "12345678-0000-0000-0000-012345678912");
@@ -2169,7 +2184,7 @@ public class TestPersistentProvenanceRepository {
         };
 
         // initialize with our event reporter
-        repo.initialize(getEventReporter(), null, null);
+        repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
 
         // create some events in the journal files.
         final Map<String, String> attributes = new HashMap<>();
@@ -2219,10 +2234,12 @@ public class TestPersistentProvenanceRepository {
             this.message = message;
         }
 
+        @SuppressWarnings("unused")
         public String getCategory() {
             return category;
         }
 
+        @SuppressWarnings("unused")
         public String getMessage() {
             return message;
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestSchemaRecordReaderWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestSchemaRecordReaderWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestSchemaRecordReaderWriter.java
index aed690b..2eb353e 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestSchemaRecordReaderWriter.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestSchemaRecordReaderWriter.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
@@ -34,11 +35,13 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 
+import org.apache.nifi.provenance.schema.EventFieldNames;
 import org.apache.nifi.provenance.schema.EventRecord;
-import org.apache.nifi.provenance.schema.EventRecordFields;
 import org.apache.nifi.provenance.schema.ProvenanceEventSchema;
 import org.apache.nifi.provenance.serialization.RecordReader;
 import org.apache.nifi.provenance.serialization.RecordWriter;
@@ -55,14 +58,14 @@ import org.apache.nifi.repository.schema.RecordField;
 import org.apache.nifi.repository.schema.RecordSchema;
 import org.apache.nifi.repository.schema.Repetition;
 import org.apache.nifi.repository.schema.SimpleRecordField;
-import org.apache.nifi.stream.io.DataOutputStream;
 import org.apache.nifi.stream.io.NullOutputStream;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 
 public class TestSchemaRecordReaderWriter extends AbstractTestRecordReaderWriter {
-
+    private final AtomicLong idGenerator = new AtomicLong(0L);
     private File journalFile;
     private File tocFile;
 
@@ -70,6 +73,58 @@ public class TestSchemaRecordReaderWriter extends AbstractTestRecordReaderWriter
     public void setup() {
         journalFile = new File("target/storage/" + 
UUID.randomUUID().toString() + "/testFieldAddedToSchema");
         tocFile = TocUtil.getTocFile(journalFile);
+        idGenerator.set(0L);
+    }
+
+
+    @Test
+    @Ignore("runs forever for performance analysis/profiling")
+    public void testPerformanceOfRandomAccessReads() throws Exception {
+        journalFile = new File("target/storage/" + 
UUID.randomUUID().toString() + "/testPerformanceOfRandomAccessReads.gz");
+        tocFile = TocUtil.getTocFile(journalFile);
+
+        try (final RecordWriter writer = createWriter(journalFile, new StandardTocWriter(tocFile, true, false), true, 1024 * 32)) {
+            writer.writeHeader(0L);
+
+            for (int i = 0; i < 100_000; i++) {
+                writer.writeRecord(createEvent());
+            }
+        }
+
+        final long[] eventIds = new long[] {
+            4, 80, 1024, 1025, 1026, 1027, 1028, 1029, 1030, 40_000, 80_000, 99_000
+        };
+
+        boolean loopForever = true;
+        while (loopForever) {
+            final long start = System.nanoTime();
+            for (int i = 0; i < 1000; i++) {
+                try (final InputStream in = new FileInputStream(journalFile);
+                    final RecordReader reader = createReader(in, journalFile.getName(), new StandardTocReader(tocFile), 32 * 1024)) {
+
+                    for (final long id : eventIds) {
+                        time(() -> {
+                            reader.skipToEvent(id);
+                            return reader.nextRecord();
+                        }, id);
+                    }
+                }
+            }
+
+            final long ms = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+            System.out.println(ms + " ms total");
+        }
+    }
+
+    private void time(final Callable<StandardProvenanceEventRecord> task, final long id) throws Exception {
+        final long start = System.nanoTime();
+        final StandardProvenanceEventRecord event = task.call();
+        Assert.assertNotNull(event);
+        Assert.assertEquals(id, event.getEventId());
+        //        System.out.println(event);
+        final long nanos = System.nanoTime() - start;
+        final long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
+        //        System.out.println("Took " + millis + " ms to " + 
taskDescription);
     }
 
 
@@ -83,8 +138,8 @@ public class TestSchemaRecordReaderWriter extends AbstractTestRecordReaderWriter
 
         try (final ByteArraySchemaRecordWriter writer = createSchemaWriter(schemaModifier, toAdd)) {
             writer.writeHeader(1L);
-            writer.writeRecord(createEvent(), 3L);
-            writer.writeRecord(createEvent(), 3L);
+            writer.writeRecord(createEvent());
+            writer.writeRecord(createEvent());
         }
 
         try (final InputStream in = new FileInputStream(journalFile);
@@ -94,7 +149,6 @@ public class TestSchemaRecordReaderWriter extends AbstractTestRecordReaderWriter
             for (int i = 0; i < 2; i++) {
                 final StandardProvenanceEventRecord event = reader.nextRecord();
                 assertNotNull(event);
-                assertEquals(3L, event.getEventId());
                 assertEquals("1234", event.getComponentId());
                 assertEquals(ProvenanceEventType.RECEIVE, event.getEventType());
 
@@ -111,14 +165,14 @@ public class TestSchemaRecordReaderWriter extends AbstractTestRecordReaderWriter
             // Create a schema that has the fields modified
             final RecordSchema schemaV1 = ProvenanceEventSchema.PROVENANCE_EVENT_SCHEMA_V1;
             final List<RecordField> fields = new ArrayList<>(schemaV1.getFields());
-            fields.remove(new SimpleRecordField(EventRecordFields.Names.UPDATED_ATTRIBUTES, FieldType.STRING, Repetition.EXACTLY_ONE));
-            fields.remove(new SimpleRecordField(EventRecordFields.Names.PREVIOUS_ATTRIBUTES, FieldType.STRING, Repetition.EXACTLY_ONE));
+            fields.remove(new SimpleRecordField(EventFieldNames.UPDATED_ATTRIBUTES, FieldType.STRING, Repetition.EXACTLY_ONE));
+            fields.remove(new SimpleRecordField(EventFieldNames.PREVIOUS_ATTRIBUTES, FieldType.STRING, Repetition.EXACTLY_ONE));
             final RecordSchema recordSchema = new RecordSchema(fields);
 
             // Create a record writer whose schema does not contain updated attributes or previous attributes.
             // This means that we must also override the method that writes out attributes so that we are able
             // to avoid actually writing them out.
-            final ByteArraySchemaRecordWriter writer = new ByteArraySchemaRecordWriter(journalFile, tocWriter, false, 0) {
+            final ByteArraySchemaRecordWriter writer = new ByteArraySchemaRecordWriter(journalFile, idGenerator, tocWriter, false, 0) {
                 @Override
                 public void writeHeader(long firstEventId, DataOutputStream out) throws IOException {
                     final ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -130,15 +184,15 @@ public class TestSchemaRecordReaderWriter extends AbstractTestRecordReaderWriter
 
                 @Override
                 protected Record createRecord(final ProvenanceEventRecord event, final long eventId) {
-                    final RecordSchema contentClaimSchema = new RecordSchema(recordSchema.getField(EventRecordFields.Names.CONTENT_CLAIM).getSubFields());
+                    final RecordSchema contentClaimSchema = new RecordSchema(recordSchema.getField(EventFieldNames.CONTENT_CLAIM).getSubFields());
                     return new EventRecord(event, eventId, recordSchema, contentClaimSchema);
                 }
             };
 
             try {
                 writer.writeHeader(1L);
-                writer.writeRecord(createEvent(), 3L);
-                writer.writeRecord(createEvent(), 3L);
+                writer.writeRecord(createEvent());
+                writer.writeRecord(createEvent());
             } finally {
                 writer.close();
             }
@@ -154,7 +208,6 @@ public class TestSchemaRecordReaderWriter extends AbstractTestRecordReaderWriter
             for (int i = 0; i < 2; i++) {
                 final StandardProvenanceEventRecord event = reader.nextRecord();
                 assertNotNull(event);
-                assertEquals(3L, event.getEventId());
                 assertEquals(ProvenanceEventType.RECEIVE, event.getEventType());
 
                 // We will still have a Map<String, String> for updated attributes because the
@@ -175,7 +228,7 @@ public class TestSchemaRecordReaderWriter extends AbstractTestRecordReaderWriter
 
         try (final ByteArraySchemaRecordWriter writer = createSchemaWriter(schemaModifier, toAdd)) {
             writer.writeHeader(1L);
-            writer.writeRecord(createEvent(), 3L);
+            writer.writeRecord(createEvent());
         }
 
         try (final InputStream in = new FileInputStream(journalFile);
@@ -207,9 +260,9 @@ public class TestSchemaRecordReaderWriter extends AbstractTestRecordReaderWriter
         fieldModifier.accept(fields);
 
         final RecordSchema recordSchema = new RecordSchema(fields);
-        final RecordSchema contentClaimSchema = new RecordSchema(recordSchema.getField(EventRecordFields.Names.CONTENT_CLAIM).getSubFields());
+        final RecordSchema contentClaimSchema = new RecordSchema(recordSchema.getField(EventFieldNames.CONTENT_CLAIM).getSubFields());
 
-        final ByteArraySchemaRecordWriter writer = new ByteArraySchemaRecordWriter(journalFile, tocWriter, false, 0) {
+        final ByteArraySchemaRecordWriter writer = new ByteArraySchemaRecordWriter(journalFile, idGenerator, tocWriter, false, 0) {
             @Override
             public void writeHeader(long firstEventId, DataOutputStream out) throws IOException {
                 final ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -250,14 +303,13 @@ public class TestSchemaRecordReaderWriter extends AbstractTestRecordReaderWriter
         final int numEvents = 10_000_000;
         final long startNanos = System.nanoTime();
         try (final OutputStream nullOut = new NullOutputStream();
-            final RecordWriter writer = new ByteArraySchemaRecordWriter(nullOut, tocWriter, false, 0)) {
+            final RecordWriter writer = new ByteArraySchemaRecordWriter(nullOut, "out", idGenerator, tocWriter, false, 0)) {
 
             writer.writeHeader(0L);
 
             for (int i = 0; i < numEvents; i++) {
-                writer.writeRecord(event, i);
+                writer.writeRecord(event);
             }
-
         }
 
         final long nanos = System.nanoTime() - startNanos;
@@ -280,7 +332,7 @@ public class TestSchemaRecordReaderWriter extends AbstractTestRecordReaderWriter
         try (final ByteArrayOutputStream headerOut = new ByteArrayOutputStream();
             final DataOutputStream out = new DataOutputStream(headerOut)) {
 
-            final RecordWriter schemaWriter = new ByteArraySchemaRecordWriter(out, null, false, 0);
+            final RecordWriter schemaWriter = new ByteArraySchemaRecordWriter(out, "out", idGenerator, null, false, 0);
             schemaWriter.writeHeader(1L);
 
             header = headerOut.toByteArray();
@@ -288,12 +340,12 @@ public class TestSchemaRecordReaderWriter extends AbstractTestRecordReaderWriter
 
         final byte[] serializedRecord;
         try (final ByteArrayOutputStream headerOut = new ByteArrayOutputStream();
-            final RecordWriter writer = new ByteArraySchemaRecordWriter(headerOut, null, false, 0)) {
+            final RecordWriter writer = new ByteArraySchemaRecordWriter(headerOut, "out", idGenerator, null, false, 0)) {
 
             writer.writeHeader(1L);
             headerOut.reset();
 
-            writer.writeRecord(event, 1L);
+            writer.writeRecord(event);
             writer.flush();
             serializedRecord = headerOut.toByteArray();
         }
@@ -322,7 +374,7 @@ public class TestSchemaRecordReaderWriter extends AbstractTestRecordReaderWriter
 
     @Override
     protected RecordWriter createWriter(File file, TocWriter tocWriter, boolean compressed, int uncompressedBlockSize) throws IOException {
-        return new ByteArraySchemaRecordWriter(file, tocWriter, compressed, uncompressedBlockSize);
+        return new ByteArraySchemaRecordWriter(file, idGenerator, tocWriter, compressed, uncompressedBlockSize);
     }
 
 
@@ -331,11 +383,4 @@ public class TestSchemaRecordReaderWriter extends AbstractTestRecordReaderWriter
         final ByteArraySchemaRecordReader reader = new ByteArraySchemaRecordReader(in, journalFilename, tocReader, maxAttributeSize);
         return reader;
     }
-
-    private static interface WriteRecordInterceptor {
-        void writeRawRecord(ProvenanceEventRecord event, long recordIdentifier, DataOutputStream out) throws IOException;
-    }
-
-    private static WriteRecordInterceptor NOP_INTERCEPTOR = (event, id, out) -> {};
-    private static WriteRecordInterceptor WRITE_DUMMY_STRING_INTERCEPTOR = (event, id, out) -> out.writeUTF("hello");
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java
index dfa37e4..27002c8 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java
@@ -16,7 +16,11 @@
  */
 package org.apache.nifi.provenance;
 
+import static org.apache.nifi.provenance.TestUtil.createFlowFile;
+import static org.junit.Assert.assertTrue;
+
 import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
@@ -25,6 +29,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.provenance.serialization.RecordReader;
@@ -32,16 +37,18 @@ import org.apache.nifi.provenance.serialization.RecordWriter;
 import org.apache.nifi.provenance.toc.NopTocWriter;
 import org.apache.nifi.provenance.toc.TocReader;
 import org.apache.nifi.provenance.toc.TocWriter;
-import org.apache.nifi.stream.io.DataOutputStream;
 import org.apache.nifi.stream.io.NullOutputStream;
+import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 
-import static org.apache.nifi.provenance.TestUtil.createFlowFile;
-import static org.junit.Assert.assertTrue;
-
 public class TestStandardRecordReaderWriter extends AbstractTestRecordReaderWriter {
+    private AtomicLong idGenerator = new AtomicLong(0L);
 
+    @Before
+    public void resetIds() {
+        idGenerator.set(0L);
+    }
 
     @Test
     @Ignore("For local testing only")
@@ -56,12 +63,12 @@ public class TestStandardRecordReaderWriter extends AbstractTestRecordReaderWrit
         final int numEvents = 10_000_000;
         final long startNanos = System.nanoTime();
         try (final OutputStream nullOut = new NullOutputStream();
-            final RecordWriter writer = new StandardRecordWriter(nullOut, tocWriter, false, 100000)) {
+            final RecordWriter writer = new StandardRecordWriter(nullOut, "devnull", idGenerator, tocWriter, false, 100000)) {
 
             writer.writeHeader(0L);
 
             for (int i = 0; i < numEvents; i++) {
-                writer.writeRecord(event, i);
+                writer.writeRecord(event);
             }
         }
 
@@ -90,12 +97,12 @@ public class TestStandardRecordReaderWriter extends AbstractTestRecordReaderWrit
 
         final byte[] serializedRecord;
         try (final ByteArrayOutputStream headerOut = new ByteArrayOutputStream();
-            final StandardRecordWriter writer = new StandardRecordWriter(headerOut, null, false, 0)) {
+            final StandardRecordWriter writer = new StandardRecordWriter(headerOut, "devnull", idGenerator, null, false, 0)) {
 
             writer.writeHeader(1L);
             headerOut.reset();
 
-            writer.writeRecord(event, 1L);
+            writer.writeRecord(event);
             writer.flush();
             serializedRecord = headerOut.toByteArray();
         }
@@ -142,18 +149,18 @@ public class TestStandardRecordReaderWriter extends AbstractTestRecordReaderWrit
         }
 
         try (final ByteArrayOutputStream recordOut = new ByteArrayOutputStream();
-            final StandardRecordWriter writer = new StandardRecordWriter(recordOut, null, false, 0)) {
+            final StandardRecordWriter writer = new StandardRecordWriter(recordOut, "devnull", idGenerator, null, false, 0)) {
 
             writer.writeHeader(1L);
             recordOut.reset();
 
-            writer.writeRecord(record, 1L);
+            writer.writeRecord(record);
         }
     }
 
     @Override
     protected RecordWriter createWriter(File file, TocWriter tocWriter, boolean compressed, int uncompressedBlockSize) throws IOException {
-        return new StandardRecordWriter(file, tocWriter, compressed, uncompressedBlockSize);
+        return new StandardRecordWriter(file, idGenerator, tocWriter, compressed, uncompressedBlockSize);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java
index 514a43e..224ee71 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java
@@ -18,6 +18,7 @@ package org.apache.nifi.provenance;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.UUID;
 
 import org.apache.nifi.flowfile.FlowFile;
 
@@ -82,4 +83,22 @@ public class TestUtil {
             }
         };
     }
+
+    public static ProvenanceEventRecord createEvent() {
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("filename", "1.txt");
+        attributes.put("uuid", UUID.randomUUID().toString());
+
+        final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
+        builder.setEventTime(System.currentTimeMillis());
+        builder.setEventType(ProvenanceEventType.RECEIVE);
+        builder.setTransitUri("nifi://unit-test");
+        builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+        builder.setComponentId("1234");
+        builder.setComponentType("dummy processor");
+        final ProvenanceEventRecord record = builder.build();
+
+        return record;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/index/lucene/TestEventIndexTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/index/lucene/TestEventIndexTask.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/index/lucene/TestEventIndexTask.java
new file mode 100644
index 0000000..4c58b13
--- /dev/null
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/index/lucene/TestEventIndexTask.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.index.lucene;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.document.LongField;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.provenance.RepositoryConfiguration;
+import org.apache.nifi.provenance.SearchableFields;
+import org.apache.nifi.provenance.index.EventIndexWriter;
+import org.apache.nifi.provenance.lucene.IndexManager;
+import org.apache.nifi.provenance.lucene.LuceneEventIndexWriter;
+import org.apache.nifi.provenance.serialization.StorageSummary;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestEventIndexTask {
+
+    @BeforeClass
+    public static void setupClass() {
+        System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi", "DEBUG");
+    }
+
+    @Test(timeout = 5000)
+    public void testIndexWriterCommittedWhenAppropriate() throws IOException, InterruptedException {
+        final BlockingQueue<StoredDocument> docQueue = new LinkedBlockingQueue<>();
+        final RepositoryConfiguration repoConfig = new RepositoryConfiguration();
+        final File storageDir = new File("target/storage/TestEventIndexTask/1");
+        repoConfig.addStorageDirectory("1", storageDir);
+
+        final AtomicInteger commitCount = new AtomicInteger(0);
+
+        // Mock out an IndexWriter and keep track of the number of events that are indexed.
+        final IndexWriter indexWriter = Mockito.mock(IndexWriter.class);
+        final EventIndexWriter eventIndexWriter = new LuceneEventIndexWriter(indexWriter, storageDir);
+
+        final IndexManager indexManager = Mockito.mock(IndexManager.class);
+        Mockito.when(indexManager.borrowIndexWriter(Mockito.any(File.class))).thenReturn(eventIndexWriter);
+
+        final IndexDirectoryManager directoryManager = new IndexDirectoryManager(repoConfig);
+
+        // Create an EventIndexTask and override the commit(IndexWriter) method so that we can keep track of how
+        // many times the index writer gets committed.
+        final EventIndexTask task = new EventIndexTask(docQueue, repoConfig, indexManager, directoryManager, 201, EventReporter.NO_OP) {
+            @Override
+            protected void commit(EventIndexWriter indexWriter) throws IOException {
+                commitCount.incrementAndGet();
+            }
+        };
+
+        // Create 4 threads, each one a daemon thread running the EventIndexTask
+        for (int i = 0; i < 4; i++) {
+            final Thread t = new Thread(task);
+            t.setDaemon(true);
+            t.start();
+        }
+
+        assertEquals(0, commitCount.get());
+
+        // Index 100 documents with a storage filename of "0.0.prov"
+        for (int i = 0; i < 100; i++) {
+            final Document document = new Document();
+            document.add(new LongField(SearchableFields.EventTime.getSearchableFieldName(), System.currentTimeMillis(), Store.NO));
+
+            final StorageSummary location = new StorageSummary(1L, "0.0.prov", "1", 0, 1000L, 1000L);
+            final StoredDocument storedDoc = new StoredDocument(document, location);
+            docQueue.add(storedDoc);
+        }
+        assertEquals(0, commitCount.get());
+
+        // Index 100 documents
+        for (int i = 0; i < 100; i++) {
+            final Document document = new Document();
+            document.add(new LongField(SearchableFields.EventTime.getSearchableFieldName(), System.currentTimeMillis(), Store.NO));
+
+            final StorageSummary location = new StorageSummary(1L, "0.0.prov", "1", 0, 1000L, 1000L);
+            final StoredDocument storedDoc = new StoredDocument(document, location);
+            docQueue.add(storedDoc);
+        }
+
+        // Wait until we've indexed all 200 events
+        while (eventIndexWriter.getEventsIndexed() < 200) {
+            Thread.sleep(10L);
+        }
+
+        // Wait a bit and make sure that we still haven't committed the index writer.
+        Thread.sleep(100L);
+        assertEquals(0, commitCount.get());
+
+        // Add another document.
+        final Document document = new Document();
+        document.add(new LongField(SearchableFields.EventTime.getSearchableFieldName(), System.currentTimeMillis(), Store.NO));
+        final StorageSummary location = new StorageSummary(1L, "0.0.prov", "1", 0, 1000L, 1000L);
+
+        StoredDocument storedDoc = new StoredDocument(document, location);
+        docQueue.add(storedDoc);
+
+        // Wait until index writer is committed.
+        while (commitCount.get() == 0) {
+            Thread.sleep(10L);
+        }
+        assertEquals(1, commitCount.get());
+
+        // Add a new IndexableDocument with a count of 1 to ensure that the writer is committed again.
+        storedDoc = new StoredDocument(document, location);
+        docQueue.add(storedDoc);
+        Thread.sleep(100L);
+        assertEquals(1, commitCount.get());
+
+        // Add a new IndexableDocument with a count of 3. Index writer should not be committed again.
+        storedDoc = new StoredDocument(document, location);
+        docQueue.add(storedDoc);
+        Thread.sleep(100L);
+        assertEquals(1, commitCount.get());
+    }
+}
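
A rough aid to reading the test above, assuming the value 201 passed to the EventIndexTask constructor is the number of indexed events required before the writer is committed (an assumption inferred from this test's behaviour, not from a documented constructor signature); the expected commit counts then follow integer division, as in this standalone sketch:

    // Hypothetical illustration, not part of the commit.
    public class CommitThresholdSketch {

        public static void main(final String[] args) {
            final int threshold = 201;                // value passed to EventIndexTask above

            int indexed = 200;                        // two batches of 100 documents
            System.out.println(indexed / threshold);  // prints 0 -> no commit expected yet

            indexed++;                                // the 201st document arrives
            System.out.println(indexed / threshold);  // prints 1 -> exactly one commit expected

            indexed += 2;                             // a couple more documents trickle in
            System.out.println(indexed / threshold);  // still prints 1 -> no further commit
        }
    }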

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/index/lucene/TestIndexDirectoryManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/index/lucene/TestIndexDirectoryManager.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/index/lucene/TestIndexDirectoryManager.java
new file mode 100644
index 0000000..3f3c422
--- /dev/null
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/index/lucene/TestIndexDirectoryManager.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.index.lucene;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.nifi.provenance.RepositoryConfiguration;
+import org.junit.Test;
+
+public class TestIndexDirectoryManager {
+
+    @Test
+    public void testGetDirectoriesIncludesMatchingTimestampPlusOne() {
+        final List<IndexLocation> locations = new ArrayList<>();
+        locations.add(createLocation(999L));
+        locations.add(createLocation(1002L));
+        locations.add(createLocation(1005L));
+
+        final List<File> directories = IndexDirectoryManager.getDirectories(1000L, 1001L, locations);
+        assertEquals(2, directories.size());
+        assertTrue(directories.contains(new File("index-999")));
+        assertTrue(directories.contains(new File("index-1002")));
+    }
+
+    @Test
+    public void testGetDirectoriesOnlyObtainsDirectoriesForDesiredPartition() {
+        final RepositoryConfiguration config = createConfig(2);
+
+        final File storageDir1 = config.getStorageDirectories().get("1");
+        final File storageDir2 = config.getStorageDirectories().get("2");
+
+        final File index1 = new File(storageDir1, "index-1");
+        final File index2 = new File(storageDir1, "index-2");
+        final File index3 = new File(storageDir2, "index-3");
+        final File index4 = new File(storageDir2, "index-4");
+
+        final File[] allIndices = new File[] {index1, index2, index3, index4};
+        for (final File file : allIndices) {
+            assertTrue(file.mkdirs() || file.exists());
+        }
+
+        try {
+            final IndexDirectoryManager mgr = new IndexDirectoryManager(config);
+            mgr.initialize();
+
+            final List<File> indexes1 = mgr.getDirectories(0L, Long.MAX_VALUE, "1");
+            final List<File> indexes2 = mgr.getDirectories(0L, Long.MAX_VALUE, "2");
+
+            assertEquals(2, indexes1.size());
+            assertTrue(indexes1.contains(index1));
+            assertTrue(indexes1.contains(index2));
+
+            assertEquals(2, indexes2.size());
+            assertTrue(indexes2.contains(index3));
+            assertTrue(indexes2.contains(index4));
+        } finally {
+            for (final File file : allIndices) {
+                file.delete();
+            }
+        }
+    }
+
+
+    private IndexLocation createLocation(final long timestamp) {
+        return createLocation(timestamp, "1");
+    }
+
+    private IndexLocation createLocation(final long timestamp, final String partitionName) {
+        return new IndexLocation(new File("index-" + timestamp), timestamp, partitionName, 1024 * 1024L);
+    }
+
+    private RepositoryConfiguration createConfig(final int partitions) {
+        final RepositoryConfiguration repoConfig = new RepositoryConfiguration();
+        for (int i = 1; i <= partitions; i++) {
+            repoConfig.addStorageDirectory(String.valueOf(i), new File("target/storage/testIndexDirectoryManager/" + UUID.randomUUID() + "/" + i));
+        }
+        return repoConfig;
+    }
+}
