http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocumentToEventConverter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocumentToEventConverter.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocumentToEventConverter.java
new file mode 100644
index 0000000..18d3860
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocumentToEventConverter.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.lucene;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.search.TopDocs;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+
+/**
+ * Converts the documents matched by a Lucene search into provenance events.
+ * <p>
+ * The interface declares exactly one abstract method, so it is marked as a functional
+ * interface and may be implemented with a lambda or a method reference.
+ */
+@FunctionalInterface
+public interface DocumentToEventConverter {
+
+    /**
+     * Converts the given search hits into provenance event records.
+     *
+     * @param topDocs the hits returned by a Lucene search
+     * @param indexReader the index reader associated with the search; presumably used to load the
+     *            documents referenced by {@code topDocs} (the exact contract is up to the implementation)
+     * @return the provenance events that correspond to the hits
+     * @throws IOException if the conversion fails because of an I/O error
+     */
+    Set<ProvenanceEventRecord> convert(TopDocs topDocs, IndexReader indexReader) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
index f84021f..331d141 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
@@ -21,17 +21,19 @@ import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.search.IndexSearcher;
+import org.apache.nifi.provenance.index.EventIndexSearcher;
+import org.apache.nifi.provenance.index.EventIndexWriter;
 
 public interface IndexManager extends Closeable {
-    IndexSearcher borrowIndexSearcher(File indexDir) throws IOException;
+    EventIndexSearcher borrowIndexSearcher(File indexDir) throws IOException;
 
-    IndexWriter borrowIndexWriter(File indexingDirectory) throws IOException;
+    EventIndexWriter borrowIndexWriter(File indexDirectory) throws IOException;
 
-    void removeIndex(final File indexDirectory);
+    boolean removeIndex(final File indexDirectory);
 
-    void returnIndexSearcher(File indexDirectory, IndexSearcher searcher);
+    void returnIndexSearcher(EventIndexSearcher searcher);
 
-    void returnIndexWriter(File indexingDirectory, IndexWriter writer);
+    void returnIndexWriter(EventIndexWriter writer, boolean commit, boolean 
isCloseable);
+
+    void returnIndexWriter(EventIndexWriter writer);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
index 8d7df8b..514af38 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
@@ -21,18 +21,20 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Date;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.search.TopDocs;
+import org.apache.nifi.authorization.AccessDeniedException;
 import org.apache.nifi.authorization.user.NiFiUser;
 import org.apache.nifi.provenance.PersistentProvenanceRepository;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.StandardQueryResult;
-import org.apache.nifi.provenance.authorization.AuthorizationCheck;
+import org.apache.nifi.provenance.authorization.EventAuthorizer;
+import org.apache.nifi.provenance.index.EventIndexSearcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -86,14 +88,14 @@ public class IndexSearch {
         final Query luceneQuery = LuceneUtil.convertQuery(provenanceQuery);
 
         final long start = System.nanoTime();
-        IndexSearcher searcher = null;
+        EventIndexSearcher searcher = null;
         try {
             searcher = indexManager.borrowIndexSearcher(indexDirectory);
             final long searchStartNanos = System.nanoTime();
             final long openSearcherNanos = searchStartNanos - start;
 
             logger.debug("Searching {} for {}", this, provenanceQuery);
-            final TopDocs topDocs = searcher.search(luceneQuery, 
provenanceQuery.getMaxResults());
+            final TopDocs topDocs = 
searcher.getIndexSearcher().search(luceneQuery, 
provenanceQuery.getMaxResults());
             final long finishSearch = System.nanoTime();
             final long searchNanos = finishSearch - searchStartNanos;
 
@@ -107,9 +109,29 @@ public class IndexSearch {
 
             final DocsReader docsReader = new DocsReader();
 
-            final AuthorizationCheck authCheck = event -> 
repository.isAuthorized(event, user);
-
-            matchingRecords = docsReader.read(topDocs, authCheck, 
searcher.getIndexReader(), repository.getAllLogFiles(), retrievedCount,
+            final EventAuthorizer authorizer = new EventAuthorizer() {
+                @Override
+                public boolean isAuthorized(ProvenanceEventRecord event) {
+                    return repository.isAuthorized(event, user);
+                }
+
+                @Override
+                public void authorize(ProvenanceEventRecord event) throws 
AccessDeniedException {
+                    repository.authorize(event, user);
+                }
+
+                @Override
+                public List<ProvenanceEventRecord> 
filterUnauthorizedEvents(List<ProvenanceEventRecord> events) {
+                    return repository.filterUnauthorizedEvents(events, user);
+                }
+
+                @Override
+                public Set<ProvenanceEventRecord> 
replaceUnauthorizedWithPlaceholders(Set<ProvenanceEventRecord> events) {
+                    return 
repository.replaceUnauthorizedWithPlaceholders(events, user);
+                }
+            };
+
+            matchingRecords = docsReader.read(topDocs, authorizer, 
searcher.getIndexSearcher().getIndexReader(), repository.getAllLogFiles(), 
retrievedCount,
                 provenanceQuery.getMaxResults(), maxAttributeChars);
 
             final long readRecordsNanos = System.nanoTime() - finishSearch;
@@ -133,7 +155,7 @@ public class IndexSearch {
             return sqr;
         } finally {
             if ( searcher != null ) {
-                indexManager.returnIndexSearcher(indexDirectory, searcher);
+                indexManager.returnIndexSearcher(searcher);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java
index f725208..a0be319 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java
@@ -19,6 +19,7 @@ package org.apache.nifi.provenance.lucene;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 
 import org.apache.lucene.document.Document;
@@ -28,23 +29,22 @@ import org.apache.lucene.document.LongField;
 import org.apache.lucene.document.StringField;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.provenance.PersistentProvenanceRepository;
 import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.provenance.SearchableFields;
 import org.apache.nifi.provenance.StandardProvenanceEventRecord;
 import org.apache.nifi.provenance.search.SearchableField;
 
 public class IndexingAction {
-    private final Set<SearchableField> nonAttributeSearchableFields;
-    private final Set<SearchableField> attributeSearchableFields;
+    private final Set<SearchableField> searchableEventFields;
+    private final Set<SearchableField> searchableAttributeFields;
 
-    public IndexingAction(final PersistentProvenanceRepository repo) {
-        attributeSearchableFields = Collections.unmodifiableSet(new 
HashSet<>(repo.getConfiguration().getSearchableAttributes()));
-        nonAttributeSearchableFields = Collections.unmodifiableSet(new 
HashSet<>(repo.getConfiguration().getSearchableFields()));
+    public IndexingAction(final List<SearchableField> searchableEventFields, 
final List<SearchableField> searchableAttributes) {
+        this.searchableEventFields = Collections.unmodifiableSet(new 
HashSet<>(searchableEventFields));
+        this.searchableAttributeFields = Collections.unmodifiableSet(new 
HashSet<>(searchableAttributes));
     }
 
     private void addField(final Document doc, final SearchableField field, 
final String value, final Store store) {
-        if (value == null || (!field.isAttribute() && 
!nonAttributeSearchableFields.contains(field))) {
+        if (value == null || (!field.isAttribute() && 
!searchableEventFields.contains(field))) {
             return;
         }
 
@@ -67,7 +67,7 @@ public class IndexingAction {
         addField(doc, SearchableFields.SourceQueueIdentifier, 
record.getSourceQueueIdentifier(), Store.NO);
         addField(doc, SearchableFields.TransitURI, record.getTransitUri(), 
Store.NO);
 
-        for (final SearchableField searchableField : 
attributeSearchableFields) {
+        for (final SearchableField searchableField : 
searchableAttributeFields) {
             addField(doc, searchableField, 
LuceneUtil.truncateIndexField(record.getAttribute(searchableField.getSearchableFieldName())),
 Store.NO);
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java
index 1b13504..2388483 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java
@@ -25,18 +25,15 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.BooleanClause.Occur;
 import org.apache.lucene.search.BooleanQuery;
-import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.TermQuery;
 import org.apache.lucene.search.TopDocs;
-import org.apache.nifi.provenance.PersistentProvenanceRepository;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.SearchableFields;
-import org.apache.nifi.provenance.authorization.AuthorizationCheck;
+import org.apache.nifi.provenance.index.EventIndexSearcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,8 +43,8 @@ public class LineageQuery {
     public static final int MAX_LINEAGE_UUIDS = 100;
     private static final Logger logger = 
LoggerFactory.getLogger(LineageQuery.class);
 
-    public static Set<ProvenanceEventRecord> computeLineageForFlowFiles(final 
PersistentProvenanceRepository repo, final IndexManager indexManager, final 
File indexDirectory,
-            final String lineageIdentifier, final Collection<String> 
flowFileUuids, final int maxAttributeChars) throws IOException {
+    public static Set<ProvenanceEventRecord> computeLineageForFlowFiles(final 
IndexManager indexManager, final File indexDirectory,
+        final String lineageIdentifier, final Collection<String> 
flowFileUuids, final DocumentToEventConverter docsToEventConverter) throws 
IOException {
         if (requireNonNull(flowFileUuids).size() > MAX_LINEAGE_UUIDS) {
             throw new IllegalArgumentException(String.format("Cannot compute 
lineage for more than %s FlowFiles. This lineage contains %s.", 
MAX_LINEAGE_UUIDS, flowFileUuids.size()));
         }
@@ -56,7 +53,7 @@ public class LineageQuery {
             throw new IllegalArgumentException("Must specify either Lineage 
Identifier or FlowFile UUIDs to compute lineage");
         }
 
-        final IndexSearcher searcher;
+        final EventIndexSearcher searcher;
         try {
             searcher = indexManager.borrowIndexSearcher(indexDirectory);
             try {
@@ -75,16 +72,10 @@ public class LineageQuery {
 
                 final long searchStart = System.nanoTime();
                 logger.debug("Searching {} for {}", indexDirectory, 
flowFileIdQuery);
-                final TopDocs uuidQueryTopDocs = 
searcher.search(flowFileIdQuery, MAX_QUERY_RESULTS);
+                final TopDocs uuidQueryTopDocs = 
searcher.getIndexSearcher().search(flowFileIdQuery, MAX_QUERY_RESULTS);
                 final long searchEnd = System.nanoTime();
 
-                // Always authorized. We do this because we need to pull back 
the event, regardless of whether or not
-                // the user is truly authorized, because instead of ignoring 
unauthorized events, we want to replace them.
-                final AuthorizationCheck authCheck = event -> true;
-
-                final DocsReader docsReader = new DocsReader();
-                final Set<ProvenanceEventRecord> recs = 
docsReader.read(uuidQueryTopDocs, authCheck, searcher.getIndexReader(), 
repo.getAllLogFiles(),
-                        new AtomicInteger(0), Integer.MAX_VALUE, 
maxAttributeChars);
+                final Set<ProvenanceEventRecord> recs = 
docsToEventConverter.convert(uuidQueryTopDocs, 
searcher.getIndexSearcher().getIndexReader());
 
                 final long readDocsEnd = System.nanoTime();
                 logger.debug("Finished Lineage Query against {}; Lucene search 
took {} millis, reading records took {} millis",
@@ -92,7 +83,7 @@ public class LineageQuery {
 
                 return recs;
             } finally {
-                indexManager.returnIndexSearcher(indexDirectory, searcher);
+                indexManager.returnIndexSearcher(searcher);
             }
         } catch (final FileNotFoundException fnfe) {
             // nothing has been indexed yet, or the data has already aged off

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneEventIndexSearcher.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneEventIndexSearcher.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneEventIndexSearcher.java
new file mode 100644
index 0000000..07b9167
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneEventIndexSearcher.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.lucene;
+
+import java.io.Closeable;
+import java.io.File;
+
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.store.Directory;
+import org.apache.nifi.provenance.index.EventIndexSearcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An {@link EventIndexSearcher} that wraps a Lucene {@link IndexSearcher} together with the
+ * {@link DirectoryReader} and {@link Directory} it was given at construction.
+ * <p>
+ * The searcher keeps a usage counter. The reader and directory are closed only once the searcher
+ * has been closed and the usage counter is zero, whichever of the two happens last.
+ */
+public class LuceneEventIndexSearcher implements EventIndexSearcher {
+    private static final Logger logger = LoggerFactory.getLogger(LuceneEventIndexSearcher.class);
+
+    private final IndexSearcher indexSearcher;
+    private final File indexDirectory;
+    private final Directory directory;
+    private final DirectoryReader directoryReader;
+
+    // The two fields below are only touched from methods that synchronize on 'this'.
+    private int usageCounter = 0;
+    private boolean closed = false;
+
+    /**
+     * @param indexSearcher the Lucene searcher to expose via {@link #getIndexSearcher()}
+     * @param indexDirectory the directory to report via {@link #getIndexDirectory()}
+     * @param directory the Lucene directory to close when this searcher is released; may be null
+     * @param directoryReader the Lucene reader to close when this searcher is released; may be null
+     */
+    public LuceneEventIndexSearcher(final IndexSearcher indexSearcher, final File indexDirectory, final Directory directory, final DirectoryReader directoryReader) {
+        this.directoryReader = directoryReader;
+        this.directory = directory;
+        this.indexDirectory = indexDirectory;
+        this.indexSearcher = indexSearcher;
+    }
+
+    @Override
+    public IndexSearcher getIndexSearcher() {
+        return indexSearcher;
+    }
+
+    @Override
+    public File getIndexDirectory() {
+        return indexDirectory;
+    }
+
+    /**
+     * Marks this searcher as closed. The underlying reader and directory are closed right away
+     * if the usage counter is zero; otherwise they are closed by the call to
+     * {@link #decrementUsageCounter()} that brings the counter back to zero.
+     */
+    @Override
+    public synchronized void close() {
+        closed = true;
+        releaseIfIdle();
+    }
+
+    /**
+     * Records one more user of this searcher.
+     */
+    public synchronized void incrementUsageCounter() {
+        usageCounter++;
+    }
+
+    /**
+     * Records that one user is done with this searcher and, if the searcher has already been
+     * closed and the counter is now zero, closes the underlying reader and directory.
+     */
+    public synchronized void decrementUsageCounter() {
+        usageCounter--;
+        releaseIfIdle();
+    }
+
+    // Closes the reader and then the directory, but only when closed and unused.
+    // Callers must hold the lock on 'this'.
+    private void releaseIfIdle() {
+        if (closed && usageCounter == 0) {
+            closeQuietly(directoryReader);
+            closeQuietly(directory);
+        }
+    }
+
+    // Closes the given resource if it is non-null; a failure is logged as a warning, never thrown.
+    private void closeQuietly(final Closeable closeable) {
+        if (closeable != null) {
+            try {
+                closeable.close();
+            } catch (final Exception e) {
+                logger.warn("Failed to close {} due to {}", closeable, e);
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneEventIndexWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneEventIndexWriter.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneEventIndexWriter.java
new file mode 100644
index 0000000..db8c528
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneEventIndexWriter.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.lucene;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.nifi.provenance.index.EventIndexWriter;
+
+/**
+ * An {@link EventIndexWriter} backed by a Lucene {@link IndexWriter}.
+ * <p>
+ * Besides delegating to the Lucene writer, this class tracks how many documents have been indexed
+ * since the last commit and when the next time-based commit is due. That bookkeeping is held in
+ * atomics and updated with compare-and-set. {@link #index(List, int)} only reports that a commit
+ * is due; it never commits by itself.
+ */
+public class LuceneEventIndexWriter implements EventIndexWriter {
+    private final IndexWriter indexWriter;
+    private final File directory;
+    private final long maxCommitNanos;
+
+    private final AtomicReference<CommitStats> commitStats = new AtomicReference<>();
+    private final AtomicLong totalIndexed = new AtomicLong(0L);
+    private final AtomicLong lastCommitTotalIndexed = new AtomicLong(0L);
+
+    /**
+     * Creates a writer whose time-based commit interval is 30 seconds.
+     *
+     * @param indexWriter the Lucene writer to delegate to
+     * @param directory the directory reported by {@link #getDirectory()}
+     */
+    public LuceneEventIndexWriter(final IndexWriter indexWriter, final File directory) {
+        this(indexWriter, directory, TimeUnit.SECONDS.toNanos(30L));
+    }
+
+    /**
+     * @param indexWriter the Lucene writer to delegate to
+     * @param directory the directory reported by {@link #getDirectory()}
+     * @param maxCommitNanos the number of nanoseconds after which {@link #index(List, int)} reports
+     *            that a commit is due, regardless of how many documents have been indexed
+     */
+    public LuceneEventIndexWriter(final IndexWriter indexWriter, final File directory, final long maxCommitNanos) {
+        this.indexWriter = indexWriter;
+        this.directory = directory;
+        this.maxCommitNanos = maxCommitNanos;
+
+        commitStats.set(new CommitStats(0, System.nanoTime() + maxCommitNanos));
+    }
+
+    /**
+     * Closes the underlying Lucene writer.
+     *
+     * @throws IOException if the Lucene writer fails to close
+     */
+    @Override
+    public void close() throws IOException {
+        indexWriter.close();
+    }
+
+    /**
+     * Indexes a single document; equivalent to calling {@link #index(List, int)} with a
+     * one-element list.
+     */
+    @Override
+    public boolean index(final Document document, final int commitThreshold) throws IOException {
+        return index(Collections.singletonList(document), commitThreshold);
+    }
+
+    /**
+     * Adds the given documents to the Lucene index and updates the commit bookkeeping.
+     *
+     * @param documents the documents to add; an empty list is a no-op
+     * @param commitThreshold the number of documents indexed since the last commit at which a
+     *            commit is considered due
+     * @return {@code true} if the threshold has been reached or the commit deadline has passed,
+     *         in which case the counters are reset and the deadline is pushed out;
+     *         {@code false} otherwise, including when {@code documents} is empty
+     * @throws IOException if the Lucene writer fails to add the documents
+     */
+    @Override
+    public boolean index(final List<Document> documents, final int commitThreshold) throws IOException {
+        if (documents.isEmpty()) {
+            return false;
+        }
+
+        final int numDocs = documents.size();
+        indexWriter.addDocuments(documents);
+        totalIndexed.addAndGet(numDocs);
+
+        // Retry until our view of the stats is swapped in atomically.
+        while (true) {
+            final CommitStats current = commitStats.get();
+            final int indexedSinceCommit = current.getIndexedSinceCommit() + numDocs;
+
+            // System.nanoTime() values may wrap, so the deadline check must compare the
+            // difference against zero instead of comparing the two raw values.
+            final boolean commitDue = indexedSinceCommit >= commitThreshold
+                || System.nanoTime() - current.getNextCommitTimestamp() >= 0L;
+
+            final CommitStats next = commitDue
+                ? new CommitStats(0, System.nanoTime() + maxCommitNanos)
+                : new CommitStats(indexedSinceCommit, current.getNextCommitTimestamp());
+
+            if (commitStats.compareAndSet(current, next)) {
+                return commitDue;
+            }
+        }
+    }
+
+    @Override
+    public File getDirectory() {
+        return directory;
+    }
+
+    /**
+     * Commits the underlying Lucene writer and resets the commit bookkeeping.
+     *
+     * @return the total indexed count read just before the commit, minus the total recorded at
+     *         the previous commit
+     * @throws IOException if the Lucene commit fails
+     */
+    @Override
+    public long commit() throws IOException {
+        final long lastCommitCount = lastCommitTotalIndexed.get();
+        final long currentCommitCount = totalIndexed.get();
+        indexWriter.commit();
+        commitStats.set(new CommitStats(0, System.nanoTime() + maxCommitNanos));
+        lastCommitTotalIndexed.set(currentCommitCount);
+        return currentCommitCount - lastCommitCount;
+    }
+
+    @Override
+    public int getEventsIndexedSinceCommit() {
+        return commitStats.get().getIndexedSinceCommit();
+    }
+
+    @Override
+    public long getEventsIndexed() {
+        return totalIndexed.get();
+    }
+
+    @Override
+    public IndexWriter getIndexWriter() {
+        return indexWriter;
+    }
+
+    @Override
+    public String toString() {
+        return "LuceneEventIndexWriter[dir=" + directory + "]";
+    }
+
+    /**
+     * Immutable snapshot of the commit bookkeeping: the number of documents indexed since the
+     * last commit and the {@link System#nanoTime()} value at which the next commit is due.
+     */
+    private static class CommitStats {
+        private final long nextCommitTimestamp;
+        private final int indexedSinceCommit;
+
+        public CommitStats(final int indexedCount, final long nextCommitTime) {
+            this.nextCommitTimestamp = nextCommitTime;
+            this.indexedSinceCommit = indexedCount;
+        }
+
+        public long getNextCommitTimestamp() {
+            return nextCommitTimestamp;
+        }
+
+        public int getIndexedSinceCommit() {
+            return indexedSinceCommit;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java
index 81816d2..b0b01e5 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java
@@ -24,155 +24,306 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.index.ConcurrentMergeScheduler;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FSDirectory;
+import org.apache.nifi.provenance.RepositoryConfiguration;
+import org.apache.nifi.provenance.index.EventIndexSearcher;
+import org.apache.nifi.provenance.index.EventIndexWriter;
+import org.apache.nifi.provenance.util.NamedThreadFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class SimpleIndexManager implements IndexManager {
     private static final Logger logger = 
LoggerFactory.getLogger(SimpleIndexManager.class);
 
-    private final ConcurrentMap<Object, List<Closeable>> closeables = new 
ConcurrentHashMap<>();
-    private final Map<File, IndexWriterCount> writerCounts = new HashMap<>();
-
-    private final ExecutorService searchExecutor = 
Executors.newCachedThreadPool();
+    private final Map<File, IndexWriterCount> writerCounts = new HashMap<>(); 
// guarded by synchronizing on map itself
+    private final ExecutorService searchExecutor;
+    private final RepositoryConfiguration repoConfig;
 
+    /**
+     * Creates an index manager whose search executor is a fixed-size thread pool.
+     * The pool size is taken from {@code repoConfig.getQueryThreadPoolSize()} and its
+     * threads are produced by a {@link NamedThreadFactory} named "Search Lucene Index".
+     *
+     * @param repoConfig repository configuration; retained because it is consulted again
+     *                   when index writers are created and returned
+     */
+    public SimpleIndexManager(final RepositoryConfiguration repoConfig) {
+        this.repoConfig = repoConfig;
+        this.searchExecutor = 
Executors.newFixedThreadPool(repoConfig.getQueryThreadPoolSize(), new 
NamedThreadFactory("Search Lucene Index"));
+    }
 
     @Override
     public void close() throws IOException {
+        // Orderly shutdown of the search executor: stop accepting new tasks, wait up to
+        // 5 seconds for running searches, then force a shutdown if they have not finished.
         logger.debug("Shutting down SimpleIndexManager search executor");
-        this.searchExecutor.shutdown();
+
+        searchExecutor.shutdown();
         try {
-            if (!this.searchExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
-                this.searchExecutor.shutdownNow();
+            if (!searchExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+                searchExecutor.shutdownNow();
             }
         } catch (InterruptedException e) {
+            // Restore the interrupt flag for the caller, then force the shutdown.
             Thread.currentThread().interrupt();
-            this.searchExecutor.shutdownNow();
+            searchExecutor.shutdownNow();
         }
     }
 
     @Override
-    public IndexSearcher borrowIndexSearcher(final File indexDir) throws 
IOException {
-        logger.debug("Creating index searcher for {}", indexDir);
-        final Directory directory = FSDirectory.open(indexDir);
-        final DirectoryReader directoryReader = 
DirectoryReader.open(directory);
-        final IndexSearcher searcher = new IndexSearcher(directoryReader, 
this.searchExecutor);
+    public EventIndexSearcher borrowIndexSearcher(final File indexDir) throws 
IOException {
+        // Provides a searcher for the given index directory. If a writer is currently
+        // registered for the directory, its usage count is incremented and the reader is
+        // opened from that writer; otherwise the reader is opened from the directory on disk.
+        final File absoluteFile = indexDir.getAbsoluteFile();
+
+        final IndexWriterCount writerCount;
+        synchronized (writerCounts) {
+            writerCount = writerCounts.remove(absoluteFile);
 
-        final List<Closeable> closeableList = new ArrayList<>(2);
-        closeableList.add(directoryReader);
-        closeableList.add(directory);
-        closeables.put(searcher, closeableList);
-        logger.debug("Created index searcher {} for {}", searcher, indexDir);
+            if (writerCount != null) {
+                // Increment writer count and create an Index Searcher based 
on the writer
+                writerCounts.put(absoluteFile, new 
IndexWriterCount(writerCount.getWriter(), writerCount.getAnalyzer(),
+                    writerCount.getDirectory(), writerCount.getCount() + 1, 
writerCount.isCloseableWhenUnused()));
+            }
+        }
+
+        // NOTE(review): the reader is opened outside the synchronized block and outside any
+        // try/catch, so if opening throws after the count was incremented above, nothing in
+        // this method rolls the count back -- confirm whether callers compensate.
+        final DirectoryReader directoryReader;
+        if (writerCount == null) {
+            logger.trace("Creating index searcher for {}", indexDir);
+            // NOTE(review): this Directory is not passed to the returned searcher (null is
+            // passed in the third constructor position) -- confirm that nothing needs to
+            // close it explicitly.
+            final Directory directory = FSDirectory.open(indexDir);
+            directoryReader = DirectoryReader.open(directory);
+        } else {
+            final EventIndexWriter eventIndexWriter = writerCount.getWriter();
+            directoryReader = 
DirectoryReader.open(eventIndexWriter.getIndexWriter(), false);
+        }
 
-        return searcher;
+        final IndexSearcher searcher = new IndexSearcher(directoryReader, 
this.searchExecutor);
+
+        logger.trace("Created index searcher {} for {}", searcher, indexDir);
+        return new LuceneEventIndexSearcher(searcher, indexDir, null, 
directoryReader);
     }
 
     @Override
-    public void returnIndexSearcher(final File indexDirectory, final 
IndexSearcher searcher) {
+    public void returnIndexSearcher(final EventIndexSearcher searcher) {
+        // Closes the searcher and, if a writer is registered for the searcher's index
+        // directory, decrements that writer's usage count. The writer itself is closed
+        // (outside the lock) when the count drops to zero and it is marked closeable-when-unused.
+        final File indexDirectory = searcher.getIndexDirectory();
         logger.debug("Closing index searcher {} for {}", searcher, 
indexDirectory);
+        closeQuietly(searcher);
+        logger.debug("Closed index searcher {}", searcher);
+
+        final IndexWriterCount count;
+        boolean closeWriter = false;
+        synchronized (writerCounts) {
+            final File absoluteFile = 
searcher.getIndexDirectory().getAbsoluteFile();
+            count = writerCounts.get(absoluteFile);
+            if (count == null) {
+                logger.debug("Returning EventIndexSearcher for {}; there is no 
active writer for this searcher so will not decrement writerCounts", 
absoluteFile);
+                return;
+            }
 
-        final List<Closeable> closeableList = closeables.get(searcher);
-        if (closeableList != null) {
-            for (final Closeable closeable : closeableList) {
-                closeQuietly(closeable);
+            // NOTE(review): the decrement is keyed only on the directory. A searcher that was
+            // borrowed while no writer was registered did not increment any count, yet it is
+            // decremented here if a writer has been registered since -- verify this is intended.
+            if (count.getCount() <= 1) {
+                // we are finished with this writer.
+                final boolean close = count.isCloseableWhenUnused();
+                logger.debug("Decrementing count for Index Writer for {} to 
{}{}", indexDirectory, count.getCount() - 1, close ? "; closing writer" : "");
+
+                if (close) {
+                    writerCounts.remove(absoluteFile);
+                    closeWriter = true;
+                } else {
+                    writerCounts.put(absoluteFile, new 
IndexWriterCount(count.getWriter(), count.getAnalyzer(), count.getDirectory(),
+                        count.getCount() - 1, count.isCloseableWhenUnused()));
+                }
+            } else {
+                writerCounts.put(absoluteFile, new 
IndexWriterCount(count.getWriter(), count.getAnalyzer(), count.getDirectory(),
+                    count.getCount() - 1, count.isCloseableWhenUnused()));
             }
         }
 
-        logger.debug("Closed index searcher {}", searcher);
+        // The actual close happens after the lock on writerCounts has been released.
+        if (closeWriter) {
+            try {
+                close(count);
+            } catch (final Exception e) {
+                logger.warn("Failed to close Index Writer {} due to {}", 
count.getWriter(), e.toString(), e);
+            }
+        }
     }
 
     @Override
-    public void removeIndex(final File indexDirectory) {
+    public boolean removeIndex(final File indexDirectory) {
+        // Returns true when the index may be removed: either no writer is registered for the
+        // directory, or the registered writer had no active users and has been closed here.
+        // Returns false (leaving the registration in place) while the writer's count is > 0.
+        final File absoluteFile = indexDirectory.getAbsoluteFile();
+        logger.debug("Attempting to remove index {} from SimpleIndexManager", 
absoluteFile);
+
+        IndexWriterCount writerCount;
+        synchronized (writerCounts) {
+            writerCount = writerCounts.remove(absoluteFile);
+            if (writerCount == null) {
+                logger.debug("Allowing removal of index {} because there is no 
IndexWriterCount for this directory", absoluteFile);
+                return true; // return true since directory has no writers
+            }
+
+            if (writerCount.getCount() > 0) {
+                logger.debug("Not allowing removal of index {} because the 
active writer count for this directory is {}", absoluteFile, 
writerCount.getCount());
+                // Still in use: put the entry back that was removed above.
+                writerCounts.put(absoluteFile, writerCount);
+                return false;
+            }
+        }
+
+        // Closing is done after the lock on writerCounts has been released.
+        try {
+            logger.debug("Removing index {} from SimpleIndexManager and 
closing the writer", absoluteFile);
+
+            close(writerCount);
+        } catch (final Exception e) {
+            // The directory is passed explicitly so the {} placeholder is filled; the
+            // exception stays last so it is logged as the throwable.
+            logger.error("Failed to close Index Writer for {} while removing 
Index from the repository; "
+                + "this directory may need to be cleaned up manually.", absoluteFile, e);
+        }
+
+        return true;
     }
 
 
-    @Override
-    public synchronized IndexWriter borrowIndexWriter(final File 
indexingDirectory) throws IOException {
-        final File absoluteFile = indexingDirectory.getAbsoluteFile();
-        logger.trace("Borrowing index writer for {}", indexingDirectory);
+    /**
+     * Opens the index directory and builds a new Lucene writer for it, wrapped in an
+     * {@link IndexWriterCount} with an initial count of 1 and closeableWhenUnused = false.
+     * The merge scheduler's max merges and max threads are both set to
+     * {@code repoConfig.getConcurrentMergeThreads()}.
+     *
+     * @param indexDirectory the directory holding the index
+     * @return the newly created writer count; it is not registered in writerCounts here
+     * @throws IOException if creation fails; anything opened so far is closed first and
+     *                     close failures are attached as suppressed exceptions
+     */
+    private IndexWriterCount createWriter(final File indexDirectory) throws 
IOException {
+        final List<Closeable> closeables = new ArrayList<>();
+        final Directory directory = FSDirectory.open(indexDirectory);
+        closeables.add(directory);
 
-        IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
-        if (writerCount == null) {
-            final List<Closeable> closeables = new ArrayList<>();
-            final Directory directory = FSDirectory.open(indexingDirectory);
-            closeables.add(directory);
+        try {
+            final Analyzer analyzer = new StandardAnalyzer();
+            closeables.add(analyzer);
 
-            try {
-                final Analyzer analyzer = new StandardAnalyzer();
-                closeables.add(analyzer);
-
-                final IndexWriterConfig config = new 
IndexWriterConfig(LuceneUtil.LUCENE_VERSION, analyzer);
-                config.setWriteLockTimeout(300000L);
-
-                final IndexWriter indexWriter = new IndexWriter(directory, 
config);
-                writerCount = new IndexWriterCount(indexWriter, analyzer, 
directory, 1);
-                logger.debug("Providing new index writer for {}", 
indexingDirectory);
-            } catch (final IOException ioe) {
-                for (final Closeable closeable : closeables) {
-                    try {
-                        closeable.close();
-                    } catch (final IOException ioe2) {
-                        ioe.addSuppressed(ioe2);
-                    }
+            final IndexWriterConfig config = new 
IndexWriterConfig(LuceneUtil.LUCENE_VERSION, analyzer);
+
+            final ConcurrentMergeScheduler mergeScheduler = new 
ConcurrentMergeScheduler();
+            final int mergeThreads = repoConfig.getConcurrentMergeThreads();
+            mergeScheduler.setMaxMergesAndThreads(mergeThreads, mergeThreads);
+            config.setMergeScheduler(mergeScheduler);
+
+            final IndexWriter indexWriter = new IndexWriter(directory, config);
+            final EventIndexWriter eventIndexWriter = new 
LuceneEventIndexWriter(indexWriter, indexDirectory);
+
+            final IndexWriterCount writerCount = new 
IndexWriterCount(eventIndexWriter, analyzer, directory, 1, false);
+            logger.debug("Providing new index writer for {}", indexDirectory);
+            return writerCount;
+        } catch (final IOException ioe) {
+            // Only IOException triggers this cleanup; an unchecked exception thrown above
+            // would propagate without closing the directory/analyzer.
+            for (final Closeable closeable : closeables) {
+                try {
+                    closeable.close();
+                } catch (final IOException ioe2) {
+                    ioe.addSuppressed(ioe2);
                 }
+            }
+
+            throw ioe;
+        }
+    }
 
-                throw ioe;
+    /**
+     * Returns the shared writer for the given index directory, creating and registering one
+     * (count 1) if none is registered, or incrementing the count of the registered one.
+     *
+     * @param indexDirectory the directory holding the index
+     * @return the event index writer registered for the directory
+     * @throws IOException if a new writer has to be created and creation fails
+     */
+    @Override
+    public EventIndexWriter borrowIndexWriter(final File indexDirectory) 
throws IOException {
+        final File absoluteFile = indexDirectory.getAbsoluteFile();
+        logger.trace("Borrowing index writer for {}", indexDirectory);
+
+        IndexWriterCount writerCount = null;
+        synchronized (writerCounts) {
+            writerCount = writerCounts.get(absoluteFile);
+
+            if (writerCount == null) {
+                // Note: the writer is created while the lock on writerCounts is held.
+                writerCount = createWriter(indexDirectory);
+                writerCounts.put(absoluteFile, writerCount);
+            } else {
+                logger.trace("Providing existing index writer for {} and 
incrementing count to {}", indexDirectory, writerCount.getCount() + 1);
+                // The map entry is replaced by a copy with count + 1; the local variable still
+                // refers to the previous entry, which carries the same writer instance.
+                writerCounts.put(absoluteFile, new 
IndexWriterCount(writerCount.getWriter(),
+                    writerCount.getAnalyzer(), writerCount.getDirectory(), 
writerCount.getCount() + 1, writerCount.isCloseableWhenUnused()));
             }
 
-            writerCounts.put(absoluteFile, writerCount);
-        } else {
-            logger.debug("Providing existing index writer for {} and 
incrementing count to {}", indexingDirectory, writerCount.getCount() + 1);
-            writerCounts.put(absoluteFile, new 
IndexWriterCount(writerCount.getWriter(),
-                writerCount.getAnalyzer(), writerCount.getDirectory(), 
writerCount.getCount() + 1));
+            // Diagnostic only: logs the whole map when it holds more than twice as many
+            // entries as there are configured storage directories.
+            if (writerCounts.size() > 
repoConfig.getStorageDirectories().size() * 2) {
+                logger.debug("Index Writer returned; writer count map now has 
size {}; writerCount = {}; full writerCounts map = {}",
+                    writerCounts.size(), writerCount, writerCounts);
+            }
         }
 
         return writerCount.getWriter();
     }
 
+    /**
+     * Returns a writer to the manager with both flags enabled; equivalent to
+     * {@code returnIndexWriter(writer, true, true)}.
+     *
+     * @param writer the writer being returned
+     */
+    @Override
+    public void returnIndexWriter(final EventIndexWriter writer) {
+        returnIndexWriter(writer, true, true);
+    }
 
     @Override
-    public synchronized void returnIndexWriter(final File indexingDirectory, 
final IndexWriter writer) {
-        final File absoluteFile = indexingDirectory.getAbsoluteFile();
-        logger.trace("Returning Index Writer for {} to IndexManager", 
indexingDirectory);
+    public void returnIndexWriter(final EventIndexWriter writer, final boolean 
commit, final boolean isCloseable) {
+        // Decrements the usage count of the writer registered for writer.getDirectory().
+        // Commit and close are only attempted when this call found a count of 1 or less:
+        // commit when 'commit' is true, close when the writer is closeable (either because
+        // 'isCloseable' is true or because the registered entry was already marked
+        // closeable-when-unused). Any exception is logged as a warning and not rethrown.
+        final File indexDirectory = writer.getDirectory();
+        final File absoluteFile = indexDirectory.getAbsoluteFile();
+        logger.trace("Returning Index Writer for {} to IndexManager", 
indexDirectory);
+
+        boolean unused = false;
+        IndexWriterCount count = null;
+        boolean close = isCloseable;
+        try {
+            synchronized (writerCounts) {
+                count = writerCounts.get(absoluteFile);
+                if (count != null && count.isCloseableWhenUnused()) {
+                    close = true;
+                }
 
-        final IndexWriterCount count = writerCounts.remove(absoluteFile);
+                // NOTE(review): in the unknown-writer case the writer is closed while the
+                // lock on writerCounts is still held, unlike the known-writer path below.
+                if (count == null) {
+                    logger.warn("Index Writer {} was returned to IndexManager 
for {}, but this writer is not known. "
+                        + "This could potentially lead to a resource leak", 
writer, indexDirectory);
+                    writer.close();
+                } else if (count.getCount() <= 1) {
+                    // we are finished with this writer.
+                    unused = true;
+                    if (close) {
+                        logger.debug("Decrementing count for Index Writer for 
{} to {}; closing writer", indexDirectory, count.getCount() - 1);
+                        writerCounts.remove(absoluteFile);
+                    } else {
+                        logger.trace("Decrementing count for Index Writer for 
{} to {}", indexDirectory, count.getCount() - 1);
+
+                        // If writer is not closeable, then we need to 
decrement its count.
+                        writerCounts.put(absoluteFile, new 
IndexWriterCount(count.getWriter(), count.getAnalyzer(), count.getDirectory(),
+                            count.getCount() - 1, close));
+                    }
+                } else {
+                    // decrement the count.
+                    if (close) {
+                        logger.debug("Decrementing count for Index Writer for 
{} to {} and marking as closeable when no longer in use", indexDirectory, 
count.getCount() - 1);
+                    } else {
+                        logger.trace("Decrementing count for Index Writer for 
{} to {}", indexDirectory, count.getCount() - 1);
+                    }
 
-        try {
-            if (count == null) {
-                logger.warn("Index Writer {} was returned to IndexManager for 
{}, but this writer is not known. "
-                    + "This could potentially lead to a resource leak", 
writer, indexingDirectory);
-                writer.close();
-            } else if (count.getCount() <= 1) {
-                // we are finished with this writer.
-                logger.debug("Decrementing count for Index Writer for {} to 
{}; Closing writer", indexingDirectory, count.getCount() - 1);
+                    // Other users remain: store the decremented count and carry the
+                    // 'close' flag forward as closeableWhenUnused for the last user.
+                    writerCounts.put(absoluteFile, new 
IndexWriterCount(count.getWriter(), count.getAnalyzer(),
+                        count.getDirectory(), count.getCount() - 1, close));
+                }
+
+                // Diagnostic only: logs the whole map when it holds more than twice as many
+                // entries as there are configured storage directories.
+                if (writerCounts.size() > 
repoConfig.getStorageDirectories().size() * 2) {
+                    logger.debug("Index Writer returned; writer count map now 
has size {}; writer = {}, commit = {}, isCloseable = {}, writerCount = {}; full 
writerCounts Map = {}",
+                        writerCounts.size(), writer, commit, isCloseable, 
count, writerCounts);
+                }
+            }
+
+            // Committing and closing are very expensive, so we want to do 
those outside of the synchronized block.
+            // So we use an 'unused' variable to tell us whether or not we 
should actually do so.
+            if (unused) {
                 try {
-                    writer.commit();
+                    if (commit) {
+                        writer.commit();
+                    }
                 } finally {
-                    count.close();
+                    if (close) {
+                        logger.info("Index Writer for {} has been returned to 
Index Manager and is no longer in use. Closing Index Writer", indexDirectory);
+                        close(count);
+                    }
                 }
-            } else {
-                // decrement the count.
-                logger.debug("Decrementing count for Index Writer for {} to 
{}", indexingDirectory, count.getCount() - 1);
-                writerCounts.put(absoluteFile, new 
IndexWriterCount(count.getWriter(), count.getAnalyzer(), count.getDirectory(), 
count.getCount() - 1));
-            }
-        } catch (final IOException ioe) {
-            logger.warn("Failed to close Index Writer {} due to {}", writer, 
ioe);
-            if (logger.isDebugEnabled()) {
-                logger.warn("", ioe);
             }
+        } catch (final Exception e) {
+            // This handler also covers commit failures, although the message says "close".
+            logger.warn("Failed to close Index Writer {} due to {}", writer, 
e.toString(), e);
+        }
+    }
+
+    // This method exists solely for unit testing purposes.
+    // It only delegates to count.close(); being protected and non-final, a subclass can
+    // override it to observe or replace the close of a writer count.
+    protected void close(final IndexWriterCount count) throws IOException {
+        count.close();
+    }
+
+    /**
+     * Returns the number of index directories that currently have an entry in
+     * writerCounts (the map size, not the sum of the per-writer usage counts).
+     */
+    protected int getWriterCount() {
+        synchronized (writerCounts) {
+            return writerCounts.size();
         }
     }
 
@@ -191,17 +342,23 @@ public class SimpleIndexManager implements IndexManager {
     }
 
 
-    private static class IndexWriterCount implements Closeable {
-        private final IndexWriter writer;
+    protected static class IndexWriterCount implements Closeable {
+        private final EventIndexWriter writer;
         private final Analyzer analyzer;
         private final Directory directory;
         private final int count;
+        private final boolean closeableWhenUnused;
 
-        public IndexWriterCount(final IndexWriter writer, final Analyzer 
analyzer, final Directory directory, final int count) {
+        /**
+         * Bundles a writer with its analyzer, directory and current usage count. All fields
+         * are final, so a changed count is represented by constructing a new instance.
+         *
+         * @param writer the event index writer being tracked
+         * @param analyzer the analyzer associated with the writer
+         * @param directory the Lucene directory associated with the writer
+         * @param count the usage count to record
+         * @param closeableWhenUnused whether the writer should be closed once it is no longer in use
+         */
+        public IndexWriterCount(final EventIndexWriter writer, final Analyzer 
analyzer, final Directory directory, final int count, final boolean 
closeableWhenUnused) {
             this.writer = writer;
             this.analyzer = analyzer;
             this.directory = directory;
             this.count = count;
+            this.closeableWhenUnused = closeableWhenUnused;
+        }
+
+        /** Returns the closeableWhenUnused flag this instance was constructed with. */
+        public boolean isCloseableWhenUnused() {
+            return closeableWhenUnused;
         }
 
         public Analyzer getAnalyzer() {
@@ -212,7 +369,7 @@ public class SimpleIndexManager implements IndexManager {
             return directory;
         }
 
-        public IndexWriter getWriter() {
+        public EventIndexWriter getWriter() {
+            // Returns the writer this instance was constructed with.
             return writer;
         }
 
@@ -224,5 +381,10 @@ public class SimpleIndexManager implements IndexManager {
         public void close() throws IOException {
             closeQuietly(writer, analyzer, directory);
         }
+
+        /** Renders the count, writer and closeableWhenUnused flag, e.g. for log output. */
+        @Override
+        public String toString() {
+            return "IndexWriterCount[count=" + count + ", writer=" + writer + 
", closeableWhenUnused=" + closeableWhenUnused + "]";
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventFieldNames.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventFieldNames.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventFieldNames.java
new file mode 100644
index 0000000..d6f50dd
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventFieldNames.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.schema;
+
+/**
+ * String constants naming the fields of a provenance event record. The values are the
+ * field names used for lookups such as {@code Record.getFieldValue(String)}.
+ */
+public class EventFieldNames {
+    // Core event fields
+    public static final String EVENT_IDENTIFIER = "Event ID";
+    public static final String EVENT_TYPE = "Event Type";
+    public static final String EVENT_TIME = "Event Time";
+    public static final String FLOWFILE_ENTRY_DATE = "FlowFile Entry Date";
+    public static final String EVENT_DURATION = "Event Duration";
+    public static final String LINEAGE_START_DATE = "Lineage Start Date";
+    public static final String COMPONENT_ID = "Component ID";
+    public static final String COMPONENT_TYPE = "Component Type";
+    public static final String FLOWFILE_UUID = "FlowFile UUID";
+    public static final String EVENT_DETAILS = "Event Details";
+    public static final String SOURCE_QUEUE_IDENTIFIER = "Source Queue 
Identifier";
+    public static final String CONTENT_CLAIM = "Content Claim";
+    public static final String PREVIOUS_CONTENT_CLAIM = "Previous Content 
Claim";
+    public static final String EXPLICIT_CURRENT_CONTENT_CLAIM = "Full Current 
Content Claim";
+    public static final String PARENT_UUIDS = "Parent UUIDs";
+    public static final String CHILD_UUIDS = "Child UUIDs";
+
+    // Attribute fields
+    public static final String ATTRIBUTE_NAME = "Attribute Name";
+    public static final String ATTRIBUTE_VALUE = "Attribute Value";
+    public static final String PREVIOUS_ATTRIBUTES = "Previous Attributes";
+    public static final String UPDATED_ATTRIBUTES = "Updated Attributes";
+
+    // Content claim sub-fields
+    public static final String CONTENT_CLAIM_CONTAINER = "Content Claim 
Container";
+    public static final String CONTENT_CLAIM_SECTION = "Content Claim Section";
+    public static final String CONTENT_CLAIM_IDENTIFIER = "Content Claim 
Identifier";
+    public static final String CONTENT_CLAIM_OFFSET = "Content Claim Offset";
+    public static final String CONTENT_CLAIM_SIZE = "Content Claim Size";
+
+    public static final String TRANSIT_URI = "Transit URI";
+    public static final String SOURCE_SYSTEM_FLOWFILE_IDENTIFIER = "Source 
System FlowFile Identifier";
+    public static final String ALTERNATE_IDENTIFIER = "Alternate Identifier";
+    public static final String RELATIONSHIP = "Relationship";
+
+    // For Lookup Tables
+    public static final String NO_VALUE = "No Value";
+    public static final String EXPLICIT_VALUE = "Explicit Value";
+    public static final String LOOKUP_VALUE = "Lookup Value";
+    public static final String UNCHANGED_VALUE = "Unchanged";
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventIdFirstHeaderSchema.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventIdFirstHeaderSchema.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventIdFirstHeaderSchema.java
new file mode 100644
index 0000000..1c35c5a
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventIdFirstHeaderSchema.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.schema;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.nifi.repository.schema.FieldType;
+import org.apache.nifi.repository.schema.RecordField;
+import org.apache.nifi.repository.schema.RecordSchema;
+import org.apache.nifi.repository.schema.Repetition;
+import org.apache.nifi.repository.schema.SimpleRecordField;
+
+/**
+ * Defines the record schema made up of the six header fields named in {@link FieldNames}.
+ */
+public class EventIdFirstHeaderSchema {
+
+    /** The header schema; built once at class initialization and never reassigned. */
+    public static final RecordSchema SCHEMA = buildSchema();
+
+    /** Names of the fields contained in {@link #SCHEMA}. */
+    public static final class FieldNames {
+        public static final String FIRST_EVENT_ID = "First Event ID";
+        public static final String TIMESTAMP_OFFSET = "Timestamp Offset";
+        public static final String COMPONENT_IDS = "Component Identifiers";
+        public static final String COMPONENT_TYPES = "Component Types";
+        public static final String QUEUE_IDS = "Queue Identifiers";
+        public static final String EVENT_TYPES = "Event Types";
+    }
+
+    /**
+     * Builds the schema: two LONG fields that occur exactly once (first event ID and
+     * timestamp offset) followed by four STRING fields that may repeat zero or more times.
+     */
+    private static RecordSchema buildSchema() {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new SimpleRecordField(FieldNames.FIRST_EVENT_ID, 
FieldType.LONG, Repetition.EXACTLY_ONE));
+        fields.add(new SimpleRecordField(FieldNames.TIMESTAMP_OFFSET, 
FieldType.LONG, Repetition.EXACTLY_ONE));
+        fields.add(new SimpleRecordField(FieldNames.COMPONENT_IDS, 
FieldType.STRING, Repetition.ZERO_OR_MORE));
+        fields.add(new SimpleRecordField(FieldNames.COMPONENT_TYPES, 
FieldType.STRING, Repetition.ZERO_OR_MORE));
+        fields.add(new SimpleRecordField(FieldNames.QUEUE_IDS, 
FieldType.STRING, Repetition.ZERO_OR_MORE));
+        fields.add(new SimpleRecordField(FieldNames.EVENT_TYPES, 
FieldType.STRING, Repetition.ZERO_OR_MORE));
+        return new RecordSchema(fields);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventRecord.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventRecord.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventRecord.java
index c9e7dc8..8c82b11 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventRecord.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventRecord.java
@@ -69,47 +69,47 @@ public class EventRecord implements Record {
     @Override
     public Object getFieldValue(final String fieldName) {
         switch (fieldName) {
-            case EventRecordFields.Names.EVENT_IDENTIFIER:
+            case EventFieldNames.EVENT_IDENTIFIER:
                 return eventId;
-            case EventRecordFields.Names.ALTERNATE_IDENTIFIER:
+            case EventFieldNames.ALTERNATE_IDENTIFIER:
                 return event.getAlternateIdentifierUri();
-            case EventRecordFields.Names.CHILD_UUIDS:
+            case EventFieldNames.CHILD_UUIDS:
                 return event.getChildUuids();
-            case EventRecordFields.Names.COMPONENT_ID:
+            case EventFieldNames.COMPONENT_ID:
                 return event.getComponentId();
-            case EventRecordFields.Names.COMPONENT_TYPE:
+            case EventFieldNames.COMPONENT_TYPE:
                 return event.getComponentType();
-            case EventRecordFields.Names.CONTENT_CLAIM:
+            case EventFieldNames.CONTENT_CLAIM:
                 return contentClaimRecord;
-            case EventRecordFields.Names.EVENT_DETAILS:
+            case EventFieldNames.EVENT_DETAILS:
                 return event.getDetails();
-            case EventRecordFields.Names.EVENT_DURATION:
+            case EventFieldNames.EVENT_DURATION:
                 return event.getEventDuration();
-            case EventRecordFields.Names.EVENT_TIME:
+            case EventFieldNames.EVENT_TIME:
                 return event.getEventTime();
-            case EventRecordFields.Names.EVENT_TYPE:
+            case EventFieldNames.EVENT_TYPE:
                 return event.getEventType().name();
-            case EventRecordFields.Names.FLOWFILE_ENTRY_DATE:
+            case EventFieldNames.FLOWFILE_ENTRY_DATE:
                 return event.getFlowFileEntryDate();
-            case EventRecordFields.Names.FLOWFILE_UUID:
+            case EventFieldNames.FLOWFILE_UUID:
                 return event.getFlowFileUuid();
-            case EventRecordFields.Names.LINEAGE_START_DATE:
+            case EventFieldNames.LINEAGE_START_DATE:
                 return event.getLineageStartDate();
-            case EventRecordFields.Names.PARENT_UUIDS:
+            case EventFieldNames.PARENT_UUIDS:
                 return event.getParentUuids();
-            case EventRecordFields.Names.PREVIOUS_ATTRIBUTES:
+            case EventFieldNames.PREVIOUS_ATTRIBUTES:
                 return event.getPreviousAttributes();
-            case EventRecordFields.Names.PREVIOUS_CONTENT_CLAIM:
+            case EventFieldNames.PREVIOUS_CONTENT_CLAIM:
                 return previousClaimRecord;
-            case EventRecordFields.Names.RELATIONSHIP:
+            case EventFieldNames.RELATIONSHIP:
                 return event.getRelationship();
-            case EventRecordFields.Names.SOURCE_QUEUE_IDENTIFIER:
+            case EventFieldNames.SOURCE_QUEUE_IDENTIFIER:
                 return event.getSourceQueueIdentifier();
-            case EventRecordFields.Names.SOURCE_SYSTEM_FLOWFILE_IDENTIFIER:
+            case EventFieldNames.SOURCE_SYSTEM_FLOWFILE_IDENTIFIER:
                 return event.getSourceSystemFlowFileIdentifier();
-            case EventRecordFields.Names.TRANSIT_URI:
+            case EventFieldNames.TRANSIT_URI:
                 return event.getTransitUri();
-            case EventRecordFields.Names.UPDATED_ATTRIBUTES:
+            case EventFieldNames.UPDATED_ATTRIBUTES:
                 return event.getUpdatedAttributes();
         }
 
@@ -119,48 +119,52 @@ public class EventRecord implements Record {
     @SuppressWarnings("unchecked")
     public static StandardProvenanceEventRecord getEvent(final Record record, final String storageFilename, final long storageByteOffset, final int maxAttributeLength) {
         final StandardProvenanceEventRecord.Builder builder = new StandardProvenanceEventRecord.Builder();
-        builder.setAlternateIdentifierUri((String) 
record.getFieldValue(EventRecordFields.Names.ALTERNATE_IDENTIFIER));
-        builder.setChildUuids((List<String>) 
record.getFieldValue(EventRecordFields.Names.CHILD_UUIDS));
-        builder.setComponentId((String) 
record.getFieldValue(EventRecordFields.Names.COMPONENT_ID));
-        builder.setComponentType((String) 
record.getFieldValue(EventRecordFields.Names.COMPONENT_TYPE));
-        builder.setDetails((String) 
record.getFieldValue(EventRecordFields.Names.EVENT_DETAILS));
-        builder.setEventDuration((Long) 
record.getFieldValue(EventRecordFields.Names.EVENT_DURATION));
-        builder.setEventTime((Long) 
record.getFieldValue(EventRecordFields.Names.EVENT_TIME));
-        builder.setEventType(ProvenanceEventType.valueOf((String) 
record.getFieldValue(EventRecordFields.Names.EVENT_TYPE)));
-        builder.setFlowFileEntryDate((Long) 
record.getFieldValue(EventRecordFields.Names.FLOWFILE_ENTRY_DATE));
-        builder.setFlowFileUUID((String) 
record.getFieldValue(EventRecordFields.Names.FLOWFILE_UUID));
-        builder.setLineageStartDate((Long) 
record.getFieldValue(EventRecordFields.Names.LINEAGE_START_DATE));
-        builder.setParentUuids((List<String>) 
record.getFieldValue(EventRecordFields.Names.PARENT_UUIDS));
-        builder.setPreviousAttributes(truncateAttributes((Map<String, String>) 
record.getFieldValue(EventRecordFields.Names.PREVIOUS_ATTRIBUTES), 
maxAttributeLength));
-        builder.setEventId((Long) 
record.getFieldValue(EventRecordFields.Names.EVENT_IDENTIFIER));
-        builder.setRelationship((String) 
record.getFieldValue(EventRecordFields.Names.RELATIONSHIP));
-        builder.setSourceQueueIdentifier((String) 
record.getFieldValue(EventRecordFields.Names.SOURCE_QUEUE_IDENTIFIER));
-        builder.setSourceSystemFlowFileIdentifier((String) 
record.getFieldValue(EventRecordFields.Names.SOURCE_SYSTEM_FLOWFILE_IDENTIFIER));
-        builder.setTransitUri((String) 
record.getFieldValue(EventRecordFields.Names.TRANSIT_URI));
-        builder.setUpdatedAttributes(truncateAttributes((Map<String, String>) 
record.getFieldValue(EventRecordFields.Names.UPDATED_ATTRIBUTES), 
maxAttributeLength));
+        builder.setAlternateIdentifierUri((String) 
record.getFieldValue(EventFieldNames.ALTERNATE_IDENTIFIER));
+        builder.setChildUuids((List<String>) 
record.getFieldValue(EventFieldNames.CHILD_UUIDS));
+        builder.setComponentId((String) 
record.getFieldValue(EventFieldNames.COMPONENT_ID));
+        builder.setComponentType((String) 
record.getFieldValue(EventFieldNames.COMPONENT_TYPE));
+        builder.setDetails((String) 
record.getFieldValue(EventFieldNames.EVENT_DETAILS));
+        builder.setEventDuration((Long) 
record.getFieldValue(EventFieldNames.EVENT_DURATION));
+        builder.setEventTime((Long) 
record.getFieldValue(EventFieldNames.EVENT_TIME));
+        builder.setEventType(ProvenanceEventType.valueOf((String) 
record.getFieldValue(EventFieldNames.EVENT_TYPE)));
+        builder.setFlowFileEntryDate((Long) 
record.getFieldValue(EventFieldNames.FLOWFILE_ENTRY_DATE));
+        builder.setFlowFileUUID((String) 
record.getFieldValue(EventFieldNames.FLOWFILE_UUID));
+        builder.setLineageStartDate((Long) 
record.getFieldValue(EventFieldNames.LINEAGE_START_DATE));
+        builder.setParentUuids((List<String>) 
record.getFieldValue(EventFieldNames.PARENT_UUIDS));
+        builder.setPreviousAttributes(truncateAttributes((Map<String, String>) 
record.getFieldValue(EventFieldNames.PREVIOUS_ATTRIBUTES), maxAttributeLength));
+        builder.setRelationship((String) 
record.getFieldValue(EventFieldNames.RELATIONSHIP));
+        builder.setSourceQueueIdentifier((String) 
record.getFieldValue(EventFieldNames.SOURCE_QUEUE_IDENTIFIER));
+        builder.setSourceSystemFlowFileIdentifier((String) 
record.getFieldValue(EventFieldNames.SOURCE_SYSTEM_FLOWFILE_IDENTIFIER));
+        builder.setTransitUri((String) 
record.getFieldValue(EventFieldNames.TRANSIT_URI));
+        builder.setUpdatedAttributes(truncateAttributes((Map<String, String>) 
record.getFieldValue(EventFieldNames.UPDATED_ATTRIBUTES), maxAttributeLength));
+
+        final Long eventId = (Long) 
record.getFieldValue(EventFieldNames.EVENT_IDENTIFIER);
+        if (eventId != null) {
+            builder.setEventId(eventId);
+        }
 
         builder.setStorageLocation(storageFilename, storageByteOffset);
 
-        final Record currentClaimRecord = (Record) 
record.getFieldValue(EventRecordFields.Names.CONTENT_CLAIM);
+        final Record currentClaimRecord = (Record) 
record.getFieldValue(EventFieldNames.CONTENT_CLAIM);
         if (currentClaimRecord == null) {
             builder.setCurrentContentClaim(null, null, null, null, 0L);
         } else {
             builder.setCurrentContentClaim(
-                (String) 
currentClaimRecord.getFieldValue(EventRecordFields.Names.CONTENT_CLAIM_CONTAINER),
-                (String) 
currentClaimRecord.getFieldValue(EventRecordFields.Names.CONTENT_CLAIM_SECTION),
-                (String) 
currentClaimRecord.getFieldValue(EventRecordFields.Names.CONTENT_CLAIM_IDENTIFIER),
-                (Long) 
currentClaimRecord.getFieldValue(EventRecordFields.Names.CONTENT_CLAIM_OFFSET),
-                (Long) 
currentClaimRecord.getFieldValue(EventRecordFields.Names.CONTENT_CLAIM_SIZE));
+                (String) 
currentClaimRecord.getFieldValue(EventFieldNames.CONTENT_CLAIM_CONTAINER),
+                (String) 
currentClaimRecord.getFieldValue(EventFieldNames.CONTENT_CLAIM_SECTION),
+                (String) 
currentClaimRecord.getFieldValue(EventFieldNames.CONTENT_CLAIM_IDENTIFIER),
+                (Long) 
currentClaimRecord.getFieldValue(EventFieldNames.CONTENT_CLAIM_OFFSET),
+                (Long) 
currentClaimRecord.getFieldValue(EventFieldNames.CONTENT_CLAIM_SIZE));
         }
 
-        final Record previousClaimRecord = (Record) 
record.getFieldValue(EventRecordFields.Names.PREVIOUS_CONTENT_CLAIM);
+        final Record previousClaimRecord = (Record) 
record.getFieldValue(EventFieldNames.PREVIOUS_CONTENT_CLAIM);
         if (previousClaimRecord != null) {
             builder.setPreviousContentClaim(
-                (String) 
previousClaimRecord.getFieldValue(EventRecordFields.Names.CONTENT_CLAIM_CONTAINER),
-                (String) 
previousClaimRecord.getFieldValue(EventRecordFields.Names.CONTENT_CLAIM_SECTION),
-                (String) 
previousClaimRecord.getFieldValue(EventRecordFields.Names.CONTENT_CLAIM_IDENTIFIER),
-                (Long) 
previousClaimRecord.getFieldValue(EventRecordFields.Names.CONTENT_CLAIM_OFFSET),
-                (Long) 
previousClaimRecord.getFieldValue(EventRecordFields.Names.CONTENT_CLAIM_SIZE));
+                (String) 
previousClaimRecord.getFieldValue(EventFieldNames.CONTENT_CLAIM_CONTAINER),
+                (String) 
previousClaimRecord.getFieldValue(EventFieldNames.CONTENT_CLAIM_SECTION),
+                (String) 
previousClaimRecord.getFieldValue(EventFieldNames.CONTENT_CLAIM_IDENTIFIER),
+                (Long) 
previousClaimRecord.getFieldValue(EventFieldNames.CONTENT_CLAIM_OFFSET),
+                (Long) 
previousClaimRecord.getFieldValue(EventFieldNames.CONTENT_CLAIM_SIZE));
         }
 
         return builder.build();

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventRecordFields.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventRecordFields.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventRecordFields.java
index 0582dd8..3d79ab4 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventRecordFields.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventRecordFields.java
@@ -29,82 +29,48 @@ import org.apache.nifi.repository.schema.SimpleRecordField;
 
 public class EventRecordFields {
 
-    public static class Names {
-        public static final String EVENT_IDENTIFIER = "Event ID";
-        public static final String EVENT_TYPE = "Event Type";
-        public static final String EVENT_TIME = "Event Time";
-        public static final String FLOWFILE_ENTRY_DATE = "FlowFile Entry Date";
-        public static final String EVENT_DURATION = "Event Duration";
-        public static final String LINEAGE_START_DATE = "Lineage Start Date";
-        public static final String COMPONENT_ID = "Component ID";
-        public static final String COMPONENT_TYPE = "Component Type";
-        public static final String FLOWFILE_UUID = "FlowFile UUID";
-        public static final String EVENT_DETAILS = "Event Details";
-        public static final String SOURCE_QUEUE_IDENTIFIER = "Source Queue Identifier";
-        public static final String CONTENT_CLAIM = "Content Claim";
-        public static final String PREVIOUS_CONTENT_CLAIM = "Previous Content Claim";
-        public static final String PARENT_UUIDS = "Parent UUIDs";
-        public static final String CHILD_UUIDS = "Child UUIDs";
-
-        public static final String ATTRIBUTE_NAME = "Attribute Name";
-        public static final String ATTRIBUTE_VALUE = "Attribute Value";
-        public static final String PREVIOUS_ATTRIBUTES = "Previous Attributes";
-        public static final String UPDATED_ATTRIBUTES = "Updated Attributes";
-
-        public static final String CONTENT_CLAIM_CONTAINER = "Content Claim Container";
-        public static final String CONTENT_CLAIM_SECTION = "Content Claim Section";
-        public static final String CONTENT_CLAIM_IDENTIFIER = "Content Claim Identifier";
-        public static final String CONTENT_CLAIM_OFFSET = "Content Claim Offset";
-        public static final String CONTENT_CLAIM_SIZE = "Content Claim Size";
-
-        public static final String TRANSIT_URI = "Transit URI";
-        public static final String SOURCE_SYSTEM_FLOWFILE_IDENTIFIER = "Source System FlowFile Identifier";
-        public static final String ALTERNATE_IDENTIFIER = "Alternate Identifier";
-        public static final String RELATIONSHIP = "Relationship";
-    }
-
     // General Event fields.
-    public static final RecordField RECORD_IDENTIFIER = new 
SimpleRecordField(Names.EVENT_IDENTIFIER, FieldType.LONG, EXACTLY_ONE);
-    public static final RecordField EVENT_TYPE = new 
SimpleRecordField(Names.EVENT_TYPE, FieldType.STRING, EXACTLY_ONE);
-    public static final RecordField EVENT_TIME = new 
SimpleRecordField(Names.EVENT_TIME, FieldType.LONG, EXACTLY_ONE);
-    public static final RecordField FLOWFILE_ENTRY_DATE = new 
SimpleRecordField(Names.FLOWFILE_ENTRY_DATE, FieldType.LONG, EXACTLY_ONE);
-    public static final RecordField EVENT_DURATION = new 
SimpleRecordField(Names.EVENT_DURATION, FieldType.LONG, EXACTLY_ONE);
-    public static final RecordField LINEAGE_START_DATE = new 
SimpleRecordField(Names.LINEAGE_START_DATE, FieldType.LONG, EXACTLY_ONE);
-    public static final RecordField COMPONENT_ID = new 
SimpleRecordField(Names.COMPONENT_ID, FieldType.STRING, ZERO_OR_ONE);
-    public static final RecordField COMPONENT_TYPE = new 
SimpleRecordField(Names.COMPONENT_TYPE, FieldType.STRING, ZERO_OR_ONE);
-    public static final RecordField FLOWFILE_UUID = new 
SimpleRecordField(Names.FLOWFILE_UUID, FieldType.STRING, EXACTLY_ONE);
-    public static final RecordField EVENT_DETAILS = new 
SimpleRecordField(Names.EVENT_DETAILS, FieldType.STRING, ZERO_OR_ONE);
-    public static final RecordField SOURCE_QUEUE_IDENTIFIER = new 
SimpleRecordField(Names.SOURCE_QUEUE_IDENTIFIER, FieldType.STRING, ZERO_OR_ONE);
+    public static final RecordField RECORD_IDENTIFIER = new 
SimpleRecordField(EventFieldNames.EVENT_IDENTIFIER, FieldType.LONG, 
EXACTLY_ONE);
+    public static final RecordField EVENT_TYPE = new 
SimpleRecordField(EventFieldNames.EVENT_TYPE, FieldType.STRING, EXACTLY_ONE);
+    public static final RecordField EVENT_TIME = new 
SimpleRecordField(EventFieldNames.EVENT_TIME, FieldType.LONG, EXACTLY_ONE);
+    public static final RecordField FLOWFILE_ENTRY_DATE = new 
SimpleRecordField(EventFieldNames.FLOWFILE_ENTRY_DATE, FieldType.LONG, 
EXACTLY_ONE);
+    public static final RecordField EVENT_DURATION = new 
SimpleRecordField(EventFieldNames.EVENT_DURATION, FieldType.LONG, EXACTLY_ONE);
+    public static final RecordField LINEAGE_START_DATE = new 
SimpleRecordField(EventFieldNames.LINEAGE_START_DATE, FieldType.LONG, 
EXACTLY_ONE);
+    public static final RecordField COMPONENT_ID = new 
SimpleRecordField(EventFieldNames.COMPONENT_ID, FieldType.STRING, ZERO_OR_ONE);
+    public static final RecordField COMPONENT_TYPE = new 
SimpleRecordField(EventFieldNames.COMPONENT_TYPE, FieldType.STRING, 
ZERO_OR_ONE);
+    public static final RecordField FLOWFILE_UUID = new 
SimpleRecordField(EventFieldNames.FLOWFILE_UUID, FieldType.STRING, EXACTLY_ONE);
+    public static final RecordField EVENT_DETAILS = new 
SimpleRecordField(EventFieldNames.EVENT_DETAILS, FieldType.STRING, ZERO_OR_ONE);
+    public static final RecordField SOURCE_QUEUE_IDENTIFIER = new 
SimpleRecordField(EventFieldNames.SOURCE_QUEUE_IDENTIFIER, FieldType.STRING, 
ZERO_OR_ONE);
 
     // Attributes
-    public static final RecordField ATTRIBUTE_NAME = new 
SimpleRecordField(Names.ATTRIBUTE_NAME, FieldType.LONG_STRING, EXACTLY_ONE);
-    public static final RecordField ATTRIBUTE_VALUE_REQUIRED = new 
SimpleRecordField(Names.ATTRIBUTE_VALUE, FieldType.LONG_STRING, EXACTLY_ONE);
-    public static final RecordField ATTRIBUTE_VALUE_OPTIONAL = new 
SimpleRecordField(Names.ATTRIBUTE_VALUE, FieldType.LONG_STRING, ZERO_OR_ONE);
+    public static final RecordField ATTRIBUTE_NAME = new 
SimpleRecordField(EventFieldNames.ATTRIBUTE_NAME, FieldType.LONG_STRING, 
EXACTLY_ONE);
+    public static final RecordField ATTRIBUTE_VALUE_REQUIRED = new 
SimpleRecordField(EventFieldNames.ATTRIBUTE_VALUE, FieldType.LONG_STRING, 
EXACTLY_ONE);
+    public static final RecordField ATTRIBUTE_VALUE_OPTIONAL = new 
SimpleRecordField(EventFieldNames.ATTRIBUTE_VALUE, FieldType.LONG_STRING, 
ZERO_OR_ONE);
 
-    public static final RecordField PREVIOUS_ATTRIBUTES = new 
MapRecordField(Names.PREVIOUS_ATTRIBUTES, ATTRIBUTE_NAME, 
ATTRIBUTE_VALUE_REQUIRED, EXACTLY_ONE);
-    public static final RecordField UPDATED_ATTRIBUTES = new 
MapRecordField(Names.UPDATED_ATTRIBUTES, ATTRIBUTE_NAME, 
ATTRIBUTE_VALUE_OPTIONAL, EXACTLY_ONE);
+    public static final RecordField PREVIOUS_ATTRIBUTES = new 
MapRecordField(EventFieldNames.PREVIOUS_ATTRIBUTES, ATTRIBUTE_NAME, 
ATTRIBUTE_VALUE_REQUIRED, EXACTLY_ONE);
+    public static final RecordField UPDATED_ATTRIBUTES = new 
MapRecordField(EventFieldNames.UPDATED_ATTRIBUTES, ATTRIBUTE_NAME, 
ATTRIBUTE_VALUE_OPTIONAL, EXACTLY_ONE);
 
     // Content Claims
-    public static final RecordField CONTENT_CLAIM_CONTAINER = new 
SimpleRecordField(Names.CONTENT_CLAIM_CONTAINER, FieldType.STRING, EXACTLY_ONE);
-    public static final RecordField CONTENT_CLAIM_SECTION = new 
SimpleRecordField(Names.CONTENT_CLAIM_SECTION, FieldType.STRING, EXACTLY_ONE);
-    public static final RecordField CONTENT_CLAIM_IDENTIFIER = new 
SimpleRecordField(Names.CONTENT_CLAIM_IDENTIFIER, FieldType.STRING, 
EXACTLY_ONE);
-    public static final RecordField CONTENT_CLAIM_OFFSET = new 
SimpleRecordField(Names.CONTENT_CLAIM_OFFSET, FieldType.LONG, EXACTLY_ONE);
-    public static final RecordField CONTENT_CLAIM_SIZE = new 
SimpleRecordField(Names.CONTENT_CLAIM_SIZE, FieldType.LONG, EXACTLY_ONE);
-    public static final RecordField CURRENT_CONTENT_CLAIM = new 
ComplexRecordField(Names.CONTENT_CLAIM, ZERO_OR_ONE,
+    public static final RecordField CONTENT_CLAIM_CONTAINER = new 
SimpleRecordField(EventFieldNames.CONTENT_CLAIM_CONTAINER, FieldType.STRING, 
EXACTLY_ONE);
+    public static final RecordField CONTENT_CLAIM_SECTION = new 
SimpleRecordField(EventFieldNames.CONTENT_CLAIM_SECTION, FieldType.STRING, 
EXACTLY_ONE);
+    public static final RecordField CONTENT_CLAIM_IDENTIFIER = new 
SimpleRecordField(EventFieldNames.CONTENT_CLAIM_IDENTIFIER, FieldType.STRING, 
EXACTLY_ONE);
+    public static final RecordField CONTENT_CLAIM_OFFSET = new 
SimpleRecordField(EventFieldNames.CONTENT_CLAIM_OFFSET, FieldType.LONG, 
EXACTLY_ONE);
+    public static final RecordField CONTENT_CLAIM_SIZE = new 
SimpleRecordField(EventFieldNames.CONTENT_CLAIM_SIZE, FieldType.LONG, 
EXACTLY_ONE);
+    public static final RecordField CURRENT_CONTENT_CLAIM = new 
ComplexRecordField(EventFieldNames.CONTENT_CLAIM, ZERO_OR_ONE,
         CONTENT_CLAIM_CONTAINER, CONTENT_CLAIM_SECTION, 
CONTENT_CLAIM_IDENTIFIER, CONTENT_CLAIM_OFFSET, CONTENT_CLAIM_SIZE);
-    public static final RecordField PREVIOUS_CONTENT_CLAIM = new 
ComplexRecordField(Names.PREVIOUS_CONTENT_CLAIM, ZERO_OR_ONE,
+    public static final RecordField PREVIOUS_CONTENT_CLAIM = new 
ComplexRecordField(EventFieldNames.PREVIOUS_CONTENT_CLAIM, ZERO_OR_ONE,
         CONTENT_CLAIM_CONTAINER, CONTENT_CLAIM_SECTION, 
CONTENT_CLAIM_IDENTIFIER, CONTENT_CLAIM_OFFSET, CONTENT_CLAIM_SIZE);
 
     // EventType-Specific fields
     // for FORK, JOIN, CLONE, REPLAY
-    public static final RecordField PARENT_UUIDS = new 
SimpleRecordField(Names.PARENT_UUIDS, FieldType.STRING, ZERO_OR_MORE);
-    public static final RecordField CHILD_UUIDS = new 
SimpleRecordField(Names.CHILD_UUIDS, FieldType.STRING, ZERO_OR_MORE);
+    public static final RecordField PARENT_UUIDS = new 
SimpleRecordField(EventFieldNames.PARENT_UUIDS, FieldType.STRING, ZERO_OR_MORE);
+    public static final RecordField CHILD_UUIDS = new 
SimpleRecordField(EventFieldNames.CHILD_UUIDS, FieldType.STRING, ZERO_OR_MORE);
 
     // for SEND/RECEIVE/FETCH
-    public static final RecordField TRANSIT_URI = new 
SimpleRecordField(Names.TRANSIT_URI, FieldType.STRING, ZERO_OR_ONE);
-    public static final RecordField SOURCE_SYSTEM_FLOWFILE_IDENTIFIER = new 
SimpleRecordField(Names.SOURCE_QUEUE_IDENTIFIER, FieldType.STRING, ZERO_OR_ONE);
+    public static final RecordField TRANSIT_URI = new 
SimpleRecordField(EventFieldNames.TRANSIT_URI, FieldType.STRING, ZERO_OR_ONE);
+    public static final RecordField SOURCE_SYSTEM_FLOWFILE_IDENTIFIER = new 
SimpleRecordField(EventFieldNames.SOURCE_SYSTEM_FLOWFILE_IDENTIFIER, 
FieldType.STRING, ZERO_OR_ONE);
 
     // for ADD_INFO
-    public static final RecordField ALTERNATE_IDENTIFIER = new 
SimpleRecordField(Names.ALTERNATE_IDENTIFIER, FieldType.STRING, ZERO_OR_ONE);
-    public static final RecordField RELATIONSHIP = new 
SimpleRecordField(Names.RELATIONSHIP, FieldType.STRING, ZERO_OR_ONE);
+    public static final RecordField ALTERNATE_IDENTIFIER = new 
SimpleRecordField(EventFieldNames.ALTERNATE_IDENTIFIER, FieldType.STRING, 
ZERO_OR_ONE);
+    public static final RecordField RELATIONSHIP = new 
SimpleRecordField(EventFieldNames.RELATIONSHIP, FieldType.STRING, ZERO_OR_ONE);
 }

Reply via email to