http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/EventIndex.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/EventIndex.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/EventIndex.java new file mode 100644 index 0000000..051cd1f --- /dev/null +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/EventIndex.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.provenance.index; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Map; + +import org.apache.nifi.authorization.user.NiFiUser; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.authorization.EventAuthorizer; +import org.apache.nifi.provenance.lineage.ComputeLineageSubmission; +import org.apache.nifi.provenance.search.Query; +import org.apache.nifi.provenance.search.QuerySubmission; +import org.apache.nifi.provenance.serialization.StorageSummary; +import org.apache.nifi.provenance.store.EventStore; + +/** + * An Event Index is responsible for indexing Provenance Events in such a way that the index can be quickly + * searched to in order to retrieve events of interest. + */ +public interface EventIndex extends Closeable { + + /** + * Initializes the Event Index, providing it access to the Event Store, in case it is necessary for performing + * initialization tasks + * + * @param eventStore the EventStore that holds the events that have been given to the repository. + */ + void initialize(EventStore eventStore); + + /** + * Adds the given events to the index so that they can be queried later. 
+ * + * @param events the events to index along with their associated Storage Summaries + */ + void addEvents(Map<ProvenanceEventRecord, StorageSummary> events); + + /** + * Replaces the entries in the appropriate index with the given events + * + * @param events the events to add or replace along with their associated Storage Summaries + */ + void reindexEvents(Map<ProvenanceEventRecord, StorageSummary> events); + + /** + * @return the number of bytes that are utilized by the Event Index + */ + long getSize(); + + /** + * Submits a Query asynchronously and returns a QuerySubmission that can be used to obtain the results + * + * @param query the query to perform + * @param authorizer the authorizer to use in order to determine whether or not a particular event should be included in the result + * @param userId the ID of the user on whose behalf the query is being submitted + * + * @return a QuerySubmission that can be used to retrieve the results later + */ + QuerySubmission submitQuery(Query query, EventAuthorizer authorizer, String userId); + + /** + * Asynchronously computes the lineage for the FlowFile that is identified by the Provenance Event with the given ID. + * + * @param eventId the ID of the Provenance Event for which the lineage should be calculated + * @param user the NiFi user on whose behalf the computing is being performed + * @param authorizer the authorizer to use in order to determine whether or not a particular event should be included in the result + * + * @return a ComputeLineageSubmission that can be used to retrieve the results later + */ + ComputeLineageSubmission submitLineageComputation(long eventId, NiFiUser user, EventAuthorizer authorizer); + + /** + * Asynchronously computes the lineage for the FlowFile that has the given FlowFile UUID. 
+ * + * @param flowFileUuid the UUID of the FlowFile for which the lineage should be computed + * @param user the NiFi user on whose behalf the computing is being performed + * @param authorizer the authorizer to use in order to determine whether or not a particular event should be included in the result + * + * @return a ComputeLineageSubmission that can be used to retrieve the results later + */ + ComputeLineageSubmission submitLineageComputation(String flowFileUuid, NiFiUser user, EventAuthorizer authorizer); + + /** + * Asynchronously computes the lineage that makes up the 'child flowfiles' generated by the event with the given ID. This method is + * valid only for Events that produce 'child flowfiles' such as FORK, CLONE, REPLAY, etc. + * + * @param eventId the ID of the Provenance Event for which the lineage should be calculated + * @param user the NiFi user on whose behalf the computing is being performed + * @param authorizer the authorizer to use in order to determine whether or not a particular event should be included in the result + * + * @return a ComputeLineageSubmission that can be used to retrieve the results later + */ + ComputeLineageSubmission submitExpandChildren(long eventId, NiFiUser user, EventAuthorizer authorizer); + + /** + * Asynchronously computes the lineage that makes up the 'parent flowfiles' that were involved in the event with the given ID. This method + * is valid only for Events that have 'parent flowfiles' such as FORK, JOIN, etc. 
+ * + * @param eventId the ID of the Provenance Event for which the lineage should be calculated + * @param user the NiFi user on whose behalf the computing is being performed + * @param authorizer the authorizer to use in order to determine whether or not a particular event should be included in the result + * + * @return a ComputeLineageSubmission that can be used to retrieve the results later + */ + ComputeLineageSubmission submitExpandParents(long eventId, NiFiUser user, EventAuthorizer authorizer); + + /** + * Retrieves the ComputeLineageSubmission that was returned by the 'submitLineageComputation' methods + * + * @param lineageIdentifier the identifier of the linage + * @param user the NiFi user on whose behalf the retrieval is being performed + * @return the ComputeLineageSubmission that represents the asynchronous lineage computation that is being performed under the given + * identifier, or <code>null</code> if the identifier cannot be found. + */ + ComputeLineageSubmission retrieveLineageSubmission(String lineageIdentifier, NiFiUser user); + + /** + * Retrieves the QuerySubmission that was returned by the 'submitQuery' method + * + * @param queryIdentifier the identifier of the query + * @param user the NiFi user on whose behalf the retrieval is being performed + * @return the QuerySubmission that represents the asynchronous query that is being performed under the given + * identifier, or <code>null</code> if the identifier cannot be found. + */ + QuerySubmission retrieveQuerySubmission(String queryIdentifier, NiFiUser user); + + /** + * Upon restart of NiFi, it is possible that the Event Index will have lost some events due to frequency of committing the index. + * In such as case, this method is responsible for returning the minimum Provenance Event ID that it knows is safely indexed. If + * any Provenance Event exists in the Event Store with an ID greater than the value returned, that Event should be re-indexed. 
+ * + * @param partitionName the name of the Partition for which the minimum Event ID is desired + * @return the minimum Provenance Event ID that the Index knows is safely indexed for the given partition + */ + long getMinimumEventIdToReindex(String partitionName); + + /** + * Instructs the Event Index to commit any changes that have been made to the partition with the given name + * + * @param partitionName the name of the partition to commit changes + * @throws IOException if unable to commit the changes + */ + void commitChanges(String partitionName) throws IOException; +}
http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/EventIndexSearcher.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/EventIndexSearcher.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/EventIndexSearcher.java new file mode 100644 index 0000000..8389408 --- /dev/null +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/EventIndexSearcher.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.provenance.index; + +import java.io.Closeable; +import java.io.File; + +import org.apache.lucene.search.IndexSearcher; + +public interface EventIndexSearcher extends Closeable { + IndexSearcher getIndexSearcher(); + + File getIndexDirectory(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/EventIndexWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/EventIndexWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/EventIndexWriter.java new file mode 100644 index 0000000..f0af7dc --- /dev/null +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/EventIndexWriter.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.provenance.index; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.util.List; + +import org.apache.lucene.document.Document; +import org.apache.lucene.index.IndexWriter; + +public interface EventIndexWriter extends Closeable { + + boolean index(Document document, int commitThreshold) throws IOException; + + boolean index(List<Document> documents, int commitThreshold) throws IOException; + + File getDirectory(); + + long commit() throws IOException; + + int getEventsIndexedSinceCommit(); + + long getEventsIndexed(); + + IndexWriter getIndexWriter(); + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/SearchFailedException.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/SearchFailedException.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/SearchFailedException.java new file mode 100644 index 0000000..ce1bedb --- /dev/null +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/SearchFailedException.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.provenance.index; + +public class SearchFailedException extends RuntimeException { + public SearchFailedException(final String message, final Throwable cause) { + super(message, cause); + } + + public SearchFailedException(final Throwable cause) { + super(cause); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/CachedQuery.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/CachedQuery.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/CachedQuery.java new file mode 100644 index 0000000..770c455 --- /dev/null +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/CachedQuery.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.provenance.index.lucene; + +import java.util.List; +import java.util.Optional; + +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.search.Query; +import org.apache.nifi.provenance.serialization.StorageSummary; + +public interface CachedQuery { + + void update(ProvenanceEventRecord event, StorageSummary storageSummary); + + Optional<List<Long>> evaluate(Query query); + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/CommitPreference.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/CommitPreference.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/CommitPreference.java new file mode 100644 index 0000000..2208917 --- /dev/null +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/CommitPreference.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.provenance.index.lucene; + +public enum CommitPreference { + FORCE_COMMIT, + PREVENT_COMMIT, + NO_PREFERENCE; +} http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/ConvertEventToLuceneDocument.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/ConvertEventToLuceneDocument.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/ConvertEventToLuceneDocument.java new file mode 100644 index 0000000..765b81f --- /dev/null +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/ConvertEventToLuceneDocument.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance.index.lucene; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.Field.Store; +import org.apache.lucene.document.FieldType; +import org.apache.lucene.document.LongField; +import org.apache.lucene.document.StringField; +import org.apache.lucene.index.FieldInfo.IndexOptions; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.provenance.SearchableFields; +import org.apache.nifi.provenance.lucene.LuceneUtil; +import org.apache.nifi.provenance.search.SearchableField; +import org.apache.nifi.provenance.serialization.StorageSummary; + +public class ConvertEventToLuceneDocument { + private final Set<SearchableField> searchableEventFields; + private final Set<SearchableField> searchableAttributeFields; + + public ConvertEventToLuceneDocument(final List<SearchableField> searchableEventFields, final List<SearchableField> searchableAttributes) { + this.searchableEventFields = Collections.unmodifiableSet(new HashSet<>(searchableEventFields)); + this.searchableAttributeFields = Collections.unmodifiableSet(new HashSet<>(searchableAttributes)); 
+ } + + private void addField(final Document doc, final SearchableField field, final String value) { + if (value == null || (!field.isAttribute() && !searchableEventFields.contains(field))) { + return; + } + + doc.add(new StringField(field.getSearchableFieldName(), value.toLowerCase(), Store.NO)); + } + + + public Document convert(final ProvenanceEventRecord record, final StorageSummary persistedEvent) { + final Document doc = new Document(); + addField(doc, SearchableFields.FlowFileUUID, record.getFlowFileUuid()); + addField(doc, SearchableFields.Filename, record.getAttribute(CoreAttributes.FILENAME.key())); + addField(doc, SearchableFields.ComponentID, record.getComponentId()); + addField(doc, SearchableFields.AlternateIdentifierURI, record.getAlternateIdentifierUri()); + addField(doc, SearchableFields.EventType, record.getEventType().name()); + addField(doc, SearchableFields.Relationship, record.getRelationship()); + addField(doc, SearchableFields.Details, record.getDetails()); + addField(doc, SearchableFields.ContentClaimSection, record.getContentClaimSection()); + addField(doc, SearchableFields.ContentClaimContainer, record.getContentClaimContainer()); + addField(doc, SearchableFields.ContentClaimIdentifier, record.getContentClaimIdentifier()); + addField(doc, SearchableFields.SourceQueueIdentifier, record.getSourceQueueIdentifier()); + addField(doc, SearchableFields.TransitURI, record.getTransitUri()); + + for (final SearchableField searchableField : searchableAttributeFields) { + addField(doc, searchableField, LuceneUtil.truncateIndexField(record.getAttribute(searchableField.getSearchableFieldName()))); + } + + // Index the fields that we always index (unless there's nothing else to index at all) + if (!doc.getFields().isEmpty()) { + // Always include Lineage Start Date because it allows us to make our Lineage queries more efficient. 
+ doc.add(new LongField(SearchableFields.LineageStartDate.getSearchableFieldName(), record.getLineageStartDate(), Store.NO)); + // Always include Event Time because most queries are bound by a start and end time. + doc.add(new LongField(SearchableFields.EventTime.getSearchableFieldName(), record.getEventTime(), Store.NO)); + // We always include File Size because the UI wants to always render the controls for specifying this. This idea could be revisited. + doc.add(new LongField(SearchableFields.FileSize.getSearchableFieldName(), record.getFileSize(), Store.NO)); + // We always store the event Event ID in the Document but do not index it. It doesn't make sense to query based on Event ID because + // if we want a particular Event ID, we can just obtain it directly from the EventStore. But when we obtain a Document, this info must + // be stored so that we know how to lookup the event in the store. + doc.add(new UnIndexedLongField(SearchableFields.Identifier.getSearchableFieldName(), persistedEvent.getEventId())); + + // If it's event is a FORK, or JOIN, add the FlowFileUUID for all child/parent UUIDs. + final ProvenanceEventType eventType = record.getEventType(); + if (eventType == ProvenanceEventType.FORK || eventType == ProvenanceEventType.CLONE || eventType == ProvenanceEventType.REPLAY) { + for (final String uuid : record.getChildUuids()) { + if (!uuid.equals(record.getFlowFileUuid())) { + addField(doc, SearchableFields.FlowFileUUID, uuid); + } + } + } else if (eventType == ProvenanceEventType.JOIN) { + for (final String uuid : record.getParentUuids()) { + if (!uuid.equals(record.getFlowFileUuid())) { + addField(doc, SearchableFields.FlowFileUUID, uuid); + } + } + } else if (eventType == ProvenanceEventType.RECEIVE && record.getSourceSystemFlowFileIdentifier() != null) { + // If we get a receive with a Source System FlowFile Identifier, we add another Document that shows the UUID + // that the Source System uses to refer to the data. 
+ final String sourceIdentifier = record.getSourceSystemFlowFileIdentifier(); + final String sourceFlowFileUUID; + final int lastColon = sourceIdentifier.lastIndexOf(":"); + if (lastColon > -1 && lastColon < sourceIdentifier.length() - 2) { + sourceFlowFileUUID = sourceIdentifier.substring(lastColon + 1); + } else { + sourceFlowFileUUID = null; + } + + if (sourceFlowFileUUID != null) { + addField(doc, SearchableFields.FlowFileUUID, sourceFlowFileUUID); + } + } + + return doc; + } + + return null; + } + + private static class UnIndexedLongField extends Field { + static final FieldType TYPE = new FieldType(); + static { + TYPE.setIndexed(false); + TYPE.setTokenized(true); + TYPE.setOmitNorms(true); + TYPE.setIndexOptions(IndexOptions.DOCS_ONLY); + TYPE.setNumericType(FieldType.NumericType.LONG); + TYPE.setStored(true); + TYPE.freeze(); + } + + public UnIndexedLongField(String name, long value) { + super(name, TYPE); + fieldsData = Long.valueOf(value); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/EventIndexTask.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/EventIndexTask.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/EventIndexTask.java new file mode 100644 index 0000000..f8bbd3b --- /dev/null +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/EventIndexTask.java @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nifi.provenance.index.lucene;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.apache.lucene.document.Document;
import org.apache.lucene.search.NumericRangeQuery;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.provenance.RepositoryConfiguration;
import org.apache.nifi.provenance.SearchableFields;
import org.apache.nifi.provenance.index.EventIndexWriter;
import org.apache.nifi.provenance.lucene.IndexManager;
import org.apache.nifi.reporting.Severity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A Runnable that continually pulls {@code StoredDocument}s from a shared queue and writes them to
 * the currently active Lucene index for the appropriate partition. Several instances of this task
 * may run concurrently against the same queue; coordination of "which directory is active" is
 * delegated to the {@link IndexDirectoryManager} and commit/close coordination to the
 * {@link IndexManager}.
 */
public class EventIndexTask implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(EventIndexTask.class);
    private static final String EVENT_CATEGORY = "Provenance Repository";

    /** Maximum number of documents a single loop iteration will drain from the queue. */
    public static final int MAX_DOCUMENTS_PER_THREAD = 100;
    public static final int DEFAULT_MAX_EVENTS_PER_COMMIT = 1_000_000;

    private final BlockingQueue<StoredDocument> documentQueue;
    private final IndexManager indexManager;
    private volatile boolean shutdown = false;

    private final IndexDirectoryManager directoryManager;
    private final EventReporter eventReporter;
    private final int commitThreshold;

    /**
     * @param documentQueue the queue from which documents to index are pulled
     * @param repoConfig repository configuration (currently unused here; retained for signature compatibility)
     * @param indexManager manager from which Index Writers are borrowed/returned
     * @param directoryManager manager that determines the active index directory per partition
     * @param maxEventsPerCommit number of uncommitted events after which a commit is forced
     * @param eventReporter reporter used to surface indexing failures to the framework
     */
    public EventIndexTask(final BlockingQueue<StoredDocument> documentQueue, final RepositoryConfiguration repoConfig, final IndexManager indexManager,
        final IndexDirectoryManager directoryManager, final int maxEventsPerCommit, final EventReporter eventReporter) {
        this.documentQueue = documentQueue;
        this.indexManager = indexManager;
        this.directoryManager = directoryManager;
        this.commitThreshold = maxEventsPerCommit;
        this.eventReporter = eventReporter;
    }

    /** Signals the run loop to exit after its current iteration. */
    public void shutdown() {
        this.shutdown = true;
    }

    /**
     * Pulls up to {@link #MAX_DOCUMENTS_PER_THREAD} documents into the given list. Blocks for at
     * most 1 second waiting for the first document; if none arrives, returns with the list
     * unchanged rather than spinning on {@code drainTo}.
     */
    private void fetchDocuments(final List<StoredDocument> destination) throws InterruptedException {
        final StoredDocument firstDoc = documentQueue.poll(1, TimeUnit.SECONDS);
        if (firstDoc == null) {
            return;
        }

        destination.add(firstDoc);
        documentQueue.drainTo(destination, MAX_DOCUMENTS_PER_THREAD - 1);
    }

    @Override
    public void run() {
        final List<StoredDocument> toIndex = new ArrayList<>(MAX_DOCUMENTS_PER_THREAD);

        while (!shutdown) {
            try {
                // Gather the next batch of documents to index.
                toIndex.clear();
                fetchDocuments(toIndex);

                if (toIndex.isEmpty()) {
                    continue;
                }

                // Write each document to the active index of the partition it was stored in.
                // NOTE(review): getPartitionName() is an Optional; a StoredDocument without a
                // partition name would throw here and abort the whole batch -- presumed to be
                // guaranteed non-empty by the producer. TODO confirm.
                final Map<String, List<StoredDocument>> docsByPartition = toIndex.stream()
                    .collect(Collectors.groupingBy(doc -> doc.getStorageSummary().getPartitionName().get()));

                for (final Map.Entry<String, List<StoredDocument>> entry : docsByPartition.entrySet()) {
                    index(entry.getValue(), entry.getKey());
                }
            } catch (final Exception e) {
                logger.error("Failed to index Provenance Events", e);
                eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to index Provenance Events. See logs for more information.");
            }
        }
    }


    /**
     * Re-indexes the documents given. The IndexableDocument's provided are required to have the IndexDirectory provided.
     *
     * @param toIndex documents to re-index, each carrying the directory of the index it belongs to
     * @param commitPreference if {@code FORCE_COMMIT}, the Index Manager is asked to commit the writer on return
     * @throws IOException if deleting or writing documents fails
     */
    void reIndex(final List<IndexableDocument> toIndex, final CommitPreference commitPreference) throws IOException {
        if (toIndex.isEmpty()) {
            return;
        }

        final Map<File, List<IndexableDocument>> docsByIndexDir = toIndex.stream()
            .collect(Collectors.groupingBy(IndexableDocument::getIndexDirectory));

        for (final Map.Entry<File, List<IndexableDocument>> entry : docsByIndexDir.entrySet()) {
            final File indexDirectory = entry.getKey();
            final List<IndexableDocument> documentsForIndex = entry.getValue();

            final EventIndexWriter indexWriter = indexManager.borrowIndexWriter(indexDirectory);
            try {
                // Remove any documents that already exist in this index that are overlapping.
                // BUGFIX: the deletion range must be computed from the documents destined for THIS
                // index only (documentsForIndex), not from all of toIndex. Using the global list
                // produces an over-wide Event-ID range that can delete documents from this index
                // that belong to other batches and are never re-added.
                long minId = Long.MAX_VALUE;
                long maxId = Long.MIN_VALUE;

                for (final IndexableDocument doc : documentsForIndex) {
                    final long eventId = doc.getDocument().getField(SearchableFields.Identifier.getSearchableFieldName()).numericValue().longValue();
                    if (eventId < minId) {
                        minId = eventId;
                    }
                    if (eventId > maxId) {
                        maxId = eventId;
                    }
                }

                final NumericRangeQuery<Long> query = NumericRangeQuery.newLongRange(
                    SearchableFields.Identifier.getSearchableFieldName(), minId, maxId, true, true);
                indexWriter.getIndexWriter().deleteDocuments(query);

                final List<Document> documents = documentsForIndex.stream()
                    .map(IndexableDocument::getDocument)
                    .collect(Collectors.toList());

                indexWriter.index(documents, commitThreshold);
            } finally {
                indexManager.returnIndexWriter(indexWriter, CommitPreference.FORCE_COMMIT.equals(commitPreference), false);
            }
        }
    }


    /**
     * Writes the given documents to the active index for the given partition, committing and/or
     * closing the writer when the writer or the directory manager indicates it is necessary.
     */
    private void index(final List<StoredDocument> toIndex, final String partitionName) throws IOException {
        if (toIndex.isEmpty()) {
            return;
        }

        // Convert the StoredDocument list into a List of Documents so that we can pass them to the Index Writer.
        final List<Document> documents = toIndex.stream()
            .map(StoredDocument::getDocument)
            .collect(Collectors.toList());

        boolean requestClose = false;
        boolean requestCommit = false;

        final long minEventTime = toIndex.stream()
            .mapToLong(doc -> doc.getDocument().getField(SearchableFields.EventTime.getSearchableFieldName()).numericValue().longValue())
            .min()
            .getAsLong();

        // Synchronize on the directory manager because we don't want the active directory to change
        // while we are obtaining an index writer for it. I.e., determining the active directory
        // and obtaining an Index Writer for it need to be done atomically.
        final EventIndexWriter indexWriter;
        final File indexDirectory;
        synchronized (directoryManager) {
            indexDirectory = directoryManager.getWritableIndexingDirectory(minEventTime, partitionName);
            indexWriter = indexManager.borrowIndexWriter(indexDirectory);
        }

        try {
            // Perform the actual indexing.
            final boolean writerIndicatesCommit = indexWriter.index(documents, commitThreshold);

            // If we don't need to commit index based on what index writer tells us, we will still want
            // to commit the index if it's assigned to a partition and this is no longer the active index
            // for that partition. This prevents the following case:
            //
            // Thread T1: pulls events from queue
            //            Maps events to Index Directory D1
            // Thread T2: pulls events from queue
            //            Maps events to Index Directory D1, the active index for Partition P1.
            //            Writes events to D1.
            //            Commits Index Writer for D1.
            //            Closes Index Writer for D1.
            // Thread T1: Writes events to D1.
            //            Determines that Index Writer for D1 does not need to be committed or closed.
            //
            // In the case outlined above, we would potentially lose those events from the index! To avoid this,
            // we simply decide to commit the index if this writer is no longer the active writer for the index.
            // We request that the Index Manager commit (and close) on our behalf so that, with many threads,
            // the commit happens once when all writers have finished rather than once per thread.
            final Optional<File> activeIndexDirOption = directoryManager.getActiveIndexDirectory(partitionName);
            if (!activeIndexDirOption.isPresent() || !activeIndexDirOption.get().equals(indexDirectory)) {
                requestCommit = true;
                requestClose = true;
            }

            if (writerIndicatesCommit) {
                commit(indexWriter);
                requestCommit = false; // we've already committed the index writer so no need to request that the index manager do so also.
                final boolean directoryManagerIndicatesClose = directoryManager.onIndexCommitted(indexDirectory);
                requestClose = requestClose || directoryManagerIndicatesClose;

                if (logger.isDebugEnabled()) {
                    final long maxId = documents.stream()
                        .mapToLong(doc -> doc.getField(SearchableFields.Identifier.getSearchableFieldName()).numericValue().longValue())
                        .max()
                        .orElse(-1L);
                    logger.debug("Committed index {} after writing a max Event ID of {}", indexDirectory, maxId);
                }
            }
        } finally {
            indexManager.returnIndexWriter(indexWriter, requestCommit, requestClose);
        }
    }


    /** Commits the given writer, logging the approximate event count and elapsed time. */
    protected void commit(final EventIndexWriter indexWriter) throws IOException {
        final long start = System.nanoTime();
        final long approximateCommitCount = indexWriter.commit();
        final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        logger.debug("Successfully committed approximately {} Events to {} in {} millis", approximateCommitCount, indexWriter, millis);
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nifi.provenance.index.lucene;

import java.io.File;
import java.io.FileFilter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import org.apache.nifi.provenance.RepositoryConfiguration;
import org.apache.nifi.provenance.util.DirectoryUtils;
import org.apache.nifi.util.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tracks the Lucene index directories for each storage partition of the Provenance Repository:
 * which directory is currently "active" (being written to) for each partition, when each index
 * was started, and which directories can be expired or queried for a given time range.
 * All mutable state is guarded by synchronizing on {@code this}.
 */
public class IndexDirectoryManager {
    private static final Logger logger = LoggerFactory.getLogger(IndexDirectoryManager.class);
    private static final FileFilter INDEX_DIRECTORY_FILTER = f -> f.getName().startsWith("index-");
    private static final Pattern INDEX_FILENAME_PATTERN = Pattern.compile("index-(\\d+)");

    private final RepositoryConfiguration repoConfig;

    // guarded by synchronizing on 'this'
    private final SortedMap<Long, List<IndexLocation>> indexLocationByTimestamp = new TreeMap<>();
    private final Map<String, IndexLocation> activeIndices = new HashMap<>();

    public IndexDirectoryManager(final RepositoryConfiguration repoConfig) {
        this.repoConfig = repoConfig;
    }

    /**
     * Scans all configured storage directories for existing index directories, registering each by
     * its start timestamp, and restores the newest index of each partition as the active index.
     */
    public synchronized void initialize() {
        final Map<File, Tuple<Long, IndexLocation>> latestIndexByStorageDir = new HashMap<>();

        for (final Map.Entry<String, File> entry : repoConfig.getStorageDirectories().entrySet()) {
            final String partitionName = entry.getKey();
            final File storageDir = entry.getValue();

            final File[] indexDirs = storageDir.listFiles(INDEX_DIRECTORY_FILTER);
            if (indexDirs == null) {
                logger.warn("Unable to access Provenance Repository storage directory {}", storageDir);
                continue;
            }

            for (final File indexDir : indexDirs) {
                final Matcher matcher = INDEX_FILENAME_PATTERN.matcher(indexDir.getName());
                if (!matcher.matches()) {
                    continue;
                }

                final long startTime = DirectoryUtils.getIndexTimestamp(indexDir);
                final List<IndexLocation> dirsForTimestamp = indexLocationByTimestamp.computeIfAbsent(startTime, t -> new ArrayList<>());
                final IndexLocation indexLoc = new IndexLocation(indexDir, startTime, partitionName, repoConfig.getDesiredIndexSize());
                dirsForTimestamp.add(indexLoc);

                final Tuple<Long, IndexLocation> tuple = latestIndexByStorageDir.get(storageDir);
                if (tuple == null || startTime > tuple.getKey()) {
                    latestIndexByStorageDir.put(storageDir, new Tuple<>(startTime, indexLoc));
                }
            }
        }

        // Restore the activeIndices to point at the newest index in each storage location.
        for (final Tuple<Long, IndexLocation> tuple : latestIndexByStorageDir.values()) {
            final IndexLocation indexLoc = tuple.getValue();
            activeIndices.put(indexLoc.getPartitionName(), indexLoc);
        }
    }


    /**
     * Removes the given index directory from this manager's bookkeeping. Does not delete any files.
     */
    public synchronized void deleteDirectory(final File directory) {
        // IndexLocation equality considers only the directory, so the remaining constructor
        // arguments are irrelevant here; build the probe once rather than once per map entry.
        final IndexLocation locToRemove = new IndexLocation(directory, DirectoryUtils.getIndexTimestamp(directory),
            directory.getName(), repoConfig.getDesiredIndexSize());

        final Iterator<Map.Entry<Long, List<IndexLocation>>> itr = indexLocationByTimestamp.entrySet().iterator();
        while (itr.hasNext()) {
            final List<IndexLocation> locations = itr.next().getValue();
            locations.remove(locToRemove);
            if (locations.isEmpty()) {
                itr.remove();
            }
        }
    }

    /**
     * Returns a List of all indexes where the latest event in the index has an event time before the given timestamp
     *
     * @param timestamp the cutoff
     * @return all Files that belong to an index, where the index has no events later than the given time
     */
    public synchronized List<File> getDirectoriesBefore(final long timestamp) {
        final List<File> selected = new ArrayList<>();

        // An index cannot be expired if it is the latest index in the storage directory. As a result, we need to
        // separate the indexes by Storage Directory so that we can easily determine if this is the case.
        final Map<String, List<IndexLocation>> startTimeWithFileByStorageDirectory = flattenDirectoriesByTimestamp().stream()
            .collect(Collectors.groupingBy(IndexLocation::getPartitionName));

        // Scan through the index directories and the associated index event start time.
        // If looking at index N, we can determine the index end time by assuming that it is the same as the
        // start time of index N+1. So we determine the time range of each index and select an index only if
        // its start time is before the given timestamp and its end time is <= the given timestamp.
        for (final List<IndexLocation> startTimeWithFile : startTimeWithFileByStorageDirectory.values()) {
            for (int i = 0; i < startTimeWithFile.size(); i++) {
                final IndexLocation indexLoc = startTimeWithFile.get(i);

                final String partition = indexLoc.getPartitionName();
                final IndexLocation activeLocation = activeIndices.get(partition);
                if (indexLoc.equals(activeLocation)) {
                    continue;
                }

                final Long indexStartTime = indexLoc.getIndexStartTimestamp();
                if (indexStartTime > timestamp) {
                    // If the first timestamp in the index is later than the desired timestamp,
                    // then we are done. We can do this because the list is ordered by monotonically
                    // increasing timestamp as the Tuple key.
                    break;
                }

                if (i < startTimeWithFile.size() - 1) {
                    final IndexLocation nextLocation = startTimeWithFile.get(i + 1);
                    final Long indexEndTime = nextLocation.getIndexStartTimestamp();
                    if (indexEndTime <= timestamp) {
                        logger.debug("Considering Index Location {} older than {} ({}) because its events have an EventTime "
                            + "ranging from {} ({}) to {} ({}) based on the following IndexLocations: {}", indexLoc, timestamp, new Date(timestamp),
                            indexStartTime, new Date(indexStartTime), indexEndTime, new Date(indexEndTime), startTimeWithFile);

                        // BUGFIX: the index that is provably finished before the cutoff is indexLoc
                        // itself -- its events end where nextLocation begins. Previously
                        // nextLocation was selected here, even though nextLocation's own end time
                        // was never checked, so an index still containing events after the cutoff
                        // could be expired while indexLoc was never returned.
                        selected.add(indexLoc.getIndexDirectory());
                    }
                }
            }
        }

        logger.debug("Returning the following list of index locations because they were finished being written to before {}: {}", timestamp, selected);
        return selected;
    }

    /**
     * Convert directoriesByTimestamp to a List of IndexLocations.
     * This allows us to easily get the 'next' value when iterating over the elements.
     * This is useful because we know that the 'next' value will have a timestamp that is when that
     * file started being written to - which is the same as when this index stopped being written to.
     *
     * @return a List of all IndexLocations known
     */
    private List<IndexLocation> flattenDirectoriesByTimestamp() {
        final List<IndexLocation> startTimeWithFile = new ArrayList<>();
        for (final Map.Entry<Long, List<IndexLocation>> entry : indexLocationByTimestamp.entrySet()) {
            startTimeWithFile.addAll(entry.getValue());
        }

        return startTimeWithFile;
    }

    /**
     * Returns the index directories (across all partitions) that may contain events whose time
     * falls within the given (nullable, inclusive) time range.
     */
    public synchronized List<File> getDirectories(final Long startTime, final Long endTime) {
        final List<File> selected = new ArrayList<>();

        // Separate the indexes by partition so that the 'next index' end-time inference in
        // getDirectories(Long, Long, List) is performed per partition.
        final Map<String, List<IndexLocation>> startTimeWithFileByStorageDirectory = flattenDirectoriesByTimestamp().stream()
            .collect(Collectors.groupingBy(IndexLocation::getPartitionName));

        for (final List<IndexLocation> locationList : startTimeWithFileByStorageDirectory.values()) {
            selected.addAll(getDirectories(startTime, endTime, locationList));
        }

        return selected;
    }

    /**
     * Returns the index directories of the given partition that may contain events whose time
     * falls within the given (nullable, inclusive) time range.
     */
    public synchronized List<File> getDirectories(final Long startTime, final Long endTime, final String partitionName) {
        final Map<String, List<IndexLocation>> startTimeWithFileByStorageDirectory = flattenDirectoriesByTimestamp().stream()
            .collect(Collectors.groupingBy(IndexLocation::getPartitionName));

        final List<IndexLocation> indexLocations = startTimeWithFileByStorageDirectory.get(partitionName);
        if (indexLocations == null) {
            return Collections.emptyList();
        }

        return getDirectories(startTime, endTime, indexLocations);
    }

    /**
     * Selects from {@code locations} (which must be ordered by start timestamp) the directories
     * that may hold events in [startTime, endTime]. Either bound may be {@code null} (unbounded).
     */
    protected static List<File> getDirectories(final Long startTime, final Long endTime, final List<IndexLocation> locations) {
        final List<File> selected = new ArrayList<>();

        int overlapCount = 0;
        for (int i = 0; i < locations.size(); i++) {
            final IndexLocation indexLoc = locations.get(i);
            final Long indexStartTimestamp = indexLoc.getIndexStartTimestamp();
            if (endTime != null && indexStartTimestamp > endTime) {
                if (overlapCount == 0) {
                    // Because of how we handle index timestamps and the multi-threading, it is possible
                    // that we could have some overlap where Thread T1 gets an Event with start time 1,000
                    // for instance. Then T2 gets an Event with start time 1,002 and ends up creating a
                    // new index directory with a start time of 1,002. Then T1 could end up writing events
                    // with timestamp 1,000 to an index with a 'start time' of 1,002. Because of this,
                    // the index start times are approximate. To address this, we include one extra Index
                    // Directory based on start time, so that if we want index directories for Time Range
                    // 1,000 - 1,001 and have indexes 999 and 1,002 we will include the 999 and the 'overlapping'
                    // directory of 1,002 since it could potentially have an event with overlapping timestamp.
                    overlapCount++;
                } else {
                    continue;
                }
            }

            if (startTime != null) {
                if (i < locations.size() - 1) {
                    // The index's end time is inferred from the start time of the next index.
                    final IndexLocation nextIndexLoc = locations.get(i + 1);
                    final Long indexEndTimestamp = nextIndexLoc.getIndexStartTimestamp();
                    if (indexEndTimestamp < startTime) {
                        continue;
                    }
                }
            }

            selected.add(indexLoc.getIndexDirectory());
        }

        return selected;
    }

    /**
     * Notifies the Index Directory Manager that an Index Writer has been committed for the
     * given index directory. This allows the Directory Manager to know that it needs to check
     * the size of the index directory and not return this directory as a writable directory
     * any more if the size has reached the configured threshold.
     *
     * @param indexDir the directory that was written to
     * @return <code>true</code> if the index directory has reached its max threshold and should no
     *         longer be written to, <code>false</code> if the index directory is not full.
     */
    public boolean onIndexCommitted(final File indexDir) {
        // Compute the size outside the lock; it requires disk I/O.
        final long indexSize = getSize(indexDir);
        synchronized (this) {
            String partitionName = null;
            for (final Map.Entry<String, IndexLocation> entry : activeIndices.entrySet()) {
                if (indexDir.equals(entry.getValue().getIndexDirectory())) {
                    partitionName = entry.getKey();
                    break;
                }
            }

            // If the index is not the active index directory, it should no longer be written to.
            if (partitionName == null) {
                logger.debug("Size of Provenance Index at {} is now {}. However, was unable to find the appropriate Active Index to roll over.", indexDir, indexSize);
                return true;
            }

            // If the index size >= desired index size, it should no longer be written to.
            if (indexSize >= repoConfig.getDesiredIndexSize()) {
                logger.info("Size of Provenance Index at {} is now {}. Will close this index and roll over to a new one.", indexDir, indexSize);
                activeIndices.remove(partitionName);

                return true;
            }

            // Index directory is the active index directory and has not yet exceeded the desired size.
            return false;
        }
    }

    /** @return the directory currently being written to for the given partition, if any */
    public synchronized Optional<File> getActiveIndexDirectory(final String partitionName) {
        final IndexLocation indexLocation = activeIndices.get(partitionName);
        if (indexLocation == null) {
            return Optional.empty();
        }

        return Optional.of(indexLocation.getIndexDirectory());
    }

    /** @return the total size in bytes of the files directly inside the given index directory */
    private long getSize(final File indexDir) {
        if (!indexDir.exists()) {
            return 0L;
        }
        if (!indexDir.isDirectory()) {
            throw new IllegalArgumentException("Must specify a directory but specified " + indexDir);
        }

        final File[] files = indexDir.listFiles();
        if (files == null) {
            return 0L;
        }

        long sum = 0L;
        for (final File file : files) {
            sum += file.length();
        }

        return sum;
    }

    /**
     * Provides the File that is the directory for the index that should be written to. If there is no index yet
     * to be written to, or if the index has reached its max size, a new one will be created. The given {@code earliestTimestamp}
     * should represent the event time of the first event that will go into the index. This is used for file naming purposes so
     * that the appropriate directories can be looked up quickly later.
     *
     * @param earliestTimestamp the event time of the first event that will go into a new index, if a new index is created by this call.
     * @param partitionName the name of the partition to write to
     * @return the directory that should be written to
     */
    public synchronized File getWritableIndexingDirectory(final long earliestTimestamp, final String partitionName) {
        IndexLocation indexLoc = activeIndices.get(partitionName);
        if (indexLoc == null || indexLoc.isIndexFull()) {
            indexLoc = new IndexLocation(createIndex(earliestTimestamp, partitionName), earliestTimestamp, partitionName, repoConfig.getDesiredIndexSize());
            logger.debug("Created new Index Directory {}", indexLoc);

            indexLocationByTimestamp.computeIfAbsent(earliestTimestamp, t -> new ArrayList<>()).add(indexLoc);
            activeIndices.put(partitionName, indexLoc);
        }

        return indexLoc.getIndexDirectory();
    }

    /** Builds (but does not create on disk) the File for a new index directory in the given partition. */
    private File createIndex(final long earliestTimestamp, final String partitionName) {
        final File storageDir = repoConfig.getStorageDirectories().get(partitionName);
        if (storageDir == null) {
            throw new IllegalArgumentException("Invalid Partition: " + partitionName);
        }

        return new File(storageDir, "index-" + earliestTimestamp);
    }
}
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexLocation.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.provenance.index.lucene; + +import java.io.File; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.provenance.util.DirectoryUtils; + +public class IndexLocation { + private static final long SIZE_CHECK_MILLIS = TimeUnit.SECONDS.toMillis(30L); + + private final File indexDirectory; + private final long indexStartTimestamp; + private final String partitionName; + private final long desiredIndexSize; + private volatile long lastSizeCheckTime = System.currentTimeMillis(); + + public IndexLocation(final File indexDirectory, final long indexStartTimestamp, final String partitionName, final long desiredIndexSize) { + this.indexDirectory = indexDirectory; + this.indexStartTimestamp = indexStartTimestamp; + this.partitionName = partitionName; + this.desiredIndexSize = desiredIndexSize; + } + + public File getIndexDirectory() { + return indexDirectory; + } + + public long getIndexStartTimestamp() { + return indexStartTimestamp; + } + + public String 
getPartitionName() { + return partitionName; + } + + public boolean isIndexFull() { + final long now = System.currentTimeMillis(); + final long millisSinceLastSizeCheck = now - lastSizeCheckTime; + if (millisSinceLastSizeCheck < SIZE_CHECK_MILLIS) { + return false; + } + + lastSizeCheckTime = now; + return DirectoryUtils.getSize(indexDirectory) >= desiredIndexSize; + } + + @Override + public int hashCode() { + return 31 + 41 * indexDirectory.hashCode(); + } + + @Override + public boolean equals(final Object obj) { + if (obj == null) { + return false; + } + if (obj == this) { + return true; + } + + if (!(obj instanceof IndexLocation)) { + return false; + } + + final IndexLocation other = (IndexLocation) obj; + return indexDirectory.equals(other.getIndexDirectory()); + } + + @Override + public String toString() { + return "IndexLocation[directory=" + indexDirectory + "]"; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexableDocument.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexableDocument.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexableDocument.java new file mode 100644 index 0000000..1fc163f --- /dev/null +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexableDocument.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.provenance.index.lucene; + +import java.io.File; + +import org.apache.lucene.document.Document; +import org.apache.nifi.provenance.serialization.StorageSummary; + +public class IndexableDocument { + private final Document document; + private final StorageSummary persistenceLocation; + private final File indexDirectory; + + public IndexableDocument(final Document document, final StorageSummary location, final File indexDirectory) { + this.document = document; + this.persistenceLocation = location; + this.indexDirectory = indexDirectory; + } + + public Document getDocument() { + return document; + } + + public StorageSummary getPersistenceLocation() { + return persistenceLocation; + } + + public File getIndexDirectory() { + return indexDirectory; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LatestEventsPerProcessorQuery.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LatestEventsPerProcessorQuery.java 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LatestEventsPerProcessorQuery.java new file mode 100644 index 0000000..73b0a14 --- /dev/null +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LatestEventsPerProcessorQuery.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.provenance.index.lucene; + +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.SearchableFields; +import org.apache.nifi.provenance.search.Query; +import org.apache.nifi.provenance.search.SearchTerm; +import org.apache.nifi.provenance.serialization.StorageSummary; +import org.apache.nifi.util.RingBuffer; + +public class LatestEventsPerProcessorQuery implements CachedQuery { + private static final String COMPONENT_ID_FIELD_NAME = SearchableFields.ComponentID.getSearchableFieldName(); + private final ConcurrentMap<String, RingBuffer<Long>> latestRecords = new ConcurrentHashMap<>(); + + @Override + public void update(final ProvenanceEventRecord event, final StorageSummary storageSummary) { + final String componentId = event.getComponentId(); + final RingBuffer<Long> ringBuffer = latestRecords.computeIfAbsent(componentId, id -> new RingBuffer<>(1000)); + ringBuffer.add(storageSummary.getEventId()); + } + + @Override + public Optional<List<Long>> evaluate(final Query query) { + if (query.getMaxResults() > 1000) { + // If query max results > 1000 then we know we don't have enough results. So just return empty. 
+ return Optional.empty(); + } + + final List<SearchTerm> terms = query.getSearchTerms(); + if (terms.size() != 1) { + return Optional.empty(); + } + + final SearchTerm term = terms.get(0); + if (!COMPONENT_ID_FIELD_NAME.equals(term.getSearchableField().getSearchableFieldName())) { + return Optional.empty(); + } + + if (query.getEndDate() != null || query.getStartDate() != null) { + return Optional.empty(); + } + + final RingBuffer<Long> ringBuffer = latestRecords.get(term.getValue()); + if (ringBuffer == null || ringBuffer.getSize() < query.getMaxResults()) { + return Optional.empty(); + } + + List<Long> eventIds = ringBuffer.asList(); + if (eventIds.size() > query.getMaxResults()) { + eventIds = eventIds.subList(0, query.getMaxResults()); + } + + return Optional.of(eventIds); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LatestEventsQuery.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LatestEventsQuery.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LatestEventsQuery.java new file mode 100644 index 0000000..94cd013 --- /dev/null +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LatestEventsQuery.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.provenance.index.lucene; + +import java.util.List; +import java.util.Optional; + +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.search.Query; +import org.apache.nifi.provenance.serialization.StorageSummary; +import org.apache.nifi.util.RingBuffer; + +public class LatestEventsQuery implements CachedQuery { + + final RingBuffer<Long> latestRecords = new RingBuffer<>(1000); + + @Override + public void update(final ProvenanceEventRecord event, final StorageSummary storageSummary) { + latestRecords.add(storageSummary.getEventId()); + } + + @Override + public Optional<List<Long>> evaluate(final Query query) { + if (latestRecords.getSize() < query.getMaxResults()) { + return Optional.empty(); + } + + if (query.getSearchTerms().isEmpty() && query.getStartDate() == null && query.getEndDate() == null) { + final List<Long> eventList = latestRecords.asList(); + if (eventList.size() > query.getMaxResults()) { + return Optional.of(eventList.subList(0, query.getMaxResults())); + } else { + return Optional.of(eventList); + } + } else { + return Optional.empty(); + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneCacheWarmer.java 
---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneCacheWarmer.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneCacheWarmer.java new file mode 100644 index 0000000..15b11b4 --- /dev/null +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneCacheWarmer.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.provenance.index.lucene; + +import java.io.File; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.provenance.index.EventIndexSearcher; +import org.apache.nifi.provenance.lucene.IndexManager; +import org.apache.nifi.provenance.util.DirectoryUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LuceneCacheWarmer implements Runnable { + private static final Logger logger = LoggerFactory.getLogger(LuceneCacheWarmer.class); + + private final File storageDir; + private final IndexManager indexManager; + + public LuceneCacheWarmer(final File storageDir, final IndexManager indexManager) { + this.storageDir = storageDir; + this.indexManager = indexManager; + } + + @Override + public void run() { + try { + final File[] indexDirs = storageDir.listFiles(DirectoryUtils.INDEX_FILE_FILTER); + if (indexDirs == null) { + logger.info("Cannot warm Lucene Index Cache for " + storageDir + " because the directory could not be read"); + return; + } + + logger.info("Beginning warming of Lucene Index Cache for " + storageDir); + final long startNanos = System.nanoTime(); + for (final File indexDir : indexDirs) { + final long indexStartNanos = System.nanoTime(); + + final EventIndexSearcher eventSearcher = indexManager.borrowIndexSearcher(indexDir); + indexManager.returnIndexSearcher(eventSearcher); + + final long indexWarmMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - indexStartNanos); + logger.debug("Took {} ms to warm Lucene Index {}", indexWarmMillis, indexDir); + } + + final long warmSecs = TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - startNanos); + logger.info("Finished warming all Lucene Indexes for {} in {} seconds", storageDir, warmSecs); + } catch (final Exception e) { + logger.error("Failed to warm Lucene Index Cache for " + storageDir, e); + } + } +}