http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/EventIndex.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/EventIndex.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/EventIndex.java
new file mode 100644
index 0000000..051cd1f
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/EventIndex.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.index;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.authorization.EventAuthorizer;
+import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
+import org.apache.nifi.provenance.search.Query;
+import org.apache.nifi.provenance.search.QuerySubmission;
+import org.apache.nifi.provenance.serialization.StorageSummary;
+import org.apache.nifi.provenance.store.EventStore;
+
+/**
+ * An Event Index is responsible for indexing Provenance Events in such a way 
that the index can be quickly
+ * searched to in order to retrieve events of interest.
+ */
+public interface EventIndex extends Closeable {
+
+    /**
+     * Initializes the Event Index, providing it access to the Event Store, in 
case it is necessary for performing
+     * initialization tasks
+     *
+     * @param eventStore the EventStore that holds the events that have been 
given to the repository.
+     */
+    void initialize(EventStore eventStore);
+
+    /**
+     * Adds the given events to the index so that they can be queried later.
+     *
+     * @param events the events to index along with their associated Storage 
Summaries
+     */
+    void addEvents(Map<ProvenanceEventRecord, StorageSummary> events);
+
+    /**
+     * Replaces the entries in the appropriate index with the given events
+     *
+     * @param events the events to add or replace along with their associated 
Storage Summaries
+     */
+    void reindexEvents(Map<ProvenanceEventRecord, StorageSummary> events);
+
+    /**
+     * @return the number of bytes that are utilized by the Event Index
+     */
+    long getSize();
+
+    /**
+     * Submits a Query asynchronously and returns a QuerySubmission that can 
be used to obtain the results
+     *
+     * @param query the query to perform
+     * @param authorizer the authorizer to use in order to determine whether 
or not a particular event should be included in the result
+     * @param userId the ID of the user on whose behalf the query is being 
submitted
+     *
+     * @return a QuerySubmission that can be used to retrieve the results later
+     */
+    QuerySubmission submitQuery(Query query, EventAuthorizer authorizer, 
String userId);
+
+    /**
+     * Asynchronously computes the lineage for the FlowFile that is identified 
by the Provenance Event with the given ID.
+     *
+     * @param eventId the ID of the Provenance Event for which the lineage 
should be calculated
+     * @param user the NiFi user on whose behalf the computing is being 
performed
+     * @param authorizer the authorizer to use in order to determine whether 
or not a particular event should be included in the result
+     *
+     * @return a ComputeLineageSubmission that can be used to retrieve the 
results later
+     */
+    ComputeLineageSubmission submitLineageComputation(long eventId, NiFiUser 
user, EventAuthorizer authorizer);
+
+    /**
+     * Asynchronously computes the lineage for the FlowFile that has the given 
FlowFile UUID.
+     *
+     * @param flowFileUuid the UUID of the FlowFile for which the lineage 
should be computed
+     * @param user the NiFi user on whose behalf the computing is being 
performed
+     * @param authorizer the authorizer to use in order to determine whether 
or not a particular event should be included in the result
+     *
+     * @return a ComputeLineageSubmission that can be used to retrieve the 
results later
+     */
+    ComputeLineageSubmission submitLineageComputation(String flowFileUuid, 
NiFiUser user, EventAuthorizer authorizer);
+
+    /**
+     * Asynchronously computes the lineage that makes up the 'child flowfiles' 
generated by the event with the given ID. This method is
+     * valid only for Events that produce 'child flowfiles' such as FORK, 
CLONE, REPLAY, etc.
+     *
+     * @param eventId the ID of the Provenance Event for which the lineage 
should be calculated
+     * @param user the NiFi user on whose behalf the computing is being 
performed
+     * @param authorizer the authorizer to use in order to determine whether 
or not a particular event should be included in the result
+     *
+     * @return a ComputeLineageSubmission that can be used to retrieve the 
results later
+     */
+    ComputeLineageSubmission submitExpandChildren(long eventId, NiFiUser user, 
EventAuthorizer authorizer);
+
+    /**
+     * Asynchronously computes the lineage that makes up the 'parent 
flowfiles' that were involved in the event with the given ID. This method
+     * is valid only for Events that have 'parent flowfiles' such as FORK, 
JOIN, etc.
+     *
+     * @param eventId the ID of the Provenance Event for which the lineage 
should be calculated
+     * @param user the NiFi user on whose behalf the computing is being 
performed
+     * @param authorizer the authorizer to use in order to determine whether 
or not a particular event should be included in the result
+     *
+     * @return a ComputeLineageSubmission that can be used to retrieve the 
results later
+     */
+    ComputeLineageSubmission submitExpandParents(long eventId, NiFiUser user, 
EventAuthorizer authorizer);
+
+    /**
+     * Retrieves the ComputeLineageSubmission that was returned by the 
'submitLineageComputation' methods
+     *
+     * @param lineageIdentifier the identifier of the linage
+     * @param user the NiFi user on whose behalf the retrieval is being 
performed
+     * @return the ComputeLineageSubmission that represents the asynchronous 
lineage computation that is being performed under the given
+     *         identifier, or <code>null</code> if the identifier cannot be 
found.
+     */
+    ComputeLineageSubmission retrieveLineageSubmission(String 
lineageIdentifier, NiFiUser user);
+
+    /**
+     * Retrieves the QuerySubmission that was returned by the 'submitQuery' 
method
+     *
+     * @param queryIdentifier the identifier of the query
+     * @param user the NiFi user on whose behalf the retrieval is being 
performed
+     * @return the QuerySubmission that represents the asynchronous query that 
is being performed under the given
+     *         identifier, or <code>null</code> if the identifier cannot be 
found.
+     */
+    QuerySubmission retrieveQuerySubmission(String queryIdentifier, NiFiUser 
user);
+
+    /**
+     * Upon restart of NiFi, it is possible that the Event Index will have 
lost some events due to frequency of committing the index.
+     * In such as case, this method is responsible for returning the minimum 
Provenance Event ID that it knows is safely indexed. If
+     * any Provenance Event exists in the Event Store with an ID greater than 
the value returned, that Event should be re-indexed.
+     *
+     * @param partitionName the name of the Partition for which the minimum 
Event ID is desired
+     * @return the minimum Provenance Event ID that the Index knows is safely 
indexed for the given partition
+     */
+    long getMinimumEventIdToReindex(String partitionName);
+
+    /**
+     * Instructs the Event Index to commit any changes that have been made to 
the partition with the given name
+     *
+     * @param partitionName the name of the partition to commit changes
+     * @throws IOException if unable to commit the changes
+     */
+    void commitChanges(String partitionName) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/EventIndexSearcher.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/EventIndexSearcher.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/EventIndexSearcher.java
new file mode 100644
index 0000000..8389408
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/EventIndexSearcher.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.index;
+
+import java.io.Closeable;
+import java.io.File;
+
+import org.apache.lucene.search.IndexSearcher;
+
+public interface EventIndexSearcher extends Closeable {
+    IndexSearcher getIndexSearcher();
+
+    File getIndexDirectory();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/EventIndexWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/EventIndexWriter.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/EventIndexWriter.java
new file mode 100644
index 0000000..f0af7dc
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/EventIndexWriter.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.index;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.IndexWriter;
+
+public interface EventIndexWriter extends Closeable {
+
+    boolean index(Document document, int commitThreshold) throws IOException;
+
+    boolean index(List<Document> documents, int commitThreshold) throws 
IOException;
+
+    File getDirectory();
+
+    long commit() throws IOException;
+
+    int getEventsIndexedSinceCommit();
+
+    long getEventsIndexed();
+
+    IndexWriter getIndexWriter();
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/SearchFailedException.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/SearchFailedException.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/SearchFailedException.java
new file mode 100644
index 0000000..ce1bedb
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/SearchFailedException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.index;
+
/**
 * Thrown when a search against the Event Index cannot be completed. Always wraps the
 * underlying failure so that callers can inspect the root cause.
 */
public class SearchFailedException extends RuntimeException {
    // RuntimeException is Serializable; declare an explicit serialVersionUID so the
    // serialized form stays stable across compiler/JVM versions.
    private static final long serialVersionUID = 1L;

    /**
     * @param message a description of the failure
     * @param cause the underlying cause of the failure
     */
    public SearchFailedException(final String message, final Throwable cause) {
        super(message, cause);
    }

    /**
     * @param cause the underlying cause of the failure
     */
    public SearchFailedException(final Throwable cause) {
        super(cause);
    }
}

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/CachedQuery.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/CachedQuery.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/CachedQuery.java
new file mode 100644
index 0000000..770c455
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/CachedQuery.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.index.lucene;
+
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.search.Query;
+import org.apache.nifi.provenance.serialization.StorageSummary;
+
+public interface CachedQuery {
+
+    void update(ProvenanceEventRecord event, StorageSummary storageSummary);
+
+    Optional<List<Long>> evaluate(Query query);
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/CommitPreference.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/CommitPreference.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/CommitPreference.java
new file mode 100644
index 0000000..2208917
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/CommitPreference.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.index.lucene;
+
/**
 * Expresses whether or not an index commit should accompany an indexing action.
 */
public enum CommitPreference {

    /** A commit must be performed. */
    FORCE_COMMIT,

    /** A commit must not be performed. */
    PREVENT_COMMIT,

    /** The implementation may decide for itself whether or not to commit. */
    NO_PREFERENCE
}

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/ConvertEventToLuceneDocument.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/ConvertEventToLuceneDocument.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/ConvertEventToLuceneDocument.java
new file mode 100644
index 0000000..765b81f
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/ConvertEventToLuceneDocument.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.index.lucene;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.document.FieldType;
+import org.apache.lucene.document.LongField;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.index.FieldInfo.IndexOptions;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.provenance.SearchableFields;
+import org.apache.nifi.provenance.lucene.LuceneUtil;
+import org.apache.nifi.provenance.search.SearchableField;
+import org.apache.nifi.provenance.serialization.StorageSummary;
+
/**
 * Converts a Provenance Event (plus the Storage Summary describing where it was
 * persisted) into a Lucene {@link Document} that can be added to the Event Index.
 * Only the fields configured as searchable are indexed.
 */
public class ConvertEventToLuceneDocument {
    // Event fields (UUID, component ID, transit URI, ...) that should be indexed.
    private final Set<SearchableField> searchableEventFields;
    // FlowFile attributes that should be indexed.
    private final Set<SearchableField> searchableAttributeFields;

    /**
     * @param searchableEventFields the event fields to index
     * @param searchableAttributes the FlowFile attributes to index
     */
    public ConvertEventToLuceneDocument(final List<SearchableField> searchableEventFields, final List<SearchableField> searchableAttributes) {
        // Copy into unmodifiable Sets for fast contains() checks and immutability.
        this.searchableEventFields = Collections.unmodifiableSet(new HashSet<>(searchableEventFields));
        this.searchableAttributeFields = Collections.unmodifiableSet(new HashSet<>(searchableAttributes));
    }

    /**
     * Adds the given field/value pair to the Document, skipping null values and
     * non-attribute fields that are not configured as searchable. Values are
     * lower-cased so searches are case-insensitive; Store.NO means the value is
     * indexed but not stored in the Document.
     */
    private void addField(final Document doc, final SearchableField field, final String value) {
        if (value == null || (!field.isAttribute() && !searchableEventFields.contains(field))) {
            return;
        }

        doc.add(new StringField(field.getSearchableFieldName(), value.toLowerCase(), Store.NO));
    }


    /**
     * Builds the Lucene Document for the given event.
     *
     * @param record the event to convert
     * @param persistedEvent the Storage Summary for the event; supplies the event ID stored in the Document
     * @return the Document to index, or <code>null</code> if none of the event's
     *         fields are configured as searchable (nothing worth indexing)
     */
    public Document convert(final ProvenanceEventRecord record, final StorageSummary persistedEvent) {
        final Document doc = new Document();
        addField(doc, SearchableFields.FlowFileUUID, record.getFlowFileUuid());
        addField(doc, SearchableFields.Filename, record.getAttribute(CoreAttributes.FILENAME.key()));
        addField(doc, SearchableFields.ComponentID, record.getComponentId());
        addField(doc, SearchableFields.AlternateIdentifierURI, record.getAlternateIdentifierUri());
        addField(doc, SearchableFields.EventType, record.getEventType().name());
        addField(doc, SearchableFields.Relationship, record.getRelationship());
        addField(doc, SearchableFields.Details, record.getDetails());
        addField(doc, SearchableFields.ContentClaimSection, record.getContentClaimSection());
        addField(doc, SearchableFields.ContentClaimContainer, record.getContentClaimContainer());
        addField(doc, SearchableFields.ContentClaimIdentifier, record.getContentClaimIdentifier());
        addField(doc, SearchableFields.SourceQueueIdentifier, record.getSourceQueueIdentifier());
        addField(doc, SearchableFields.TransitURI, record.getTransitUri());

        // Attribute values are truncated before indexing (Lucene limits term length).
        for (final SearchableField searchableField : searchableAttributeFields) {
            addField(doc, searchableField, LuceneUtil.truncateIndexField(record.getAttribute(searchableField.getSearchableFieldName())));
        }

        // Index the fields that we always index (unless there's nothing else to index at all)
        if (!doc.getFields().isEmpty()) {
            // Always include Lineage Start Date because it allows us to make our Lineage queries more efficient.
            doc.add(new LongField(SearchableFields.LineageStartDate.getSearchableFieldName(), record.getLineageStartDate(), Store.NO));
            // Always include Event Time because most queries are bound by a start and end time.
            doc.add(new LongField(SearchableFields.EventTime.getSearchableFieldName(), record.getEventTime(), Store.NO));
            // We always include File Size because the UI wants to always render the controls for specifying this. This idea could be revisited.
            doc.add(new LongField(SearchableFields.FileSize.getSearchableFieldName(), record.getFileSize(), Store.NO));
            // We always store the event Event ID in the Document but do not index it. It doesn't make sense to query based on Event ID because
            // if we want a particular Event ID, we can just obtain it directly from the EventStore. But when we obtain a Document, this info must
            // be stored so that we know how to lookup the event in the store.
            doc.add(new UnIndexedLongField(SearchableFields.Identifier.getSearchableFieldName(), persistedEvent.getEventId()));

            // If it's event is a FORK, or JOIN, add the FlowFileUUID for all child/parent UUIDs.
            final ProvenanceEventType eventType = record.getEventType();
            if (eventType == ProvenanceEventType.FORK || eventType == ProvenanceEventType.CLONE || eventType == ProvenanceEventType.REPLAY) {
                for (final String uuid : record.getChildUuids()) {
                    if (!uuid.equals(record.getFlowFileUuid())) {
                        addField(doc, SearchableFields.FlowFileUUID, uuid);
                    }
                }
            } else if (eventType == ProvenanceEventType.JOIN) {
                for (final String uuid : record.getParentUuids()) {
                    if (!uuid.equals(record.getFlowFileUuid())) {
                        addField(doc, SearchableFields.FlowFileUUID, uuid);
                    }
                }
            } else if (eventType == ProvenanceEventType.RECEIVE && record.getSourceSystemFlowFileIdentifier() != null) {
                // If we get a receive with a Source System FlowFile Identifier, we add another Document that shows the UUID
                // that the Source System uses to refer to the data.
                final String sourceIdentifier = record.getSourceSystemFlowFileIdentifier();
                final String sourceFlowFileUUID;
                // The UUID is taken as everything after the last ':' — NOTE(review): the
                // 'length() - 2' bound requires at least two characters after the colon;
                // confirm this is intentional rather than an off-by-one for one character.
                final int lastColon = sourceIdentifier.lastIndexOf(":");
                if (lastColon > -1 && lastColon < sourceIdentifier.length() - 2) {
                    sourceFlowFileUUID = sourceIdentifier.substring(lastColon + 1);
                } else {
                    sourceFlowFileUUID = null;
                }

                if (sourceFlowFileUUID != null) {
                    addField(doc, SearchableFields.FlowFileUUID, sourceFlowFileUUID);
                }
            }

            return doc;
        }

        return null;
    }

    /**
     * A Lucene long field that is stored in the Document but not indexed — used to
     * carry the Event ID so a matching Document can be mapped back to its event in
     * the Event Store.
     */
    private static class UnIndexedLongField extends Field {
        static final FieldType TYPE = new FieldType();
        static {
            TYPE.setIndexed(false);
            TYPE.setTokenized(true);
            TYPE.setOmitNorms(true);
            TYPE.setIndexOptions(IndexOptions.DOCS_ONLY);
            TYPE.setNumericType(FieldType.NumericType.LONG);
            TYPE.setStored(true);
            TYPE.freeze();
        }

        public UnIndexedLongField(String name, long value) {
            super(name, TYPE);
            fieldsData = Long.valueOf(value);
        }
    }
}

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/EventIndexTask.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/EventIndexTask.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/EventIndexTask.java
new file mode 100644
index 0000000..f8bbd3b
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/EventIndexTask.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.index.lucene;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.search.NumericRangeQuery;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.provenance.RepositoryConfiguration;
+import org.apache.nifi.provenance.SearchableFields;
+import org.apache.nifi.provenance.index.EventIndexWriter;
+import org.apache.nifi.provenance.lucene.IndexManager;
+import org.apache.nifi.reporting.Severity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EventIndexTask implements Runnable {
+    private static final Logger logger = 
LoggerFactory.getLogger(EventIndexTask.class);
+    private static final String EVENT_CATEGORY = "Provenance Repository";
+    public static final int MAX_DOCUMENTS_PER_THREAD = 100;
+    public static final int DEFAULT_MAX_EVENTS_PER_COMMIT = 1_000_000;
+
+    private final BlockingQueue<StoredDocument> documentQueue;
+    private final IndexManager indexManager;
+    private volatile boolean shutdown = false;
+
+    private final IndexDirectoryManager directoryManager;
+    private final EventReporter eventReporter;
+    private final int commitThreshold;
+
+    public EventIndexTask(final BlockingQueue<StoredDocument> documentQueue, 
final RepositoryConfiguration repoConfig, final IndexManager indexManager,
+        final IndexDirectoryManager directoryManager, final int 
maxEventsPerCommit, final EventReporter eventReporter) {
+        this.documentQueue = documentQueue;
+        this.indexManager = indexManager;
+        this.directoryManager = directoryManager;
+        this.commitThreshold = maxEventsPerCommit;
+        this.eventReporter = eventReporter;
+    }
+
+    public void shutdown() {
+        this.shutdown = true;
+    }
+
+    private void fetchDocuments(final List<StoredDocument> destination) throws 
InterruptedException {
+        // We want to fetch up to INDEX_BUFFER_SIZE documents at a time. 
However, we don't want to continually
+        // call #drainTo on the queue. So we call poll, blocking for up to 1 
second. If we get any event, then
+        // we will call drainTo to gather the rest. If we get no events, then 
we just return, having gathered
+        // no events.
+        StoredDocument firstDoc = documentQueue.poll(1, TimeUnit.SECONDS);
+        if (firstDoc == null) {
+            return;
+        }
+
+        destination.add(firstDoc);
+        documentQueue.drainTo(destination, MAX_DOCUMENTS_PER_THREAD - 1);
+    }
+
+    @Override
+    public void run() {
+        final List<StoredDocument> toIndex = new 
ArrayList<>(MAX_DOCUMENTS_PER_THREAD);
+
+        while (!shutdown) {
+            try {
+                // Get the Documents that we want to index.
+                toIndex.clear();
+                fetchDocuments(toIndex);
+
+                if (toIndex.isEmpty()) {
+                    continue;
+                }
+
+                // Write documents to the currently active index.
+                final Map<String, List<StoredDocument>> docsByPartition = 
toIndex.stream()
+                    .collect(Collectors.groupingBy(doc -> 
doc.getStorageSummary().getPartitionName().get()));
+
+                for (final Map.Entry<String, List<StoredDocument>> entry : 
docsByPartition.entrySet()) {
+                    final String partitionName = entry.getKey();
+                    final List<StoredDocument> docs = entry.getValue();
+
+                    index(docs, partitionName);
+                }
+            } catch (final Exception e) {
+                logger.error("Failed to index Provenance Events", e);
+                eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, 
"Failed to index Provenance Events. See logs for more information.");
+            }
+        }
+    }
+
+
+    /**
+     * Re-indexes the documents given. The IndexableDocument's provided are 
required to have the IndexDirectory provided.
+     */
+    void reIndex(final List<IndexableDocument> toIndex, final CommitPreference 
commitPreference) throws IOException {
+        if (toIndex.isEmpty()) {
+            return;
+        }
+
+        final Map<File, List<IndexableDocument>> docsByIndexDir = 
toIndex.stream().collect(Collectors.groupingBy(doc -> doc.getIndexDirectory()));
+        for (final Map.Entry<File, List<IndexableDocument>> entry : 
docsByIndexDir.entrySet()) {
+            final File indexDirectory = entry.getKey();
+            final List<IndexableDocument> documentsForIndex = entry.getValue();
+
+            final EventIndexWriter indexWriter = 
indexManager.borrowIndexWriter(indexDirectory);
+            try {
+                // Remove any documents that already exist in this index that 
are overlapping.
+                long minId = Long.MAX_VALUE;
+                long maxId = Long.MIN_VALUE;
+
+                for (final IndexableDocument doc : toIndex) {
+                    final long eventId = 
doc.getDocument().getField(SearchableFields.Identifier.getSearchableFieldName()).numericValue().longValue();
+                    if (eventId < minId) {
+                        minId = eventId;
+                    }
+                    if (eventId > maxId) {
+                        maxId = eventId;
+                    }
+                }
+
+                final NumericRangeQuery<Long> query = 
NumericRangeQuery.newLongRange(
+                    SearchableFields.Identifier.getSearchableFieldName(), 
minId, maxId, true, true);
+                indexWriter.getIndexWriter().deleteDocuments(query);
+
+                final List<Document> documents = documentsForIndex.stream()
+                    .map(doc -> doc.getDocument())
+                    .collect(Collectors.toList());
+
+                indexWriter.index(documents, commitThreshold);
+            } finally {
+                indexManager.returnIndexWriter(indexWriter, 
CommitPreference.FORCE_COMMIT.equals(commitPreference), false);
+            }
+        }
+    }
+
+
+    private void index(final List<StoredDocument> toIndex, final String 
partitionName) throws IOException {
+        if (toIndex.isEmpty()) {
+            return;
+        }
+
+        // Convert the IndexableDocument list into a List of Documents so that 
we can pass them to the Index Writer.
+        final List<Document> documents = toIndex.stream()
+            .map(doc -> doc.getDocument())
+            .collect(Collectors.toList());
+
+        boolean requestClose = false;
+        boolean requestCommit = false;
+
+        final long minEventTime = toIndex.stream()
+            .mapToLong(doc -> 
doc.getDocument().getField(SearchableFields.EventTime.getSearchableFieldName()).numericValue().longValue())
+            .min()
+            .getAsLong();
+
+        // Synchronize on the directory manager because we don't want the 
active directory to change
+        // while we are obtaining an index writer for it. I.e., determining 
the active directory
+        // and obtaining an Index Writer for it need to be done atomically.
+        final EventIndexWriter indexWriter;
+        final File indexDirectory;
+        synchronized (directoryManager) {
+            indexDirectory = 
directoryManager.getWritableIndexingDirectory(minEventTime, partitionName);
+            indexWriter = indexManager.borrowIndexWriter(indexDirectory);
+        }
+
+        try {
+            // Perform the actual indexing.
+            boolean writerIndicatesCommit = indexWriter.index(documents, 
commitThreshold);
+
+            // If we don't need to commit index based on what index writer 
tells us, we will still want
+            // to commit the index if it's assigned to a partition and this is 
no longer the active index
+            // for that partition. This prevents the following case:
+            //
+            // Thread T1: pulls events from queue
+            //            Maps events to Index Directory D1
+            // Thread T2: pulls events from queue
+            //            Maps events to Index Directory D1, the active index 
for Partition P1.
+            //            Writes events to D1.
+            //            Commits Index Writer for D1.
+            //            Closes Index Writer for D1.
+            // Thread T1: Writes events to D1.
+            //            Determines that Index Writer for D1 does not need to 
be committed or closed.
+            //
+            // In the case outlined above, we would potentially lose those 
events from the index! To avoid this,
+            // we simply decide to commit the index if this writer is no 
longer the active writer for the index.
+            // However, if we have 10 threads, we don't want all 10 threads 
trying to commit the index after each
+            // update. We want to commit when they've all finished. This is 
what the IndexManager will do if we request
+            // that it commit the index. It will also close the index if 
requested, once all writers have finished.
+            // So when this is the case, we will request that the Index 
Manager both commit and close the writer.
+
+            final Optional<File> activeIndexDirOption = 
directoryManager.getActiveIndexDirectory(partitionName);
+            if (!activeIndexDirOption.isPresent() || 
!activeIndexDirOption.get().equals(indexDirectory)) {
+                requestCommit = true;
+                requestClose = true;
+            }
+
+            if (writerIndicatesCommit) {
+                commit(indexWriter);
+                requestCommit = false; // we've already committed the index 
writer so no need to request that the index manager do so also.
+                final boolean directoryManagerIndicatesClose = 
directoryManager.onIndexCommitted(indexDirectory);
+                requestClose = requestClose || directoryManagerIndicatesClose;
+
+                if (logger.isDebugEnabled()) {
+                    final long maxId = documents.stream()
+                        .mapToLong(doc -> 
doc.getField(SearchableFields.Identifier.getSearchableFieldName()).numericValue().longValue())
+                        .max()
+                        .orElse(-1L);
+                    logger.debug("Committed index {} after writing a max Event 
ID of {}", indexDirectory, maxId);
+                }
+            }
+        } finally {
+            indexManager.returnIndexWriter(indexWriter, requestCommit, 
requestClose);
+        }
+    }
+
+
+    protected void commit(final EventIndexWriter indexWriter) throws 
IOException {
+        final long start = System.nanoTime();
+        final long approximateCommitCount = indexWriter.commit();
+        final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
start);
+        logger.debug("Successfully committed approximately {} Events to {} in 
{} millis", approximateCommitCount, indexWriter, millis);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexDirectoryManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexDirectoryManager.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexDirectoryManager.java
new file mode 100644
index 0000000..09878ff
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexDirectoryManager.java
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.index.lucene;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import org.apache.nifi.provenance.RepositoryConfiguration;
+import org.apache.nifi.provenance.util.DirectoryUtils;
+import org.apache.nifi.util.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class IndexDirectoryManager {
+    private static final Logger logger = 
LoggerFactory.getLogger(IndexDirectoryManager.class);
+    private static final FileFilter INDEX_DIRECTORY_FILTER = f -> 
f.getName().startsWith("index-");
+    private static final Pattern INDEX_FILENAME_PATTERN = 
Pattern.compile("index-(\\d+)");
+
+    private final RepositoryConfiguration repoConfig;
+
+    // guarded by synchronizing on 'this'
+    private final SortedMap<Long, List<IndexLocation>> 
indexLocationByTimestamp = new TreeMap<>();
+    private final Map<String, IndexLocation> activeIndices = new HashMap<>();
+
+    public IndexDirectoryManager(final RepositoryConfiguration repoConfig) {
+        this.repoConfig = repoConfig;
+    }
+
+    public synchronized void initialize() {
+        final Map<File, Tuple<Long, IndexLocation>> latestIndexByStorageDir = 
new HashMap<>();
+
+        for (final Map.Entry<String, File> entry : 
repoConfig.getStorageDirectories().entrySet()) {
+            final String partitionName = entry.getKey();
+            final File storageDir = entry.getValue();
+
+            final File[] indexDirs = 
storageDir.listFiles(INDEX_DIRECTORY_FILTER);
+            if (indexDirs == null) {
+                logger.warn("Unable to access Provenance Repository storage 
directory {}", storageDir);
+                continue;
+            }
+
+            for (final File indexDir : indexDirs) {
+                final Matcher matcher = 
INDEX_FILENAME_PATTERN.matcher(indexDir.getName());
+                if (!matcher.matches()) {
+                    continue;
+                }
+
+                final long startTime = 
DirectoryUtils.getIndexTimestamp(indexDir);
+                final List<IndexLocation> dirsForTimestamp = 
indexLocationByTimestamp.computeIfAbsent(startTime, t -> new ArrayList<>());
+                final IndexLocation indexLoc = new IndexLocation(indexDir, 
startTime, partitionName, repoConfig.getDesiredIndexSize());
+                dirsForTimestamp.add(indexLoc);
+
+                final Tuple<Long, IndexLocation> tuple = 
latestIndexByStorageDir.get(storageDir);
+                if (tuple == null || startTime > tuple.getKey()) {
+                    latestIndexByStorageDir.put(storageDir, new 
Tuple<>(startTime, indexLoc));
+                }
+            }
+        }
+
+        // Restore the activeIndices to point at the newest index in each 
storage location.
+        for (final Tuple<Long, IndexLocation> tuple : 
latestIndexByStorageDir.values()) {
+            final IndexLocation indexLoc = tuple.getValue();
+            activeIndices.put(indexLoc.getPartitionName(), indexLoc);
+        }
+    }
+
+
+    public synchronized void deleteDirectory(final File directory) {
+        final Iterator<Map.Entry<Long, List<IndexLocation>>> itr = 
indexLocationByTimestamp.entrySet().iterator();
+        while (itr.hasNext()) {
+            final Map.Entry<Long, List<IndexLocation>> entry = itr.next();
+            final List<IndexLocation> locations = entry.getValue();
+
+            final IndexLocation locToRemove = new IndexLocation(directory, 
DirectoryUtils.getIndexTimestamp(directory),
+                directory.getName(), repoConfig.getDesiredIndexSize());
+            locations.remove(locToRemove);
+            if (locations.isEmpty()) {
+                itr.remove();
+            }
+        }
+    }
+
+    /**
+     * Returns a List of all indexes where the latest event in the index has 
an event time before the given timestamp
+     *
+     * @param timestamp the cutoff
+     * @return all Files that belong to an index, where the index has no 
events later than the given time
+     */
+    public synchronized List<File> getDirectoriesBefore(final long timestamp) {
+        final List<File> selected = new ArrayList<>();
+
+        // An index cannot be expired if it is the latest index in the storage 
directory. As a result, we need to
+        // separate the indexes by Storage Directory so that we can easily 
determine if this is the case.
+        final Map<String, List<IndexLocation>> 
startTimeWithFileByStorageDirectory = flattenDirectoriesByTimestamp().stream()
+            .collect(Collectors.groupingBy(indexLoc -> 
indexLoc.getPartitionName()));
+
+        // Scan through the index directories and the associated index event 
start time.
+        // If looking at index N, we can determine the index end time by 
assuming that it is the same as the
+        // start time of index N+1. So we determine the time range of each 
index and select an index only if
+        // its start time is before the given timestamp and its end time is <= 
the given timestamp.
+        for (final List<IndexLocation> startTimeWithFile : 
startTimeWithFileByStorageDirectory.values()) {
+            for (int i = 0; i < startTimeWithFile.size(); i++) {
+                final IndexLocation indexLoc = startTimeWithFile.get(i);
+
+                final String partition = indexLoc.getPartitionName();
+                final IndexLocation activeLocation = 
activeIndices.get(partition);
+                if (indexLoc.equals(activeLocation)) {
+                    continue;
+                }
+
+                final Long indexStartTime = indexLoc.getIndexStartTimestamp();
+                if (indexStartTime > timestamp) {
+                    // If the first timestamp in the index is later than the 
desired timestamp,
+                    // then we are done. We can do this because the list is 
ordered by monotonically
+                    // increasing timestamp as the Tuple key.
+                    break;
+                }
+
+                if (i < startTimeWithFile.size() - 1) {
+                    final IndexLocation nextLocation = startTimeWithFile.get(i 
+ 1);
+                    final Long indexEndTime = 
nextLocation.getIndexStartTimestamp();
+                    if (indexEndTime <= timestamp) {
+                        logger.debug("Considering Index Location {} older than 
{} ({}) because its events have an EventTime "
+                            + "ranging from {} ({}) to {} ({}) based on the 
following IndexLocations: {}", nextLocation, timestamp, new Date(timestamp),
+                            indexStartTime, new Date(indexStartTime), 
indexEndTime, new Date(indexEndTime), startTimeWithFile);
+
+                        selected.add(nextLocation.getIndexDirectory());
+                    }
+                }
+            }
+        }
+
+        logger.debug("Returning the following list of index locations because 
they were finished being written to before {}: {}", timestamp, selected);
+        return selected;
+    }
+
+    /**
+     * Convert directoriesByTimestamp to a List of IndexLocations.
+     * This allows us to easily get the 'next' value when iterating over the 
elements.
+     * This is useful because we know that the 'next' value will have a 
timestamp that is when that
+     * file started being written to - which is the same as when this index 
stopped being written to.
+     *
+     * @return a List of all IndexLocations known
+     */
+    private List<IndexLocation> flattenDirectoriesByTimestamp() {
+        final List<IndexLocation> startTimeWithFile = new ArrayList<>();
+        for (final Map.Entry<Long, List<IndexLocation>> entry : 
indexLocationByTimestamp.entrySet()) {
+            for (final IndexLocation indexLoc : entry.getValue()) {
+                startTimeWithFile.add(indexLoc);
+            }
+        }
+
+        return startTimeWithFile;
+    }
+
+    public synchronized List<File> getDirectories(final Long startTime, final 
Long endTime) {
+        final List<File> selected = new ArrayList<>();
+
+        // An index cannot be expired if it is the latest index in the 
partition. As a result, we need to
+        // separate the indexes by partition so that we can easily determine 
if this is the case.
+        final Map<String, List<IndexLocation>> 
startTimeWithFileByStorageDirectory = flattenDirectoriesByTimestamp().stream()
+            .collect(Collectors.groupingBy(indexLoc -> 
indexLoc.getPartitionName()));
+
+        for (final List<IndexLocation> locationList : 
startTimeWithFileByStorageDirectory.values()) {
+            selected.addAll(getDirectories(startTime, endTime, locationList));
+        }
+
+        return selected;
+    }
+
+    public synchronized List<File> getDirectories(final Long startTime, final 
Long endTime, final String partitionName) {
+        // An index cannot be expired if it is the latest index in the 
partition. As a result, we need to
+        // separate the indexes by partition so that we can easily determine 
if this is the case.
+        final Map<String, List<IndexLocation>> 
startTimeWithFileByStorageDirectory = flattenDirectoriesByTimestamp().stream()
+            .collect(Collectors.groupingBy(indexLoc -> 
indexLoc.getPartitionName()));
+
+        final List<IndexLocation> indexLocations = 
startTimeWithFileByStorageDirectory.get(partitionName);
+        if (indexLocations == null) {
+            return Collections.emptyList();
+        }
+
+        return getDirectories(startTime, endTime, indexLocations);
+    }
+
+    protected static List<File> getDirectories(final Long startTime, final 
Long endTime, final List<IndexLocation> locations) {
+        final List<File> selected = new ArrayList<>();
+
+        int overlapCount = 0;
+        for (int i = 0; i < locations.size(); i++) {
+            final IndexLocation indexLoc = locations.get(i);
+            final Long indexStartTimestamp = indexLoc.getIndexStartTimestamp();
+            if (endTime != null && indexStartTimestamp > endTime) {
+                if (overlapCount == 0) {
+                    // Because of how we handle index timestamps and the 
multi-threading, it is possible
+                    // the we could have some overlap where Thread T1 gets an 
Event with start time 1,000
+                    // for instance. Then T2 gets and Event with start time 
1,002 and ends up creating a
+                    // new index directory with a start time of 1,002. Then T1 
could end up writing events
+                    // with timestamp 1,000 to an index with a 'start time' of 
1,002. Because of this,
+                    // the index start times are approximate. To address this, 
we include one extra Index
+                    // Directory based on start time, so that if we want index 
directories for Time Range
+                    // 1,000 - 1,001 and have indexes 999 and 1,002 we will 
include the 999 and the 'overlapping'
+                    // directory of 1,002 since it could potentially have an 
event with overlapping timestamp.
+                    overlapCount++;
+                } else {
+                    continue;
+                }
+            }
+
+            if (startTime != null) {
+                final Long indexEndTimestamp;
+                if (i < locations.size() - 1) {
+                    final IndexLocation nextIndexLoc = locations.get(i + 1);
+                    indexEndTimestamp = nextIndexLoc.getIndexStartTimestamp();
+                    if (indexEndTimestamp < startTime) {
+                        continue;
+                    }
+                }
+            }
+
+            selected.add(indexLoc.getIndexDirectory());
+        }
+
+        return selected;
+    }
+
+    /**
+     * Notifies the Index Directory Manager that an Index Writer has been 
committed for the
+     * given index directory. This allows the Directory Manager to know that 
it needs to check
+     * the size of the index directory and not return this directory as a 
writable directory
+     * any more if the size has reached the configured threshold.
+     *
+     * @param indexDir the directory that was written to
+     * @return <code>true</code> if the index directory has reached its max 
threshold and should no
+     *         longer be written to, <code>false</code> if the index directory 
is not full.
+     */
+    public boolean onIndexCommitted(final File indexDir) {
+        final long indexSize = getSize(indexDir);
+        synchronized (this) {
+            String partitionName = null;
+            for (final Map.Entry<String, IndexLocation> entry : 
activeIndices.entrySet()) {
+                if (indexDir.equals(entry.getValue().getIndexDirectory())) {
+                    partitionName = entry.getKey();
+                    break;
+                }
+            }
+
+            // If the index is not the active index directory, it should no 
longer be written to.
+            if (partitionName == null) {
+                logger.debug("Size of Provenance Index at {} is now {}. 
However, was unable to find the appropriate Active Index to roll over.", 
indexDir, indexSize);
+                return true;
+            }
+
+            // If the index size >= desired index size, it should no longer be 
written to.
+            if (indexSize >= repoConfig.getDesiredIndexSize()) {
+                logger.info("Size of Provenance Index at {} is now {}. Will 
close this index and roll over to a new one.", indexDir, indexSize);
+                activeIndices.remove(partitionName);
+
+                return true;
+            }
+
+            // Index directory is the active index directory and has not yet 
exceeded the desired size.
+            return false;
+        }
+    }
+
+    public synchronized Optional<File> getActiveIndexDirectory(final String 
partitionName) {
+        final IndexLocation indexLocation = activeIndices.get(partitionName);
+        if (indexLocation == null) {
+            return Optional.empty();
+        }
+
+        return Optional.of(indexLocation.getIndexDirectory());
+    }
+
+    private long getSize(final File indexDir) {
+        if (!indexDir.exists()) {
+            return 0L;
+        }
+        if (!indexDir.isDirectory()) {
+            throw new IllegalArgumentException("Must specify a directory but 
specified " + indexDir);
+        }
+
+        // List all files in the Index Directory.
+        final File[] files = indexDir.listFiles();
+        if (files == null) {
+            return 0L;
+        }
+
+        long sum = 0L;
+        for (final File file : files) {
+            sum += file.length();
+        }
+
+        return sum;
+    }
+
+    /**
+     * Provides the File that is the directory for the index that should be 
written to. If there is no index yet
+     * to be written to, or if the index has reached its max size, a new one 
will be created. The given {@code earliestTimestamp}
+     * should represent the event time of the first event that will go into 
the index. This is used for file naming purposes so
+     * that the appropriate directories can be looked up quickly later.
+     *
+     * @param earliestTimestamp the event time of the first event that will go 
into a new index, if a new index is created by this call.
+     * @param partitionName the name of the partition to write to
+     * @return the directory that should be written to
+     */
+    public synchronized File getWritableIndexingDirectory(final long 
earliestTimestamp, final String partitionName) {
+        IndexLocation indexLoc = activeIndices.get(partitionName);
+        if (indexLoc == null || indexLoc.isIndexFull()) {
+            indexLoc = new IndexLocation(createIndex(earliestTimestamp, 
partitionName), earliestTimestamp, partitionName, 
repoConfig.getDesiredIndexSize());
+            logger.debug("Created new Index Directory {}", indexLoc);
+
+            indexLocationByTimestamp.computeIfAbsent(earliestTimestamp, t -> 
new ArrayList<>()).add(indexLoc);
+            activeIndices.put(partitionName, indexLoc);
+        }
+
+        return indexLoc.getIndexDirectory();
+    }
+
+    private File createIndex(final long earliestTimestamp, final String 
partitionName) {
+        final File storageDir = 
repoConfig.getStorageDirectories().entrySet().stream()
+            .filter(e -> e.getKey().equals(partitionName))
+            .map(Map.Entry::getValue)
+            .findFirst()
+            .orElseThrow(() -> new IllegalArgumentException("Invalid 
Partition: " + partitionName));
+        final File indexDir = new File(storageDir, "index-" + 
earliestTimestamp);
+
+        return indexDir;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexLocation.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexLocation.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexLocation.java
new file mode 100644
index 0000000..33867c6
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexLocation.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.index.lucene;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.provenance.util.DirectoryUtils;
+
+public class IndexLocation {
+    private static final long SIZE_CHECK_MILLIS = 
TimeUnit.SECONDS.toMillis(30L);
+
+    private final File indexDirectory;
+    private final long indexStartTimestamp;
+    private final String partitionName;
+    private final long desiredIndexSize;
+    private volatile long lastSizeCheckTime = System.currentTimeMillis();
+
+    public IndexLocation(final File indexDirectory, final long 
indexStartTimestamp, final String partitionName, final long desiredIndexSize) {
+        this.indexDirectory = indexDirectory;
+        this.indexStartTimestamp = indexStartTimestamp;
+        this.partitionName = partitionName;
+        this.desiredIndexSize = desiredIndexSize;
+    }
+
+    public File getIndexDirectory() {
+        return indexDirectory;
+    }
+
+    public long getIndexStartTimestamp() {
+        return indexStartTimestamp;
+    }
+
+    public String getPartitionName() {
+        return partitionName;
+    }
+
+    public boolean isIndexFull() {
+        final long now = System.currentTimeMillis();
+        final long millisSinceLastSizeCheck = now - lastSizeCheckTime;
+        if (millisSinceLastSizeCheck < SIZE_CHECK_MILLIS) {
+            return false;
+        }
+
+        lastSizeCheckTime = now;
+        return DirectoryUtils.getSize(indexDirectory) >= desiredIndexSize;
+    }
+
+    @Override
+    public int hashCode() {
+        return 31 + 41 * indexDirectory.hashCode();
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (obj == null) {
+            return false;
+        }
+        if (obj == this) {
+            return true;
+        }
+
+        if (!(obj instanceof IndexLocation)) {
+            return false;
+        }
+
+        final IndexLocation other = (IndexLocation) obj;
+        return indexDirectory.equals(other.getIndexDirectory());
+    }
+
+    @Override
+    public String toString() {
+        return "IndexLocation[directory=" + indexDirectory + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexableDocument.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexableDocument.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexableDocument.java
new file mode 100644
index 0000000..1fc163f
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexableDocument.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.index.lucene;
+
+import java.io.File;
+
+import org.apache.lucene.document.Document;
+import org.apache.nifi.provenance.serialization.StorageSummary;
+
+public class IndexableDocument {
+    private final Document document;
+    private final StorageSummary persistenceLocation;
+    private final File indexDirectory;
+
+    public IndexableDocument(final Document document, final StorageSummary 
location, final File indexDirectory) {
+        this.document = document;
+        this.persistenceLocation = location;
+        this.indexDirectory = indexDirectory;
+    }
+
+    public Document getDocument() {
+        return document;
+    }
+
+    public StorageSummary getPersistenceLocation() {
+        return persistenceLocation;
+    }
+
+    public File getIndexDirectory() {
+        return indexDirectory;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LatestEventsPerProcessorQuery.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LatestEventsPerProcessorQuery.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LatestEventsPerProcessorQuery.java
new file mode 100644
index 0000000..73b0a14
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LatestEventsPerProcessorQuery.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.index.lucene;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.SearchableFields;
+import org.apache.nifi.provenance.search.Query;
+import org.apache.nifi.provenance.search.SearchTerm;
+import org.apache.nifi.provenance.serialization.StorageSummary;
+import org.apache.nifi.util.RingBuffer;
+
+public class LatestEventsPerProcessorQuery implements CachedQuery {
+    private static final String COMPONENT_ID_FIELD_NAME = 
SearchableFields.ComponentID.getSearchableFieldName();
+    private final ConcurrentMap<String, RingBuffer<Long>> latestRecords = new 
ConcurrentHashMap<>();
+
+    @Override
+    public void update(final ProvenanceEventRecord event, final StorageSummary 
storageSummary) {
+        final String componentId = event.getComponentId();
+        final RingBuffer<Long> ringBuffer = 
latestRecords.computeIfAbsent(componentId, id -> new RingBuffer<>(1000));
+        ringBuffer.add(storageSummary.getEventId());
+    }
+
+    @Override
+    public Optional<List<Long>> evaluate(final Query query) {
+        if (query.getMaxResults() > 1000) {
+            // If query max results > 1000 then we know we don't have enough 
results. So just return empty.
+            return Optional.empty();
+        }
+
+        final List<SearchTerm> terms = query.getSearchTerms();
+        if (terms.size() != 1) {
+            return Optional.empty();
+        }
+
+        final SearchTerm term = terms.get(0);
+        if 
(!COMPONENT_ID_FIELD_NAME.equals(term.getSearchableField().getSearchableFieldName()))
 {
+            return Optional.empty();
+        }
+
+        if (query.getEndDate() != null || query.getStartDate() != null) {
+            return Optional.empty();
+        }
+
+        final RingBuffer<Long> ringBuffer = latestRecords.get(term.getValue());
+        if (ringBuffer == null || ringBuffer.getSize() < 
query.getMaxResults()) {
+            return Optional.empty();
+        }
+
+        List<Long> eventIds = ringBuffer.asList();
+        if (eventIds.size() > query.getMaxResults()) {
+            eventIds = eventIds.subList(0, query.getMaxResults());
+        }
+
+        return Optional.of(eventIds);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LatestEventsQuery.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LatestEventsQuery.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LatestEventsQuery.java
new file mode 100644
index 0000000..94cd013
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LatestEventsQuery.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.index.lucene;
+
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.search.Query;
+import org.apache.nifi.provenance.serialization.StorageSummary;
+import org.apache.nifi.util.RingBuffer;
+
+public class LatestEventsQuery implements CachedQuery {
+
+    final RingBuffer<Long> latestRecords = new RingBuffer<>(1000);
+
+    @Override
+    public void update(final ProvenanceEventRecord event, final StorageSummary 
storageSummary) {
+        latestRecords.add(storageSummary.getEventId());
+    }
+
+    @Override
+    public Optional<List<Long>> evaluate(final Query query) {
+        if (latestRecords.getSize() < query.getMaxResults()) {
+            return Optional.empty();
+        }
+
+        if (query.getSearchTerms().isEmpty() && query.getStartDate() == null 
&& query.getEndDate() == null) {
+            final List<Long> eventList = latestRecords.asList();
+            if (eventList.size() > query.getMaxResults()) {
+                return Optional.of(eventList.subList(0, 
query.getMaxResults()));
+            } else {
+                return Optional.of(eventList);
+            }
+        } else {
+            return Optional.empty();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneCacheWarmer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneCacheWarmer.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneCacheWarmer.java
new file mode 100644
index 0000000..15b11b4
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneCacheWarmer.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.index.lucene;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.provenance.index.EventIndexSearcher;
+import org.apache.nifi.provenance.lucene.IndexManager;
+import org.apache.nifi.provenance.util.DirectoryUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LuceneCacheWarmer implements Runnable {
+    private static final Logger logger = 
LoggerFactory.getLogger(LuceneCacheWarmer.class);
+
+    private final File storageDir;
+    private final IndexManager indexManager;
+
+    public LuceneCacheWarmer(final File storageDir, final IndexManager 
indexManager) {
+        this.storageDir = storageDir;
+        this.indexManager = indexManager;
+    }
+
+    @Override
+    public void run() {
+        try {
+            final File[] indexDirs = 
storageDir.listFiles(DirectoryUtils.INDEX_FILE_FILTER);
+            if (indexDirs == null) {
+                logger.info("Cannot warm Lucene Index Cache for " + storageDir 
+ " because the directory could not be read");
+                return;
+            }
+
+            logger.info("Beginning warming of Lucene Index Cache for " + 
storageDir);
+            final long startNanos = System.nanoTime();
+            for (final File indexDir : indexDirs) {
+                final long indexStartNanos = System.nanoTime();
+
+                final EventIndexSearcher eventSearcher = 
indexManager.borrowIndexSearcher(indexDir);
+                indexManager.returnIndexSearcher(eventSearcher);
+
+                final long indexWarmMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - indexStartNanos);
+                logger.debug("Took {} ms to warm Lucene Index {}", 
indexWarmMillis, indexDir);
+            }
+
+            final long warmSecs = 
TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - startNanos);
+            logger.info("Finished warming all Lucene Indexes for {} in {} 
seconds", storageDir, warmSecs);
+        } catch (final Exception e) {
+            logger.error("Failed to warm Lucene Index Cache for " + 
storageDir, e);
+        }
+    }
+}

Reply via email to