http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java
new file mode 100644
index 0000000..a583403
--- /dev/null
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java
@@ -0,0 +1,737 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.index.lucene;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.BooleanClause.Occur;
+import org.apache.lucene.search.BooleanQuery;
+import org.apache.lucene.search.TermQuery;
+import org.apache.nifi.authorization.AccessDeniedException;
+import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.provenance.AsyncLineageSubmission;
+import org.apache.nifi.provenance.AsyncQuerySubmission;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.RepositoryConfiguration;
+import org.apache.nifi.provenance.SearchableFields;
+import org.apache.nifi.provenance.StandardLineageResult;
+import org.apache.nifi.provenance.StandardQueryResult;
+import org.apache.nifi.provenance.authorization.EventAuthorizer;
+import org.apache.nifi.provenance.authorization.EventTransformer;
+import org.apache.nifi.provenance.index.EventIndex;
+import org.apache.nifi.provenance.index.EventIndexSearcher;
+import org.apache.nifi.provenance.index.EventIndexWriter;
+import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
+import org.apache.nifi.provenance.lineage.LineageComputationType;
+import org.apache.nifi.provenance.lucene.IndexManager;
+import org.apache.nifi.provenance.lucene.LuceneUtil;
+import org.apache.nifi.provenance.search.Query;
+import org.apache.nifi.provenance.search.QuerySubmission;
+import org.apache.nifi.provenance.serialization.StorageSummary;
+import org.apache.nifi.provenance.store.EventStore;
+import org.apache.nifi.provenance.util.DirectoryUtils;
+import org.apache.nifi.provenance.util.NamedThreadFactory;
+import org.apache.nifi.reporting.Severity;
+import org.apache.nifi.util.file.FileUtils;
+import org.apache.nifi.util.timebuffer.LongEntityAccess;
+import org.apache.nifi.util.timebuffer.TimedBuffer;
+import org.apache.nifi.util.timebuffer.TimestampedLong;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class LuceneEventIndex implements EventIndex {
+    private static final Logger logger = LoggerFactory.getLogger(LuceneEventIndex.class);
+    private static final String EVENT_CATEGORY = "Provenance Repository";
+
+    public static final int MAX_UNDELETED_QUERY_RESULTS = 10;
+    public static final int MAX_DELETE_INDEX_WAIT_SECONDS = 30;
+    public static final int MAX_LINEAGE_NODES = 1000;
+    public static final int MAX_INDEX_THREADS = 100;
+
+    private final ConcurrentMap<String, AsyncQuerySubmission> querySubmissionMap = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, AsyncLineageSubmission> lineageSubmissionMap = new ConcurrentHashMap<>();
+    private final BlockingQueue<StoredDocument> documentQueue = new LinkedBlockingQueue<>(1000);
+    private final List<EventIndexTask> indexTasks = Collections.synchronizedList(new ArrayList<>());
+    private final ExecutorService queryExecutor;
+    private final ExecutorService indexExecutor;
+    private final RepositoryConfiguration config;
+    private final IndexManager indexManager;
+    private final ConvertEventToLuceneDocument eventConverter;
+    private final IndexDirectoryManager directoryManager;
+    private volatile boolean closed = false;
+
+    private final TimedBuffer<TimestampedLong> queuePauseNanos = new TimedBuffer<>(TimeUnit.SECONDS, 300, new LongEntityAccess());
+    private final TimedBuffer<TimestampedLong> eventsIndexed = new TimedBuffer<>(TimeUnit.SECONDS, 300, new LongEntityAccess());
+    private final AtomicLong eventCount = new AtomicLong(0L);
+    private final EventReporter eventReporter;
+
+    private final List<CachedQuery> cachedQueries = new ArrayList<>();
+
+    private ScheduledExecutorService maintenanceExecutor; // effectively final
+    private ScheduledExecutorService cacheWarmerExecutor;
+    private EventStore eventStore;
+
+    public LuceneEventIndex(final RepositoryConfiguration config, final IndexManager indexManager, final EventReporter eventReporter) {
+        this(config, indexManager, EventIndexTask.DEFAULT_MAX_EVENTS_PER_COMMIT, eventReporter);
+    }
+
+    public LuceneEventIndex(final RepositoryConfiguration config, final IndexManager indexManager, final int maxEventsPerCommit, final EventReporter eventReporter) {
+        this.eventReporter = eventReporter;
+        queryExecutor = Executors.newFixedThreadPool(config.getQueryThreadPoolSize(), new NamedThreadFactory("Provenance Query"));
+        indexExecutor = Executors.newFixedThreadPool(config.getIndexThreadPoolSize(), new NamedThreadFactory("Index Provenance Events"));
+        cacheWarmerExecutor = Executors.newScheduledThreadPool(config.getStorageDirectories().size(), new NamedThreadFactory("Warm Lucene Index", true));
+        directoryManager = new IndexDirectoryManager(config);
+
+        // Limit number of indexing threads to 100. When we restore the repository on restart,
+        // we have to re-index up to MAX_INDEX_THREADS * MAX_DOCUMENTS_PER_THREAD events prior to
+        // the last event that the index holds. This is done because we could have that many
+        // events 'in flight', waiting to be indexed when the last index writer was committed,
+        // so even though the index says the largest event ID is 1,000,000 for instance, the event
+        // with ID 999,999 may still not have been indexed because another thread was in the
+        // process of writing the event to the index.
+        final int configuredIndexPoolSize = config.getIndexThreadPoolSize();
+        final int numIndexThreads;
+        if (configuredIndexPoolSize > MAX_INDEX_THREADS) {
+            logger.warn("The Provenance Repository is configured to perform indexing of events using {} threads. This number exceeds the maximum allowable number of threads, which is {}. "
+                + "Will proceed using {} threads. This value is limited because the performance of indexing will decrease and startup times will increase when setting this value too high.",
+                configuredIndexPoolSize, MAX_INDEX_THREADS, MAX_INDEX_THREADS);
+            numIndexThreads = MAX_INDEX_THREADS;
+        } else {
+            numIndexThreads = configuredIndexPoolSize;
+        }
+
+        for (int i = 0; i < numIndexThreads; i++) {
+            final EventIndexTask task = new EventIndexTask(documentQueue, config, indexManager, directoryManager, maxEventsPerCommit, eventReporter);
+            indexTasks.add(task);
+            indexExecutor.submit(task);
+        }
+        this.config = config;
+        this.indexManager = indexManager;
+        this.eventConverter = new ConvertEventToLuceneDocument(config.getSearchableFields(), config.getSearchableAttributes());
+    }
+
+    @Override
+    public void initialize(final EventStore eventStore) {
+        this.eventStore = eventStore;
+        directoryManager.initialize();
+
+        maintenanceExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Provenance Repository Maintenance"));
+        maintenanceExecutor.scheduleWithFixedDelay(() -> performMaintenance(), 1, 1, TimeUnit.MINUTES);
+        maintenanceExecutor.scheduleWithFixedDelay(new RemoveExpiredQueryResults(), 30, 30, TimeUnit.SECONDS);
+
+        cachedQueries.add(new LatestEventsQuery());
+        cachedQueries.add(new LatestEventsPerProcessorQuery());
+
+        final Optional<Integer> warmCacheMinutesOption = config.getWarmCacheFrequencyMinutes();
+        if (warmCacheMinutesOption.isPresent() && warmCacheMinutesOption.get() > 0) {
+            for (final File storageDir : config.getStorageDirectories().values()) {
+                final int minutes = warmCacheMinutesOption.get();
+                cacheWarmerExecutor.scheduleWithFixedDelay(new LuceneCacheWarmer(storageDir, indexManager), 1, minutes, TimeUnit.MINUTES);
+            }
+        }
+    }
+
+    @Override
+    public long getMinimumEventIdToReindex(final String partitionName) {
+        return Math.max(0, getMaxEventId(partitionName) - EventIndexTask.MAX_DOCUMENTS_PER_THREAD * LuceneEventIndex.MAX_INDEX_THREADS);
+    }
+
+    protected IndexDirectoryManager getDirectoryManager() {
+        return directoryManager;
+    }
+
+    @Override
+    public void close() throws IOException {
+        closed = true;
+        queryExecutor.shutdownNow();
+        indexExecutor.shutdown();
+        cacheWarmerExecutor.shutdown();
+
+        if (maintenanceExecutor != null) {
+            maintenanceExecutor.shutdown();
+        }
+
+        for (final EventIndexTask task : indexTasks) {
+            task.shutdown();
+        }
+    }
+
+    long getMaxEventId(final String partitionName) {
+        final List<File> allDirectories = getDirectoryManager().getDirectories(0L, Long.MAX_VALUE, partitionName);
+        if (allDirectories.isEmpty()) {
+            return -1L;
+        }
+
+        Collections.sort(allDirectories, DirectoryUtils.NEWEST_INDEX_FIRST);
+
+        for (final File directory : allDirectories) {
+            final EventIndexSearcher searcher;
+            try {
+                searcher = indexManager.borrowIndexSearcher(directory);
+            } catch (final IOException ioe) {
+                logger.warn("Unable to read from Index Directory {}. Will assume that the index is incomplete and not consider this index when determining max event ID", directory);
+                continue;
+            }
+
+            try {
+                final IndexReader reader = searcher.getIndexSearcher().getIndexReader();
+                final int maxDocId = reader.maxDoc() - 1;
+                final Document document = reader.document(maxDocId);
+                final long eventId = document.getField(SearchableFields.Identifier.getSearchableFieldName()).numericValue().longValue();
+                logger.info("Determined that Max Event ID indexed for Partition {} is approximately {} based on index {}", partitionName, eventId, directory);
+                return eventId;
+            } catch (final IOException ioe) {
+                logger.warn("Unable to search Index Directory {}. Will assume that the index is incomplete and not consider this index when determining max event ID", directory, ioe);
+            } finally {
+                indexManager.returnIndexSearcher(searcher);
+            }
+        }
+
+        return -1L;
+    }
+
+    @Override
+    public void reindexEvents(final Map<ProvenanceEventRecord, StorageSummary> events) {
+        final EventIndexTask indexTask = new EventIndexTask(documentQueue, config, indexManager, directoryManager, EventIndexTask.DEFAULT_MAX_EVENTS_PER_COMMIT, eventReporter);
+
+        File lastIndexDir = null;
+        long lastEventTime = -2L;
+
+        final List<IndexableDocument> indexableDocs = new ArrayList<>(events.size());
+        for (final Map.Entry<ProvenanceEventRecord, StorageSummary> entry : events.entrySet()) {
+            final ProvenanceEventRecord event = entry.getKey();
+            final StorageSummary summary = entry.getValue();
+
+            for (final CachedQuery cachedQuery : cachedQueries) {
+                cachedQuery.update(event, summary);
+            }
+
+            final Document document = eventConverter.convert(event, summary);
+            if (document == null) {
+                logger.debug("Received Provenance Event {} to index but it contained no information that should be indexed, so skipping it", event);
+            } else {
+                final File indexDir;
+                if (event.getEventTime() == lastEventTime) {
+                    indexDir = lastIndexDir;
+                } else {
+                    final List<File> files = getDirectoryManager().getDirectories(event.getEventTime(), null);
+                    indexDir = files.isEmpty() ? null : files.get(0);
+                    lastIndexDir = indexDir;
+                }
+
+                final IndexableDocument doc = new IndexableDocument(document, summary, indexDir);
+                indexableDocs.add(doc);
+            }
+        }
+
+        try {
+            indexTask.reIndex(indexableDocs, CommitPreference.PREVENT_COMMIT);
+        } catch (final IOException ioe) {
+            logger.error("Failed to reindex some Provenance Events", ioe);
+            eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to re-index some Provenance Events. "
+                + "Some Provenance Events may not be available for querying. See logs for more information.");
+        }
+    }
+
+    @Override
+    public void commitChanges(final String partitionName) throws IOException {
+        final Optional<File> indexDir = directoryManager.getActiveIndexDirectory(partitionName);
+        if (indexDir.isPresent()) {
+            final EventIndexWriter eventIndexWriter = indexManager.borrowIndexWriter(indexDir.get());
+            try {
+                eventIndexWriter.commit();
+            } finally {
+                indexManager.returnIndexWriter(eventIndexWriter, false, false);
+            }
+        }
+    }
+
+    protected void addEvent(final ProvenanceEventRecord event, final StorageSummary location) {
+        for (final CachedQuery cachedQuery : cachedQueries) {
+            cachedQuery.update(event, location);
+        }
+
+        final Document document = eventConverter.convert(event, location);
+        if (document == null) {
+            logger.debug("Received Provenance Event {} to index but it contained no information that should be indexed, so skipping it", event);
+        } else {
+            final StoredDocument doc = new StoredDocument(document, location);
+            boolean added = false;
+            while (!added && !closed) {
+
+                added = documentQueue.offer(doc);
+                if (!added) {
+                    final long start = System.nanoTime();
+                    try {
+                        added = documentQueue.offer(doc, 1, TimeUnit.SECONDS);
+                    } catch (final InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        logger.warn("Interrupted while attempting to enqueue Provenance Event for indexing; this event will not be indexed");
+                        return;
+                    }
+                    final long nanos = System.nanoTime() - start;
+                    queuePauseNanos.add(new TimestampedLong(nanos));
+                }
+
+                if (added) {
+                    final long totalEventCount = eventCount.incrementAndGet();
+                    if (totalEventCount % 1_000_000 == 0 && logger.isDebugEnabled()) {
+                        incrementAndReportStats();
+                    }
+                }
+            }
+        }
+    }
+
+    private void incrementAndReportStats() {
+        final long fiveMinutesAgo = System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(5);
+        final TimestampedLong nanosLastFive = queuePauseNanos.getAggregateValue(fiveMinutesAgo);
+        if (nanosLastFive == null) {
+            return;
+        }
+
+        final TimestampedLong eventsLast5 = eventsIndexed.getAggregateValue(fiveMinutesAgo);
+        if (eventsLast5 == null) {
+            return;
+        }
+
+        final long numEventsLast5 = eventsLast5.getValue();
+
+        final long millis = TimeUnit.NANOSECONDS.toMillis(nanosLastFive.getValue());
+        logger.debug("In the last 5 minutes, have spent {} CPU-millis waiting to enqueue events for indexing and have indexed {} events ({} since NiFi started)",
+            millis, numEventsLast5, eventCount.get());
+    }
+
+    @Override
+    public void addEvents(final Map<ProvenanceEventRecord, StorageSummary> events) {
+        eventsIndexed.add(new TimestampedLong((long) events.size()));
+
+        for (final Map.Entry<ProvenanceEventRecord, StorageSummary> entry : events.entrySet()) {
+            addEvent(entry.getKey(), entry.getValue());
+        }
+    }
+
+
+    @Override
+    public ComputeLineageSubmission submitLineageComputation(final long eventId, final NiFiUser user, final EventAuthorizer eventAuthorizer) {
+        final Optional<ProvenanceEventRecord> eventOption;
+        try {
+            eventOption = eventStore.getEvent(eventId);
+        } catch (final Exception e) {
+            logger.error("Failed to retrieve Provenance Event with ID " + eventId + " to calculate data lineage due to: " + e, e);
+            final AsyncLineageSubmission result = new AsyncLineageSubmission(LineageComputationType.FLOWFILE_LINEAGE, eventId, Collections.<String> emptySet(), 1, user.getIdentity());
+            result.getResult().setError("Failed to retrieve Provenance Event with ID " + eventId + ". See logs for more information.");
+            return result;
+        }
+
+        if (!eventOption.isPresent()) {
+            final AsyncLineageSubmission result = new AsyncLineageSubmission(LineageComputationType.FLOWFILE_LINEAGE, eventId, Collections.<String> emptySet(), 1, user.getIdentity());
+            result.getResult().setError("Could not find Provenance Event with ID " + eventId);
+            lineageSubmissionMap.put(result.getLineageIdentifier(), result);
+            return result;
+        }
+
+        final ProvenanceEventRecord event = eventOption.get();
+        return submitLineageComputation(Collections.singleton(event.getFlowFileUuid()), user, eventAuthorizer, LineageComputationType.FLOWFILE_LINEAGE,
+            eventId, event.getLineageStartDate(), Long.MAX_VALUE);
+    }
+
+
+    private ComputeLineageSubmission submitLineageComputation(final Collection<String> flowFileUuids, final NiFiUser user, final EventAuthorizer eventAuthorizer,
+        final LineageComputationType computationType, final Long eventId, final long startTimestamp, final long endTimestamp) {
+
+        final List<File> indexDirs = directoryManager.getDirectories(startTimestamp, endTimestamp);
+        final AsyncLineageSubmission submission = new AsyncLineageSubmission(computationType, eventId, flowFileUuids, indexDirs.size(), user.getIdentity());
+        lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
+
+        final BooleanQuery lineageQuery = buildLineageQuery(flowFileUuids);
+        final List<File> indexDirectories = directoryManager.getDirectories(startTimestamp, endTimestamp);
+        if (indexDirectories.isEmpty()) {
+            submission.getResult().update(Collections.emptyList(), 0L);
+        } else {
+            Collections.sort(indexDirectories, DirectoryUtils.OLDEST_INDEX_FIRST);
+
+            for (final File indexDir : indexDirectories) {
+                queryExecutor.submit(new QueryTask(lineageQuery, submission.getResult(), MAX_LINEAGE_NODES, indexManager, indexDir,
+                    eventStore, eventAuthorizer, EventTransformer.PLACEHOLDER_TRANSFORMER));
+            }
+        }
+
+        // Some computations will complete very quickly. In this case, we don't want to wait
+        // for the client to submit a second query to obtain the result. Instead, we want to just
+        // wait some short period of time for the computation to complete before returning the submission.
+        try {
+            submission.getResult().awaitCompletion(500, TimeUnit.MILLISECONDS);
+        } catch (final InterruptedException ie) {
+            Thread.currentThread().interrupt();
+        }
+
+        return submission;
+    }
+
+    private BooleanQuery buildLineageQuery(final Collection<String> flowFileUuids) {
+        // Create a query for all Events related to the FlowFiles of interest. We do this by adding all ID's as
+        // "SHOULD" clauses and then setting the minimum required to 1.
+        final BooleanQuery lineageQuery;
+        if (flowFileUuids == null || flowFileUuids.isEmpty()) {
+            lineageQuery = null;
+        } else {
+            lineageQuery = new BooleanQuery();
+            for (final String flowFileUuid : flowFileUuids) {
+                lineageQuery.add(new TermQuery(new Term(SearchableFields.FlowFileUUID.getSearchableFieldName(), flowFileUuid)), Occur.SHOULD);
+            }
+            lineageQuery.setMinimumNumberShouldMatch(1);
+        }
+
+        return lineageQuery;
+    }
+
+    @Override
+    public QuerySubmission submitQuery(final Query query, final EventAuthorizer authorizer, final String userId) {
+        validate(query);
+
+        // Check if we have any cached queries first that can give us the answer
+        for (final CachedQuery cachedQuery : cachedQueries) {
+            final Optional<List<Long>> eventIdListOption = cachedQuery.evaluate(query);
+            if (eventIdListOption.isPresent()) {
+                final AsyncQuerySubmission submission = new AsyncQuerySubmission(query, 1, userId);
+                querySubmissionMap.put(query.getIdentifier(), submission);
+
+                final List<Long> eventIds = eventIdListOption.get();
+
+                queryExecutor.submit(() -> {
+                    List<ProvenanceEventRecord> events;
+                    try {
+                        events = eventStore.getEvents(eventIds, authorizer, EventTransformer.EMPTY_TRANSFORMER);
+                        submission.getResult().update(events, eventIds.size());
+                    } catch (final Exception e) {
+                        submission.getResult().setError("Failed to retrieve Provenance Events from store; see logs for more details");
+                        logger.error("Failed to retrieve Provenance Events from store", e);
+                    }
+                });
+
+                // There are some queries that are optimized and will complete very quickly. As a result,
+                // we don't want to wait for the client to issue a second request, so we will give the query
+                // up to 500 milliseconds to complete before running.
+                try {
+                    submission.getResult().awaitCompletion(500, TimeUnit.MILLISECONDS);
+                } catch (final InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+
+                return submission;
+            }
+        }
+
+        final List<File> indexDirectories = directoryManager.getDirectories(
+            query.getStartDate() == null ? null : query.getStartDate().getTime(),
+            query.getEndDate() == null ? null : query.getEndDate().getTime());
+
+        final AsyncQuerySubmission submission = new AsyncQuerySubmission(query, indexDirectories.size(), userId);
+        querySubmissionMap.put(query.getIdentifier(), submission);
+
+        final org.apache.lucene.search.Query luceneQuery = LuceneUtil.convertQuery(query);
+        logger.debug("Submitting query {} with identifier {} against index directories {}", luceneQuery, query.getIdentifier(), indexDirectories);
+
+        if (indexDirectories.isEmpty()) {
+            submission.getResult().update(Collections.emptyList(), 0L);
+        } else {
+            Collections.sort(indexDirectories, DirectoryUtils.NEWEST_INDEX_FIRST);
+
+            for (final File indexDir : indexDirectories) {
+                queryExecutor.submit(new QueryTask(luceneQuery, submission.getResult(), query.getMaxResults(), indexManager, indexDir,
+                    eventStore, authorizer, EventTransformer.EMPTY_TRANSFORMER));
+            }
+        }
+
+        // There are some queries that are optimized and will complete very quickly. As a result,
+        // we don't want to wait for the client to issue a second request, so we will give the query
+        // up to 500 milliseconds to complete before running.
+        try {
+            submission.getResult().awaitCompletion(500, TimeUnit.MILLISECONDS);
+        } catch (final InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+
+        return submission;
+    }
+
+
+    @Override
+    public ComputeLineageSubmission submitLineageComputation(final String flowFileUuid, final NiFiUser user, final EventAuthorizer eventAuthorizer) {
+        return submitLineageComputation(Collections.singleton(flowFileUuid), user, eventAuthorizer, LineageComputationType.FLOWFILE_LINEAGE, null, 0L, Long.MAX_VALUE);
+    }
+
+    @Override
+    public ComputeLineageSubmission submitExpandChildren(final long eventId, final NiFiUser user, final EventAuthorizer authorizer) {
+        final String userId = user.getIdentity();
+
+        try {
+            final Optional<ProvenanceEventRecord> eventOption = eventStore.getEvent(eventId);
+            if (!eventOption.isPresent()) {
+                final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.emptyList(), 1, userId);
+                lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
+                submission.getResult().update(Collections.emptyList(), 0L);
+                return submission;
+            }
+
+            final ProvenanceEventRecord event = eventOption.get();
+            switch (event.getEventType()) {
+                case CLONE:
+                case FORK:
+                case JOIN:
+                case REPLAY: {
+                    return submitLineageComputation(event.getChildUuids(), user, authorizer, LineageComputationType.EXPAND_CHILDREN,
+                        eventId, event.getEventTime(), Long.MAX_VALUE);
+                }
+                default: {
+                    final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN,
+                        eventId, Collections.<String> emptyList(), 1, userId);
+
+                    lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
+                    submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its children cannot be expanded");
+                    return submission;
+                }
+            }
+        } catch (final Exception e) {
+            final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN,
+                eventId, Collections.<String> emptyList(), 1, userId);
+            lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
+            submission.getResult().setError("Failed to expand children for lineage of event with ID " + eventId + " due to: " + e);
+            return submission;
+        }
+    }
+
+    @Override
+    public ComputeLineageSubmission submitExpandParents(final long eventId, final NiFiUser user, final EventAuthorizer authorizer) {
+        final String userId = user.getIdentity();
+
+        try {
+            final Optional<ProvenanceEventRecord> eventOption = eventStore.getEvent(eventId);
+            if (!eventOption.isPresent()) {
+                final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.emptyList(), 1, userId);
+                lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
+                submission.getResult().update(Collections.emptyList(), 0L);
+                return submission;
+            }
+
+            final ProvenanceEventRecord event = eventOption.get();
+            switch (event.getEventType()) {
+                case JOIN:
+                case FORK:
+                case CLONE:
+                case REPLAY: {
+                    return submitLineageComputation(event.getParentUuids(), user, authorizer, LineageComputationType.EXPAND_PARENTS,
+                        eventId, event.getLineageStartDate(), event.getEventTime());
+                }
+                default: {
+                    final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS,
+                        eventId, Collections.<String> emptyList(), 1, userId);
+
+                    lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
+                    submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its parents cannot be expanded");
+                    return submission;
+                }
+            }
+        } catch (final Exception e) {
+            final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS,
+                eventId, Collections.<String> emptyList(), 1, userId);
+            lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
+
+            submission.getResult().setError("Failed to expand parents for lineage of event with ID " + eventId + " due to: " + e);
+            return submission;
+        }
+    }
+
+    @Override
+    public AsyncLineageSubmission retrieveLineageSubmission(final String lineageIdentifier, final NiFiUser user) {
+        final AsyncLineageSubmission submission = lineageSubmissionMap.get(lineageIdentifier);
+        final String userId = submission.getSubmitterIdentity();
+
+        if (user == null && userId == null) {
+            return submission;
+        }
+
+        if (user == null) {
+            throw new AccessDeniedException("Cannot retrieve Provenance Lineage Submission because no user id was provided");
+        }
+
+        if (userId == null || userId.equals(user.getIdentity())) {
+            return submission;
+        }
+
+        throw new AccessDeniedException("Cannot retrieve Provenance Lineage Submission because " + user.getIdentity() + " is not the user who submitted the request");
+    }
+
+    @Override
+    public QuerySubmission retrieveQuerySubmission(final String queryIdentifier, final NiFiUser user) {
+        final QuerySubmission submission = querySubmissionMap.get(queryIdentifier);
+
+        final String userId = submission.getSubmitterIdentity();
+
+        if (user == null && userId == null) {
+            return submission;
+        }
+
+        if (user == null) {
+            throw new AccessDeniedException("Cannot retrieve Provenance Query Submission because no user id was provided");
+        }
+
+        if (userId == null || userId.equals(user.getIdentity())) {
+            return submission;
+        }
+
+        throw new AccessDeniedException("Cannot retrieve Provenance Query Submission because " + user.getIdentity() + " is not the user who submitted the request");
+    }
+
+    @Override
+    public long getSize() {
+        long total = 0;
+        for (final File file : directoryManager.getDirectories(null, null)) {
+            total += DirectoryUtils.getSize(file);
+        }
+        return total;
+    }
+
+    private void validate(final Query query) {
+        final int numQueries = querySubmissionMap.size();
+        if (numQueries > MAX_UNDELETED_QUERY_RESULTS) {
+            throw new IllegalStateException("Cannot process query because there are currently " + numQueries + " queries whose results have not "
+                + "been deleted due to poorly behaving clients not issuing DELETE requests. Please try again later.");
+        }
+
+        if (query.getEndDate() != null && query.getStartDate() != null && query.getStartDate().getTime() > query.getEndDate().getTime()) {
+            throw new IllegalArgumentException("Query End Time cannot be before Query Start Time");
+        }
+    }
+
+    void performMaintenance() {
+        try {
+            final List<ProvenanceEventRecord> firstEvents = eventStore.getEvents(0, 1);
+            if (firstEvents.isEmpty()) {
+                return;
+            }
+
+            final ProvenanceEventRecord firstEvent = firstEvents.get(0);
+            final long earliestEventTime = firstEvent.getEventTime();
+            logger.debug("First Event Time is {} ({}) with Event ID {}; will delete any Lucene Index that is older than this",
+                earliestEventTime, new Date(earliestEventTime), firstEvent.getEventId());
+            final List<File> indicesBeforeEarliestEvent = directoryManager.getDirectoriesBefore(earliestEventTime);
+
+            for (final File index : indicesBeforeEarliestEvent) {
+                logger.debug("Index directory {} is now expired. Attempting to remove index", index);
+                tryDeleteIndex(index);
+            }
+        } catch (final Exception e) {
+            logger.error("Failed to perform background maintenance procedures", e);
+            eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to perform maintenance of Provenance Repository. See logs for more information.");
+        }
+    }
+
+    protected boolean tryDeleteIndex(final File indexDirectory) {
+        final long startNanos = System.nanoTime();
+        boolean removed = false;
+        while (!removed && System.nanoTime() - startNanos < TimeUnit.SECONDS.toNanos(MAX_DELETE_INDEX_WAIT_SECONDS)) {
+            removed = indexManager.removeIndex(indexDirectory);
+
+            if (!removed) {
+                try {
+                    Thread.sleep(5000L);
+                } catch (final InterruptedException ie) {
+                    logger.debug("Interrupted when trying to remove index {} from IndexManager; will not remove index", indexDirectory);
+                    Thread.currentThread().interrupt();
+                    return false;
+                }
+            }
+        }
+
+        if (removed) {
+            try {
+                FileUtils.deleteFile(indexDirectory, true);
+                logger.debug("Successfully deleted directory {}", indexDirectory);
+            } catch (final IOException e) {
+                logger.warn("The Lucene Index located at " + indexDirectory + " has expired and contains no Provenance Events that still exist in the repository. "
+                    + "However, the directory could not be deleted.", e);
+            }
+
+            directoryManager.deleteDirectory(indexDirectory);
+            logger.info("Successfully removed expired Lucene Index {}", indexDirectory);
+        } else {
+            logger.warn("The Lucene Index located at {} has expired and contains no Provenance Events that still exist in the repository. "
+                + "However, the directory could not be deleted because it is still actively being used. Will continue to try to delete "
+                + "in a subsequent maintenance cycle.", indexDirectory);
+        }
+
+        return removed;
+    }
+
+    private class RemoveExpiredQueryResults implements Runnable {
+        @Override
+        public void run() {
+            try {
+                final Date now = new Date();
+
+                final Iterator<Map.Entry<String, AsyncQuerySubmission>> queryIterator = querySubmissionMap.entrySet().iterator();
+                while (queryIterator.hasNext()) {
+                    final Map.Entry<String, AsyncQuerySubmission> entry = queryIterator.next();
+
+                    final StandardQueryResult result = entry.getValue().getResult();
+                    if (entry.getValue().isCanceled() || result.isFinished() && result.getExpiration().before(now)) {
+                        queryIterator.remove();
+                    }
+                }
+
+                final Iterator<Map.Entry<String, AsyncLineageSubmission>> lineageIterator = lineageSubmissionMap.entrySet().iterator();
+                while (lineageIterator.hasNext()) {
+                    final Map.Entry<String, AsyncLineageSubmission> entry = lineageIterator.next();
+
+                    final StandardLineageResult result = entry.getValue().getResult();
+                    if (entry.getValue().isCanceled() || result.isFinished() && result.getExpiration().before(now)) {
+                        lineageIterator.remove();
+                    }
+                }
+            } catch (final Exception e) {
+                logger.error("Failed to expire Provenance Query Results due to {}", e.toString());
+                logger.error("", e);
+            }
+        }
+    }
+}

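The addEvent() method in LuceneEventIndex above illustrates the backpressure pattern the indexing path relies on: a producer first attempts a non-blocking offer() on the bounded documentQueue, then falls back to a timed offer() while recording how long it was stalled, re-checking the closed flag on every pass so shutdown is never blocked for more than a second. A minimal, self-contained sketch of that pattern, using illustrative names (BoundedEnqueue, totalPauseNanos) rather than NiFi API:

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicLong;

    // Sketch of the bounded-queue enqueue loop used by addEvent() above.
    class BoundedEnqueue<T> {
        private final BlockingQueue<T> queue = new LinkedBlockingQueue<>(1000); // bounded, as in documentQueue
        private final AtomicLong totalPauseNanos = new AtomicLong();
        private volatile boolean closed = false;

        boolean enqueue(final T item) {
            boolean added = false;
            while (!added && !closed) {
                added = queue.offer(item); // fast path: never blocks
                if (!added) {
                    final long start = System.nanoTime();
                    try {
                        // Slow path: wait up to 1 second, then loop to re-check 'closed'.
                        added = queue.offer(item, 1, TimeUnit.SECONDS);
                    } catch (final InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return false; // caller treats the item as dropped
                    }
                    totalPauseNanos.addAndGet(System.nanoTime() - start); // stall time for later reporting
                }
            }
            return added;
        }
    }

The bounded queue caps memory when the index writers fall behind the event stream, and the accumulated pause time is what incrementAndReportStats() surfaces as "CPU-millis waiting to enqueue".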
http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/QueryTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/QueryTask.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/QueryTask.java
new file mode 100644
index 0000000..38d3f61
--- /dev/null
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/QueryTask.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.index.lucene;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.TopDocs;
+import org.apache.nifi.provenance.ProgressiveResult;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.SearchableFields;
+import org.apache.nifi.provenance.authorization.EventAuthorizer;
+import org.apache.nifi.provenance.authorization.EventTransformer;
+import org.apache.nifi.provenance.index.EventIndexSearcher;
+import org.apache.nifi.provenance.index.SearchFailedException;
+import org.apache.nifi.provenance.lucene.IndexManager;
+import org.apache.nifi.provenance.store.EventStore;
+import org.apache.nifi.util.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class QueryTask implements Runnable {
+    private static final Logger logger = LoggerFactory.getLogger(QueryTask.class);
+    private static final Set<String> LUCENE_FIELDS_TO_LOAD = Collections.singleton(SearchableFields.Identifier.getSearchableFieldName());
+
+    private final Query query;
+    private final ProgressiveResult queryResult;
+    private final int maxResults;
+    private final IndexManager indexManager;
+    private final File indexDir;
+    private final EventStore eventStore;
+    private final EventAuthorizer authorizer;
+    private final EventTransformer transformer;
+
+    public QueryTask(final Query query, final ProgressiveResult result, final int maxResults, final IndexManager indexManager,
+        final File indexDir, final EventStore eventStore, final EventAuthorizer authorizer,
+        final EventTransformer unauthorizedTransformer) {
+        this.query = query;
+        this.queryResult = result;
+        this.maxResults = maxResults;
+        this.indexManager = indexManager;
+        this.indexDir = indexDir;
+        this.eventStore = eventStore;
+        this.authorizer = authorizer;
+        this.transformer = unauthorizedTransformer;
+    }
+
+    @Override
+    public void run() {
+        if (queryResult.getTotalHitCount() >= maxResults) {
+            logger.debug("Will not query lucene index {} because maximum results have already been obtained", indexDir);
+            queryResult.update(Collections.emptyList(), 0L);
+            return;
+        }
+
+        if (queryResult.isFinished()) {
+            logger.debug("Will not query lucene index {} because the query is already finished", indexDir);
+            return;
+        }
+
+
+        final long borrowStart = System.nanoTime();
+        final EventIndexSearcher searcher;
+        try {
+            searcher = indexManager.borrowIndexSearcher(indexDir);
+        } catch (final FileNotFoundException fnfe) {
+            // We do not consider this an error because it may well just be the case that the event index has aged off and
+            // been deleted or that we've just created the index and haven't yet committed the writer. So instead, we just
+            // update the result to indicate that this index search is complete with no results.
+            queryResult.update(Collections.emptyList(), 0);
+
+            // nothing has been indexed yet, or the data has already aged off
+            logger.info("Attempted to search Provenance Index {} but could not find the directory or the directory did not contain a valid Lucene index. "
+                + "This usually indicates that either the index was just created and hasn't fully been initialized, or that the index was recently aged off.", indexDir);
+            return;
+        } catch (final IOException ioe) {
+            queryResult.setError("Failed to query index " + indexDir + "; see logs for more details");
+            logger.error("Failed to query index " + indexDir, ioe);
+            return;
+        }
+
+        try {
+            final long borrowMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - borrowStart);
+            logger.debug("Borrowing index searcher for {} took {} ms", indexDir, borrowMillis);
+            final long startNanos = System.nanoTime();
+
+            // If max number of results are retrieved, do not bother querying lucene
+            if (queryResult.getTotalHitCount() >= maxResults) {
+                logger.debug("Will not query lucene index {} because maximum results have already been obtained", indexDir);
+                queryResult.update(Collections.emptyList(), 0L);
+                return;
+            }
+
+            if (queryResult.isFinished()) {
+                logger.debug("Will not query lucene index {} because the query is already finished", indexDir);
+                return;
+            }
+
+            // Query lucene
+            final IndexReader indexReader = searcher.getIndexSearcher().getIndexReader();
+            final TopDocs topDocs;
+            try {
+                topDocs = searcher.getIndexSearcher().search(query, maxResults);
+            } catch (final Exception e) {
+                logger.error("Failed to query Lucene for index " + indexDir, e);
+                queryResult.setError("Failed to query Lucene for index " + indexDir + " due to " + e);
+                return;
+            } finally {
+                final long ms = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+                logger.debug("Querying Lucene for index {} took {} ms", indexDir, ms);
+            }
+
+            // If max number of results are retrieved, do not bother reading docs
+            if (queryResult.getTotalHitCount() >= maxResults) {
+                logger.debug("Will not read events from store for {} because maximum results have already been obtained", indexDir);
+                queryResult.update(Collections.emptyList(), 0L);
+                return;
+            }
+
+            if (queryResult.isFinished()) {
+                logger.debug("Will not read events from store for {} because the query has already finished", indexDir);
+                return;
+            }
+
+            final Tuple<List<ProvenanceEventRecord>, Integer> eventsAndTotalHits = readDocuments(topDocs, indexReader);
+
+            if (eventsAndTotalHits == null) {
+                queryResult.update(Collections.emptyList(), 0L);
+                logger.info("Will not update query results for queried index {} for query {} because the maximum number of results have been reached already",
+                    indexDir, query);
+            } else {
+                queryResult.update(eventsAndTotalHits.getKey(), eventsAndTotalHits.getValue());
+
+                final long searchNanos = System.nanoTime() - startNanos;
+                final long millis = TimeUnit.NANOSECONDS.toMillis(searchNanos);
+                logger.info("Successfully queried index {} for query {}; retrieved {} events with a total of {} hits in {} millis",
+                    indexDir, query, eventsAndTotalHits.getKey().size(), eventsAndTotalHits.getValue(), millis);
+            }
+        } catch (final Exception e) {
+            logger.error("Failed to query events against index " + indexDir, e);
+            queryResult.setError("Failed to complete query due to " + e);
+        } finally {
+            indexManager.returnIndexSearcher(searcher);
+        }
+    }
+
+    private Tuple<List<ProvenanceEventRecord>, Integer> readDocuments(final TopDocs topDocs, final IndexReader indexReader) {
+        // If no topDocs is supplied, just provide a Tuple that has no records and a hit count of 0.
+        if (topDocs == null || topDocs.totalHits == 0) {
+            return new Tuple<>(Collections.<ProvenanceEventRecord> emptyList(), 0);
+        }
+
+        final long start = System.nanoTime();
+        final List<Long> eventIds = Arrays.stream(topDocs.scoreDocs)
+            .mapToInt(scoreDoc -> scoreDoc.doc)
+            .mapToObj(docId -> {
+                try {
+                    return indexReader.document(docId, LUCENE_FIELDS_TO_LOAD);
+                } catch (final Exception e) {
+                    throw new SearchFailedException("Failed to read Provenance Events from Event File", e);
+                }
+            })
+            .map(doc -> doc.getField(SearchableFields.Identifier.getSearchableFieldName()).numericValue().longValue())
+            .collect(Collectors.toList());
+
+        final long endConvert = System.nanoTime();
+        final long ms = TimeUnit.NANOSECONDS.toMillis(endConvert - start);
+        logger.debug("Converting documents took {} ms", ms);
+
+        List<ProvenanceEventRecord> events;
+        try {
+            events = eventStore.getEvents(eventIds, authorizer, transformer);
+        } catch (IOException e) {
+            throw new SearchFailedException("Unable to retrieve events from the Provenance Store", e);
+        }
+
+        final long fetchEventNanos = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - endConvert);
+        logger.debug("Fetching {} events from Event Store took {} ms ({} events actually fetched)", eventIds.size(), fetchEventNanos, events.size());
+
+        final int totalHits = topDocs.totalHits;
+        return new Tuple<>(events, totalHits);
+    }
+
+}
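readDocuments() above is a two-phase lookup: it loads only the stored Identifier field from each Lucene hit, then resolves the full records in one bulk call to the event store. A condensed sketch of the first phase against the same Lucene 4.x-era API that QueryTask uses (the helper name hitIdsToEventIds is hypothetical):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.Set;
    import java.util.stream.Collectors;

    import org.apache.lucene.index.IndexReader;
    import org.apache.lucene.search.TopDocs;

    // Phase 1: map each hit to its numeric event ID, loading a single stored field per document.
    static List<Long> hitIdsToEventIds(final TopDocs topDocs, final IndexReader reader, final String idField) {
        final Set<String> fieldsToLoad = Collections.singleton(idField); // avoid materializing whole documents
        return Arrays.stream(topDocs.scoreDocs)
            .map(scoreDoc -> {
                try {
                    return reader.document(scoreDoc.doc, fieldsToLoad);
                } catch (final Exception e) {
                    throw new RuntimeException("Failed to load document " + scoreDoc.doc, e);
                }
            })
            .map(doc -> doc.getField(idField).numericValue().longValue())
            .collect(Collectors.toList());
    }

Keeping phase one to a single stored field makes the Lucene side cheap; the heavier record deserialization happens once, in bulk, inside eventStore.getEvents(), which also applies authorization and the unauthorized-event transformer.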

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/StoredDocument.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/StoredDocument.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/StoredDocument.java
new file mode 100644
index 0000000..207ba9f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/StoredDocument.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.index.lucene;
+
+import org.apache.lucene.document.Document;
+import org.apache.nifi.provenance.serialization.StorageSummary;
+
+public class StoredDocument {
+    private final Document document;
+    private final StorageSummary storageSummary;
+
+    public StoredDocument(final Document document, final StorageSummary summary) {
+        this.document = document;
+        this.storageSummary = summary;
+    }
+
+    public Document getDocument() {
+        return document;
+    }
+
+    public StorageSummary getStorageSummary() {
+        return storageSummary;
+    }
+}
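The CachingIndexManager changes below complete the borrow/return protocol the new index code depends on: each borrowIndexWriter() call either opens a writer or increments a per-directory reference count, and each return decrements it, so a writer is closed only when its last borrower is done. A generic, minimal sketch of that reference-counting idiom (RefCountedPool and Factory are illustrative names, not NiFi classes):

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    // One shared, closeable resource per key, kept open while any borrower holds it.
    class RefCountedPool<K, V extends Closeable> {
        interface Factory<K, V> { V create(K key) throws IOException; }

        private final Map<K, V> resources = new HashMap<>();
        private final Map<K, Integer> counts = new HashMap<>();

        synchronized V borrow(final K key, final Factory<K, V> factory) throws IOException {
            V resource = resources.get(key);
            if (resource == null) {
                resource = factory.create(key); // first borrower opens the resource
                resources.put(key, resource);
            }
            counts.merge(key, 1, Integer::sum);
            return resource;
        }

        synchronized void release(final K key) throws IOException {
            final Integer remaining = counts.merge(key, -1, Integer::sum);
            if (remaining != null && remaining <= 0) {
                counts.remove(key);
                final V resource = resources.remove(key); // last borrower closes it
                if (resource != null) {
                    resource.close();
                }
            }
        }
    }

Lucene permits only one live IndexWriter per directory, so sharing a single counted instance across indexing threads is what makes the multi-threaded EventIndexTask pool above workable.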

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/CachingIndexManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/CachingIndexManager.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/CachingIndexManager.java
index ddfa0db..eefcecb 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/CachingIndexManager.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/CachingIndexManager.java
@@ -36,6 +36,8 @@ import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FSDirectory;
+import org.apache.nifi.provenance.index.EventIndexSearcher;
+import org.apache.nifi.provenance.index.EventIndexWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,53 +49,61 @@ public class CachingIndexManager implements Closeable, IndexManager {
     private final Map<File, List<ActiveIndexSearcher>> activeSearchers = new HashMap<>();
 
 
-    public void removeIndex(final File indexDirectory) {
+    @Override
+    public boolean removeIndex(final File indexDirectory) {
         final File absoluteFile = indexDirectory.getAbsoluteFile();
         logger.info("Removing index {}", indexDirectory);
 
         lock.lock();
         try {
             final IndexWriterCount count = writerCounts.remove(absoluteFile);
-            if ( count != null ) {
+            if (count != null) {
                 try {
                     count.close();
                 } catch (final IOException ioe) {
                    logger.warn("Failed to close Index Writer {} for {}", count.getWriter(), absoluteFile);
-                    if ( logger.isDebugEnabled() ) {
+                    if (logger.isDebugEnabled()) {
                         logger.warn("", ioe);
                     }
+
+                    return false;
                 }
             }
 
             final List<ActiveIndexSearcher> searcherList = activeSearchers.remove(absoluteFile);
             if (searcherList != null) {
-                for ( final ActiveIndexSearcher searcher : searcherList ) {
+                for (final ActiveIndexSearcher searcher : searcherList) {
                     try {
                         searcher.close();
                     } catch (final IOException ioe) {
                        logger.warn("Failed to close Index Searcher {} for {} due to {}",
-                                searcher.getSearcher(), absoluteFile, ioe);
-                        if ( logger.isDebugEnabled() ) {
+                            searcher.getSearcher(), absoluteFile, ioe);
+                        if (logger.isDebugEnabled()) {
                             logger.warn("", ioe);
                         }
+
+                        return false;
                     }
                 }
             }
         } finally {
             lock.unlock();
         }
+
+        return true;
     }
 
-    public IndexWriter borrowIndexWriter(final File indexingDirectory) throws IOException {
-        final File absoluteFile = indexingDirectory.getAbsoluteFile();
-        logger.trace("Borrowing index writer for {}", indexingDirectory);
+    @Override
+    public EventIndexWriter borrowIndexWriter(final File indexDirectory) throws IOException {
+        final File absoluteFile = indexDirectory.getAbsoluteFile();
+        logger.trace("Borrowing index writer for {}", indexDirectory);
 
         lock.lock();
         try {
             IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
-            if ( writerCount == null ) {
+            if (writerCount == null) {
                 final List<Closeable> closeables = new ArrayList<>();
-                final Directory directory = FSDirectory.open(indexingDirectory);
+                final Directory directory = FSDirectory.open(indexDirectory);
                 closeables.add(directory);
 
                 try {
@@ -104,10 +114,11 @@ public class CachingIndexManager implements Closeable, IndexManager {
                     config.setWriteLockTimeout(300000L);
 
                     final IndexWriter indexWriter = new IndexWriter(directory, config);
-                    writerCount = new IndexWriterCount(indexWriter, analyzer, directory, 1);
-                    logger.debug("Providing new index writer for {}", indexingDirectory);
+                    final EventIndexWriter eventIndexWriter = new LuceneEventIndexWriter(indexWriter, indexDirectory);
+                    writerCount = new IndexWriterCount(eventIndexWriter, analyzer, directory, 1);
+                    logger.debug("Providing new index writer for {}", indexDirectory);
                 } catch (final IOException ioe) {
-                    for ( final Closeable closeable : closeables ) {
+                    for (final Closeable closeable : closeables) {
                         try {
                             closeable.close();
                         } catch (final IOException ioe2) {
@@ -122,16 +133,16 @@ public class CachingIndexManager implements Closeable, IndexManager {
 
                 // Mark any active searchers as poisoned because we are updating the index
                 final List<ActiveIndexSearcher> searchers = activeSearchers.get(absoluteFile);
-                if ( searchers != null ) {
+                if (searchers != null) {
                     for (final ActiveIndexSearcher activeSearcher : searchers) {
-                        logger.debug("Poisoning {} because it is searching {}, which is getting updated", activeSearcher, indexingDirectory);
+                        logger.debug("Poisoning {} because it is searching {}, which is getting updated", activeSearcher, indexDirectory);
                         activeSearcher.poison();
                     }
                 }
             } else {
-                logger.debug("Providing existing index writer for {} and 
incrementing count to {}", indexingDirectory, writerCount.getCount() + 1);
+                logger.debug("Providing existing index writer for {} and 
incrementing count to {}", indexDirectory, writerCount.getCount() + 1);
                 writerCounts.put(absoluteFile, new 
IndexWriterCount(writerCount.getWriter(),
-                        writerCount.getAnalyzer(), writerCount.getDirectory(), 
writerCount.getCount() + 1));
+                    writerCount.getAnalyzer(), writerCount.getDirectory(), 
writerCount.getCount() + 1));
             }
 
             return writerCount.getWriter();
@@ -140,31 +151,53 @@ public class CachingIndexManager implements Closeable, IndexManager {
         }
     }
 
-    public void returnIndexWriter(final File indexingDirectory, final IndexWriter writer) {
-        final File absoluteFile = indexingDirectory.getAbsoluteFile();
-        logger.trace("Returning Index Writer for {} to IndexManager", indexingDirectory);
+
+    @Override
+    public void returnIndexWriter(final EventIndexWriter writer) {
+        returnIndexWriter(writer, true, true);
+    }
+
+    @Override
+    public void returnIndexWriter(final EventIndexWriter writer, final boolean commit, final boolean isCloseable) {
+        final File indexDirectory = writer.getDirectory();
+        final File absoluteFile = indexDirectory.getAbsoluteFile();
+        logger.trace("Returning Index Writer for {} to IndexManager", indexDirectory);
 
         lock.lock();
         try {
-            final IndexWriterCount count = writerCounts.remove(absoluteFile);
+            final IndexWriterCount count = writerCounts.get(absoluteFile);
 
             try {
-                if ( count == null ) {
+                if (count == null) {
                     logger.warn("Index Writer {} was returned to IndexManager 
for {}, but this writer is not known. "
-                            + "This could potentially lead to a resource 
leak", writer, indexingDirectory);
+                        + "This could potentially lead to a resource leak", 
writer, indexDirectory);
                     writer.close();
-                } else if ( count.getCount() <= 1 ) {
+                } else if (count.getCount() <= 1) {
                     // we are finished with this writer.
-                    logger.debug("Decrementing count for Index Writer for {} 
to {}; Closing writer", indexingDirectory, count.getCount() - 1);
-                    count.close();
+                    logger.info("Decrementing count for Index Writer for {} to 
{}. Now finished writing to this Index Directory",
+                        indexDirectory, count.getCount() - 1);
+
+                    try {
+                        if (commit) {
+                            writer.commit();
+                        }
+                    } finally {
+                        if (isCloseable) {
+                            try {
+                                count.close();
+                            } finally {
+                                writerCounts.remove(absoluteFile);
+                            }
+                        }
+                    }
                 } else {
                     // decrement the count.
-                    logger.debug("Decrementing count for Index Writer for {} 
to {}", indexingDirectory, count.getCount() - 1);
+                    logger.debug("Decrementing count for Index Writer for {} 
to {}", indexDirectory, count.getCount() - 1);
                     writerCounts.put(absoluteFile, new 
IndexWriterCount(count.getWriter(), count.getAnalyzer(), count.getDirectory(), 
count.getCount() - 1));
                 }
             } catch (final IOException ioe) {
                 logger.warn("Failed to close Index Writer {} due to {}", 
writer, ioe);
-                if ( logger.isDebugEnabled() ) {
+                if (logger.isDebugEnabled()) {
                     logger.warn("", ioe);
                 }
             }
@@ -174,7 +207,8 @@ public class CachingIndexManager implements Closeable, IndexManager {
     }
 
 
-    public IndexSearcher borrowIndexSearcher(final File indexDir) throws IOException {
+    @Override
+    public EventIndexSearcher borrowIndexSearcher(final File indexDir) throws IOException {
         final File absoluteFile = indexDir.getAbsoluteFile();
         logger.trace("Borrowing index searcher for {}", indexDir);
 
@@ -182,7 +216,7 @@ public class CachingIndexManager implements Closeable, IndexManager {
         try {
             // check if we already have a reader cached.
             List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile);
-            if ( currentlyCached == null ) {
+            if (currentlyCached == null) {
                 currentlyCached = new ArrayList<>();
                 activeSearchers.put(absoluteFile, currentlyCached);
             } else {
@@ -197,7 +231,7 @@ public class CachingIndexManager implements Closeable, IndexManager {
 
                         // if there are no references to the reader, it will have been closed. Since there is no
                         // isClosed() method, this is how we determine whether it's been closed or not.
-                        final int refCount = searcher.getSearcher().getIndexReader().getRefCount();
+                        final int refCount = searcher.getSearcher().getIndexSearcher().getIndexReader().getRefCount();
                         if (refCount <= 0) {
                             // if refCount == 0, then the reader has been closed, so we cannot use the searcher
                             logger.debug("Reference count for cached Index Searcher for {} is currently {}; "
@@ -216,16 +250,17 @@ public class CachingIndexManager implements Closeable, IndexManager {
             // if we have an Index Writer, and if so create a Reader based on the Index Writer.
             // This will provide us a 'near real time' index reader.
             final IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
-            if ( writerCount == null ) {
+            if (writerCount == null) {
                 final Directory directory = FSDirectory.open(absoluteFile);
                 logger.debug("No Index Writer currently exists for {}; creating a cachable reader", indexDir);
 
                 try {
                     final DirectoryReader directoryReader = DirectoryReader.open(directory);
                     final IndexSearcher searcher = new IndexSearcher(directoryReader);
+                    final EventIndexSearcher eventIndexSearcher = new LuceneEventIndexSearcher(searcher, indexDir, directory, directoryReader);
 
                     // we want to cache the searcher that we create, since it's just a reader.
-                    final ActiveIndexSearcher cached = new ActiveIndexSearcher(searcher, absoluteFile, directoryReader, directory, true);
+                    final ActiveIndexSearcher cached = new ActiveIndexSearcher(eventIndexSearcher, absoluteFile, directoryReader, directory, true);
                     currentlyCached.add(cached);
 
                     return cached.getSearcher();
@@ -243,22 +278,23 @@ public class CachingIndexManager implements Closeable, IndexManager {
                 }
             } else {
                 logger.debug("Index Writer currently exists for {}; creating a non-cachable reader and incrementing "
-                        + "counter to {}", indexDir, writerCount.getCount() + 1);
+                    + "counter to {}", indexDir, writerCount.getCount() + 1);
 
                 // increment the writer count to ensure that it's kept open.
                 writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
-                        writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1));
+                    writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1));
 
                 // create a new Index Searcher from the writer so that we don't have an issue with trying
                 // to read from a directory that's locked. If we get the "no segments* file found" with
                 // Lucene, this indicates that an IndexWriter already has the directory open.
-                final IndexWriter writer = writerCount.getWriter();
-                final DirectoryReader directoryReader = DirectoryReader.open(writer, false);
+                final EventIndexWriter writer = writerCount.getWriter();
+                final DirectoryReader directoryReader = DirectoryReader.open(writer.getIndexWriter(), false);
                 final IndexSearcher searcher = new IndexSearcher(directoryReader);
+                final EventIndexSearcher eventIndexSearcher = new LuceneEventIndexSearcher(searcher, indexDir, null, directoryReader);
 
                 // we don't want to cache this searcher because it's based on a writer, so we want to get
                 // new values the next time that we search.
-                final ActiveIndexSearcher activeSearcher = new ActiveIndexSearcher(searcher, absoluteFile, directoryReader, null, false);
+                final ActiveIndexSearcher activeSearcher = new ActiveIndexSearcher(eventIndexSearcher, absoluteFile, directoryReader, null, false);
 
                 currentlyCached.add(activeSearcher);
                 return activeSearcher.getSearcher();
@@ -269,7 +305,9 @@ public class CachingIndexManager implements Closeable, IndexManager {
     }
 
 
-    public void returnIndexSearcher(final File indexDirectory, final IndexSearcher searcher) {
+    @Override
+    public void returnIndexSearcher(final EventIndexSearcher searcher) {
+        final File indexDirectory = searcher.getIndexDirectory();
         final File absoluteFile = indexDirectory.getAbsoluteFile();
         logger.trace("Returning index searcher for {} to IndexManager", indexDirectory);
 
@@ -277,9 +315,9 @@ public class CachingIndexManager implements Closeable, IndexManager {
         try {
             // check if we already have a reader cached.
             final List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile);
-            if ( currentlyCached == null ) {
+            if (currentlyCached == null) {
                 logger.warn("Received Index Searcher for {} but no searcher was provided for that directory; this could "
-                        + "result in a resource leak", indexDirectory);
+                    + "result in a resource leak", indexDirectory);
                 return;
             }
 
@@ -289,20 +327,20 @@ public class CachingIndexManager implements Closeable, IndexManager {
             boolean activeSearcherFound = false;
             while (itr.hasNext()) {
                 final ActiveIndexSearcher activeSearcher = itr.next();
-                if ( activeSearcher.getSearcher().equals(searcher) ) {
+                if (activeSearcher.getSearcher().equals(searcher)) {
                     activeSearcherFound = true;
-                    if ( activeSearcher.isCache() ) {
+                    if (activeSearcher.isCache()) {
                        // if the searcher is poisoned, close it and remove from "pool". Otherwise,
                        // just decrement the count. Note here that when we call close() it won't actually close
                        // the underlying directory reader unless there are no more references to it
-                        if ( activeSearcher.isPoisoned() ) {
+                        if (activeSearcher.isPoisoned()) {
                             itr.remove();
 
                             try {
                                 activeSearcher.close();
                             } catch (final IOException ioe) {
                                 logger.warn("Failed to close Index Searcher 
for {} due to {}", absoluteFile, ioe);
-                                if ( logger.isDebugEnabled() ) {
+                                if (logger.isDebugEnabled()) {
                                     logger.warn("", ioe);
                                 }
                             }
@@ -322,26 +360,26 @@ public class CachingIndexManager implements Closeable, IndexManager {
 
                         // decrement the writer count because we incremented it when creating the searcher
                         final IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
-                        if ( writerCount != null ) {
-                            if ( writerCount.getCount() <= 1 ) {
+                        if (writerCount != null) {
+                            if (writerCount.getCount() <= 1) {
                                 try {
                                     logger.debug("Index searcher for {} is not 
cached. Writer count is "
-                                            + "decremented to {}; closing 
writer", indexDirectory, writerCount.getCount() - 1);
+                                        + "decremented to {}; closing writer", 
indexDirectory, writerCount.getCount() - 1);
 
                                     writerCount.close();
                                 } catch (final IOException ioe) {
                                     logger.warn("Failed to close Index Writer 
for {} due to {}", absoluteFile, ioe);
-                                    if ( logger.isDebugEnabled() ) {
+                                    if (logger.isDebugEnabled()) {
                                         logger.warn("", ioe);
                                     }
                                 }
                             } else {
                                 logger.debug("Index searcher for {} is not 
cached. Writer count is decremented "
-                                        + "to {}; leaving writer open", 
indexDirectory, writerCount.getCount() - 1);
+                                    + "to {}; leaving writer open", 
indexDirectory, writerCount.getCount() - 1);
 
                                 writerCounts.put(absoluteFile, new 
IndexWriterCount(writerCount.getWriter(),
-                                        writerCount.getAnalyzer(), 
writerCount.getDirectory(),
-                                        writerCount.getCount() - 1));
+                                    writerCount.getAnalyzer(), 
writerCount.getDirectory(),
+                                    writerCount.getCount() - 1));
                             }
                         }
 
@@ -353,7 +391,7 @@ public class CachingIndexManager implements Closeable, IndexManager {
                             }
                         } catch (final IOException ioe) {
                             logger.warn("Failed to close Index Searcher for {} due to {}", absoluteFile, ioe);
-                            if ( logger.isDebugEnabled() ) {
+                            if (logger.isDebugEnabled()) {
                                 logger.warn("", ioe);
                             }
                         }
@@ -378,11 +416,11 @@ public class CachingIndexManager implements Closeable, IndexManager {
         try {
             IOException ioe = null;
 
-            for ( final IndexWriterCount count : writerCounts.values() ) {
+            for (final IndexWriterCount count : writerCounts.values()) {
                 try {
                     count.close();
                 } catch (final IOException e) {
-                    if ( ioe == null ) {
+                    if (ioe == null) {
                         ioe = e;
                     } else {
                         ioe.addSuppressed(e);
@@ -395,7 +433,7 @@ public class CachingIndexManager implements Closeable, IndexManager {
                     try {
                         searcher.close();
                     } catch (final IOException e) {
-                        if ( ioe == null ) {
+                        if (ioe == null) {
                             ioe = e;
                         } else {
                             ioe.addSuppressed(e);
@@ -404,7 +442,7 @@ public class CachingIndexManager implements Closeable, IndexManager {
                 }
             }
 
-            if ( ioe != null ) {
+            if (ioe != null) {
                 throw ioe;
             }
         } finally {
@@ -415,15 +453,15 @@ public class CachingIndexManager implements Closeable, IndexManager {
 
     private static void close(final Closeable... closeables) throws IOException {
         IOException ioe = null;
-        for ( final Closeable closeable : closeables ) {
-            if ( closeable == null ) {
+        for (final Closeable closeable : closeables) {
+            if (closeable == null) {
                 continue;
             }
 
             try {
                 closeable.close();
             } catch (final IOException e) {
-                if ( ioe == null ) {
+                if (ioe == null) {
                     ioe = e;
                 } else {
                     ioe.addSuppressed(e);
@@ -431,14 +469,14 @@ public class CachingIndexManager implements Closeable, IndexManager {
             }
         }
 
-        if ( ioe != null ) {
+        if (ioe != null) {
             throw ioe;
         }
     }
 
 
     private static class ActiveIndexSearcher {
-        private final IndexSearcher searcher;
+        private final EventIndexSearcher searcher;
         private final DirectoryReader directoryReader;
         private final File indexDirectory;
         private final Directory directory;
@@ -446,8 +484,8 @@ public class CachingIndexManager implements Closeable, IndexManager {
         private final AtomicInteger referenceCount = new AtomicInteger(1);
         private volatile boolean poisoned = false;
 
-        public ActiveIndexSearcher(final IndexSearcher searcher, final File indexDirectory, final DirectoryReader directoryReader,
-                final Directory directory, final boolean cache) {
+        public ActiveIndexSearcher(final EventIndexSearcher searcher, final File indexDirectory, final DirectoryReader directoryReader,
+            final Directory directory, final boolean cache) {
             this.searcher = searcher;
             this.directoryReader = directoryReader;
             this.indexDirectory = indexDirectory;
@@ -459,7 +497,7 @@ public class CachingIndexManager implements Closeable, IndexManager {
             return cache;
         }
 
-        public IndexSearcher getSearcher() {
+        public EventIndexSearcher getSearcher() {
             return searcher;
         }
 
@@ -499,12 +537,12 @@ public class CachingIndexManager implements Closeable, IndexManager {
 
 
     private static class IndexWriterCount implements Closeable {
-        private final IndexWriter writer;
+        private final EventIndexWriter writer;
         private final Analyzer analyzer;
         private final Directory directory;
         private final int count;
 
-        public IndexWriterCount(final IndexWriter writer, final Analyzer analyzer, final Directory directory, final int count) {
+        public IndexWriterCount(final EventIndexWriter writer, final Analyzer analyzer, final Directory directory, final int count) {
             this.writer = writer;
             this.analyzer = analyzer;
             this.directory = directory;
@@ -519,7 +557,7 @@ public class CachingIndexManager implements Closeable, IndexManager {
             return directory;
         }
 
-        public IndexWriter getWriter() {
+        public EventIndexWriter getWriter() {
             return writer;
         }
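
For readers tracking the API change above: writers are now handed out as EventIndexWriter wrappers, and callers no longer pass the directory back on return because the wrapper carries it via getDirectory(). A minimal caller sketch against the new contract (illustrative only; indexManager, indexDirectory, and document are assumed to exist, and only methods visible in this commit plus the standard Lucene IndexWriter.addDocument call are used):

    final EventIndexWriter writer = indexManager.borrowIndexWriter(indexDirectory);
    try {
        // reach through the wrapper for the raw Lucene writer, as DeleteIndexAction does below
        writer.getIndexWriter().addDocument(document);
    } finally {
        // commit the update, but leave the writer open for other borrowers still holding a reference
        indexManager.returnIndexWriter(writer, true, false);
    }

Per the hunk above, the commit flag controls whether writer.commit() runs when the last reference is returned, and isCloseable controls whether the writer is actually closed and evicted from writerCounts.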
 

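The searcher side mirrors this: borrowIndexSearcher() now returns an EventIndexSearcher, and returnIndexSearcher() recovers the directory from the searcher itself via getIndexDirectory(). A sketch of the borrow/search/return cycle (illustrative; indexManager, indexDir, and query are assumed, and search(...) is the standard Lucene IndexSearcher call):

    final EventIndexSearcher eventSearcher = indexManager.borrowIndexSearcher(indexDir);
    try {
        // unwrap the raw Lucene searcher; reads are near real time when the manager
        // opened the reader from an active IndexWriter for this directory
        final TopDocs topDocs = eventSearcher.getIndexSearcher().search(query, 1000);
        // ... resolve hits to provenance events, e.g. with DocsReader below ...
    } finally {
        indexManager.returnIndexSearcher(eventSearcher);
    }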
http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java
index 7707352..f372a2d 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java
@@ -25,6 +25,7 @@ import org.apache.lucene.index.Term;
 import org.apache.nifi.provenance.IndexConfiguration;
 import org.apache.nifi.provenance.PersistentProvenanceRepository;
 import org.apache.nifi.provenance.expiration.ExpirationAction;
+import org.apache.nifi.provenance.index.EventIndexWriter;
 import org.apache.nifi.provenance.serialization.RecordReader;
 import org.apache.nifi.provenance.serialization.RecordReaders;
 import org.slf4j.Logger;
@@ -60,15 +61,16 @@ public class DeleteIndexAction implements ExpirationAction {
             final Term term = new Term(FieldNames.STORAGE_FILENAME, LuceneUtil.substringBefore(expiredFile.getName(), "."));
 
             boolean deleteDir = false;
-            final IndexWriter writer = indexManager.borrowIndexWriter(indexingDirectory);
+            final EventIndexWriter writer = indexManager.borrowIndexWriter(indexingDirectory);
             try {
-                writer.deleteDocuments(term);
-                writer.commit();
-                final int docsLeft = writer.numDocs();
+                final IndexWriter indexWriter = writer.getIndexWriter();
+                indexWriter.deleteDocuments(term);
+                indexWriter.commit();
+                final int docsLeft = indexWriter.numDocs();
                 deleteDir = docsLeft <= 0;
                 logger.debug("After expiring {}, there are {} docs left for 
index {}", expiredFile, docsLeft, indexingDirectory);
             } finally {
-                indexManager.returnIndexWriter(indexingDirectory, writer);
+                indexManager.returnIndexWriter(writer);
             }
 
            // we've confirmed that all documents have been removed. Delete the index directory.
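
Note the return call in this hunk: the single-argument returnIndexWriter(writer) delegates to returnIndexWriter(writer, true, true) per the CachingIndexManager change above, so the writer is committed and, once its reference count drains, closed. A caller batching several expirations before paying for a commit could instead use the three-argument form (hypothetical usage, not what this action does):

    indexManager.returnIndexWriter(writer, false, false); // decrement only: no commit, writer stays open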

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
index ce62152..0e96b62 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
@@ -30,25 +30,25 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexableField;
+import org.apache.lucene.search.ScoreDoc;
+import org.apache.lucene.search.TopDocs;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.SearchableFields;
 import org.apache.nifi.provenance.StandardProvenanceEventRecord;
-import org.apache.nifi.provenance.authorization.AuthorizationCheck;
+import org.apache.nifi.provenance.authorization.EventAuthorizer;
 import org.apache.nifi.provenance.serialization.RecordReader;
 import org.apache.nifi.provenance.serialization.RecordReaders;
 import org.apache.nifi.provenance.toc.TocReader;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.IndexableField;
-import org.apache.lucene.search.ScoreDoc;
-import org.apache.lucene.search.TopDocs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class DocsReader {
+public class DocsReader {
     private final Logger logger = LoggerFactory.getLogger(DocsReader.class);
 
-    public Set<ProvenanceEventRecord> read(final TopDocs topDocs, final AuthorizationCheck authCheck, final IndexReader indexReader, final Collection<Path> allProvenanceLogFiles,
+    public Set<ProvenanceEventRecord> read(final TopDocs topDocs, final EventAuthorizer authorizer, final IndexReader indexReader, final Collection<Path> allProvenanceLogFiles,
             final AtomicInteger retrievalCount, final int maxResults, final int maxAttributeChars) throws IOException {
         if (retrievalCount.get() >= maxResults) {
             return Collections.emptySet();
@@ -67,7 +67,7 @@ class DocsReader {
 
         final long readDocuments = System.nanoTime() - start;
         logger.debug("Reading {} Lucene Documents took {} millis", 
docs.size(), TimeUnit.NANOSECONDS.toMillis(readDocuments));
-        return read(docs, authCheck, allProvenanceLogFiles, retrievalCount, 
maxResults, maxAttributeChars);
+        return read(docs, authorizer, allProvenanceLogFiles, retrievalCount, 
maxResults, maxAttributeChars);
     }
 
 
@@ -106,7 +106,7 @@ class DocsReader {
         return record;
     }
 
-    public Set<ProvenanceEventRecord> read(final List<Document> docs, final AuthorizationCheck authCheck, final Collection<Path> allProvenanceLogFiles,
+    public Set<ProvenanceEventRecord> read(final List<Document> docs, final EventAuthorizer authorizer, final Collection<Path> allProvenanceLogFiles,
             final AtomicInteger retrievalCount, final int maxResults, final int maxAttributeChars) throws IOException {
 
         if (retrievalCount.get() >= maxResults) {
@@ -114,38 +114,33 @@ class DocsReader {
         }
 
         final long start = System.nanoTime();
-
-        Set<ProvenanceEventRecord> matchingRecords = new LinkedHashSet<>();
-
-        Map<String, List<Document>> byStorageNameDocGroups = LuceneUtil.groupDocsByStorageFileName(docs);
+        final Set<ProvenanceEventRecord> matchingRecords = new LinkedHashSet<>();
+        final Map<String, List<Document>> byStorageNameDocGroups = LuceneUtil.groupDocsByStorageFileName(docs);
 
         int eventsReadThisFile = 0;
         int logFileCount = 0;
 
         for (String storageFileName : byStorageNameDocGroups.keySet()) {
-            File provenanceEventFile = LuceneUtil.getProvenanceLogFile(storageFileName, allProvenanceLogFiles);
-            if (provenanceEventFile != null) {
-                try (RecordReader reader = RecordReaders.newRecordReader(provenanceEventFile, allProvenanceLogFiles,
-                        maxAttributeChars)) {
-
-                    Iterator<Document> docIter = byStorageNameDocGroups.get(storageFileName).iterator();
-                    while (docIter.hasNext() && retrievalCount.getAndIncrement() < maxResults) {
-                        ProvenanceEventRecord event = this.getRecord(docIter.next(), reader);
-                        if (event != null && authCheck.isAuthorized(event)) {
-                            matchingRecords.add(event);
-                            eventsReadThisFile++;
-                        }
-                    }
+            final File provenanceEventFile = LuceneUtil.getProvenanceLogFile(storageFileName, allProvenanceLogFiles);
+            if (provenanceEventFile == null) {
+                logger.warn("Could not find Provenance Log File with "
+                    + "basename {} in the Provenance Repository; assuming "
+                    + "file has expired and continuing without it", storageFileName);
+                continue;
+            }
 
-                } catch (Exception e) {
-                    logger.warn("Failed while trying to read Provenance Events. The event file '"
-                            + provenanceEventFile.getAbsolutePath() +
-                            "' may be missing or corrupted.", e);
+            try (final RecordReader reader = RecordReaders.newRecordReader(provenanceEventFile, allProvenanceLogFiles, maxAttributeChars)) {
+                final Iterator<Document> docIter = byStorageNameDocGroups.get(storageFileName).iterator();
+                while (docIter.hasNext() && retrievalCount.getAndIncrement() < maxResults) {
+                    final ProvenanceEventRecord event = getRecord(docIter.next(), reader);
+                    if (event != null && authorizer.isAuthorized(event)) {
+                        matchingRecords.add(event);
+                        eventsReadThisFile++;
+                    }
                 }
-            } else {
-                logger.warn("Could not find Provenance Log File with "
-                        + "basename {} in the Provenance Repository; assuming "
-                        + "file has expired and continuing without it", storageFileName);
+            } catch (final Exception e) {
+                logger.warn("Failed to read Provenance Events. The event file '"
+                    + provenanceEventFile.getAbsolutePath() + "' may be missing or corrupt.", e);
             }
             }
         }
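
DocsReader is now public, so callers outside the package can drive it directly. A hypothetical call site wiring the pieces together (parameter values are illustrative; the signature is the one shown in the first hunk of this file):

    final DocsReader docsReader = new DocsReader();
    final Set<ProvenanceEventRecord> events = docsReader.read(
        topDocs,               // hits from a prior IndexSearcher.search(...) call
        authorizer,            // EventAuthorizer consulted per event via isAuthorized(event)
        indexReader,           // the IndexReader the TopDocs came from
        allProvenanceLogFiles, // Collection<Path> used to resolve storage file names
        new AtomicInteger(0),  // running retrieval count, shared across indices
        1000,                  // maxResults
        65536);                // maxAttributeChars

The restructured loop also replaces the nested if/else with an early continue when a storage file cannot be resolved, which keeps the happy path flat.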
 
