http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java new file mode 100644 index 0000000..a583403 --- /dev/null +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java @@ -0,0 +1,737 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.provenance.index.lucene; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.lucene.document.Document; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.Term; +import org.apache.lucene.search.BooleanClause.Occur; +import org.apache.lucene.search.BooleanQuery; +import org.apache.lucene.search.TermQuery; +import org.apache.nifi.authorization.AccessDeniedException; +import org.apache.nifi.authorization.user.NiFiUser; +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.provenance.AsyncLineageSubmission; +import org.apache.nifi.provenance.AsyncQuerySubmission; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.RepositoryConfiguration; +import org.apache.nifi.provenance.SearchableFields; +import org.apache.nifi.provenance.StandardLineageResult; +import org.apache.nifi.provenance.StandardQueryResult; +import org.apache.nifi.provenance.authorization.EventAuthorizer; +import org.apache.nifi.provenance.authorization.EventTransformer; +import org.apache.nifi.provenance.index.EventIndex; +import org.apache.nifi.provenance.index.EventIndexSearcher; +import org.apache.nifi.provenance.index.EventIndexWriter; +import org.apache.nifi.provenance.lineage.ComputeLineageSubmission; +import 
org.apache.nifi.provenance.lineage.LineageComputationType; +import org.apache.nifi.provenance.lucene.IndexManager; +import org.apache.nifi.provenance.lucene.LuceneUtil; +import org.apache.nifi.provenance.search.Query; +import org.apache.nifi.provenance.search.QuerySubmission; +import org.apache.nifi.provenance.serialization.StorageSummary; +import org.apache.nifi.provenance.store.EventStore; +import org.apache.nifi.provenance.util.DirectoryUtils; +import org.apache.nifi.provenance.util.NamedThreadFactory; +import org.apache.nifi.reporting.Severity; +import org.apache.nifi.util.file.FileUtils; +import org.apache.nifi.util.timebuffer.LongEntityAccess; +import org.apache.nifi.util.timebuffer.TimedBuffer; +import org.apache.nifi.util.timebuffer.TimestampedLong; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class LuceneEventIndex implements EventIndex { + private static final Logger logger = LoggerFactory.getLogger(LuceneEventIndex.class); + private static final String EVENT_CATEGORY = "Provenance Repository"; + + public static final int MAX_UNDELETED_QUERY_RESULTS = 10; + public static final int MAX_DELETE_INDEX_WAIT_SECONDS = 30; + public static final int MAX_LINEAGE_NODES = 1000; + public static final int MAX_INDEX_THREADS = 100; + + private final ConcurrentMap<String, AsyncQuerySubmission> querySubmissionMap = new ConcurrentHashMap<>(); + private final ConcurrentMap<String, AsyncLineageSubmission> lineageSubmissionMap = new ConcurrentHashMap<>(); + private final BlockingQueue<StoredDocument> documentQueue = new LinkedBlockingQueue<>(1000); + private final List<EventIndexTask> indexTasks = Collections.synchronizedList(new ArrayList<>()); + private final ExecutorService queryExecutor; + private final ExecutorService indexExecutor; + private final RepositoryConfiguration config; + private final IndexManager indexManager; + private final ConvertEventToLuceneDocument eventConverter; + private final IndexDirectoryManager directoryManager; + 
private volatile boolean closed = false; + + private final TimedBuffer<TimestampedLong> queuePauseNanos = new TimedBuffer<>(TimeUnit.SECONDS, 300, new LongEntityAccess()); + private final TimedBuffer<TimestampedLong> eventsIndexed = new TimedBuffer<>(TimeUnit.SECONDS, 300, new LongEntityAccess()); + private final AtomicLong eventCount = new AtomicLong(0L); + private final EventReporter eventReporter; + + private final List<CachedQuery> cachedQueries = new ArrayList<>(); + + private ScheduledExecutorService maintenanceExecutor; // effectively final + private ScheduledExecutorService cacheWarmerExecutor; + private EventStore eventStore; + + public LuceneEventIndex(final RepositoryConfiguration config, final IndexManager indexManager, final EventReporter eventReporter) { + this(config, indexManager, EventIndexTask.DEFAULT_MAX_EVENTS_PER_COMMIT, eventReporter); + } + + public LuceneEventIndex(final RepositoryConfiguration config, final IndexManager indexManager, final int maxEventsPerCommit, final EventReporter eventReporter) { + this.eventReporter = eventReporter; + queryExecutor = Executors.newFixedThreadPool(config.getQueryThreadPoolSize(), new NamedThreadFactory("Provenance Query")); + indexExecutor = Executors.newFixedThreadPool(config.getIndexThreadPoolSize(), new NamedThreadFactory("Index Provenance Events")); + cacheWarmerExecutor = Executors.newScheduledThreadPool(config.getStorageDirectories().size(), new NamedThreadFactory("Warm Lucene Index", true)); + directoryManager = new IndexDirectoryManager(config); + + // Limit number of indexing threads to 100. When we restore the repository on restart, + // we have to re-index up to MAX_THREADS * MAX_DOCUMENTS_PER_THREADS events prior to + // the last event that the index holds. 
This is done because we could have that many + // events 'in flight', waiting to be indexed when the last index writer was committed, + // so even though the index says the largest event ID is 1,000,000 for instance, Event + // with ID 999,999 may still not have been indexed because another thread was in the + // process of writing the event to the index. + final int configuredIndexPoolSize = config.getIndexThreadPoolSize(); + final int numIndexThreads; + if (configuredIndexPoolSize > MAX_INDEX_THREADS) { + logger.warn("The Provenance Repository is configured to perform indexing of events using {} threads. This number exceeds the maximum allowable number of threads, which is {}. " + + "Will proceed using {} threads. This value is limited because the performance of indexing will decrease and startup times will increase when setting this value too high.", + configuredIndexPoolSize, MAX_INDEX_THREADS, MAX_INDEX_THREADS); + numIndexThreads = MAX_INDEX_THREADS; + } else { + numIndexThreads = configuredIndexPoolSize; + } + + for (int i = 0; i < numIndexThreads; i++) { + final EventIndexTask task = new EventIndexTask(documentQueue, config, indexManager, directoryManager, maxEventsPerCommit, eventReporter); + indexTasks.add(task); + indexExecutor.submit(task); + } + this.config = config; + this.indexManager = indexManager; + this.eventConverter = new ConvertEventToLuceneDocument(config.getSearchableFields(), config.getSearchableAttributes()); + } + + @Override + public void initialize(final EventStore eventStore) { + this.eventStore = eventStore; + directoryManager.initialize(); + + maintenanceExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Provenance Repository Maintenance")); + maintenanceExecutor.scheduleWithFixedDelay(() -> performMaintenance(), 1, 1, TimeUnit.MINUTES); + maintenanceExecutor.scheduleWithFixedDelay(new RemoveExpiredQueryResults(), 30, 30, TimeUnit.SECONDS); + + cachedQueries.add(new LatestEventsQuery()); + cachedQueries.add(new 
LatestEventsPerProcessorQuery()); + + final Optional<Integer> warmCacheMinutesOption = config.getWarmCacheFrequencyMinutes(); + if (warmCacheMinutesOption.isPresent() && warmCacheMinutesOption.get() > 0) { + for (final File storageDir : config.getStorageDirectories().values()) { + final int minutes = warmCacheMinutesOption.get(); + cacheWarmerExecutor.scheduleWithFixedDelay(new LuceneCacheWarmer(storageDir, indexManager), 1, minutes, TimeUnit.MINUTES); + } + } + } + + @Override + public long getMinimumEventIdToReindex(final String partitionName) { + return Math.max(0, getMaxEventId(partitionName) - EventIndexTask.MAX_DOCUMENTS_PER_THREAD * LuceneEventIndex.MAX_INDEX_THREADS); + } + + protected IndexDirectoryManager getDirectoryManager() { + return directoryManager; + } + + @Override + public void close() throws IOException { + closed = true; + queryExecutor.shutdownNow(); + indexExecutor.shutdown(); + cacheWarmerExecutor.shutdown(); + + if (maintenanceExecutor != null) { + maintenanceExecutor.shutdown(); + } + + for (final EventIndexTask task : indexTasks) { + task.shutdown(); + } + } + + long getMaxEventId(final String partitionName) { + final List<File> allDirectories = getDirectoryManager().getDirectories(0L, Long.MAX_VALUE, partitionName); + if (allDirectories.isEmpty()) { + return -1L; + } + + Collections.sort(allDirectories, DirectoryUtils.NEWEST_INDEX_FIRST); + + for (final File directory : allDirectories) { + final EventIndexSearcher searcher; + try { + searcher = indexManager.borrowIndexSearcher(directory); + } catch (final IOException ioe) { + logger.warn("Unable to read from Index Directory {}. 
Will assume that the index is incomplete and not consider this index when determining max event ID", directory); + continue; + } + + try { + final IndexReader reader = searcher.getIndexSearcher().getIndexReader(); + final int maxDocId = reader.maxDoc() - 1; + final Document document = reader.document(maxDocId); + final long eventId = document.getField(SearchableFields.Identifier.getSearchableFieldName()).numericValue().longValue(); + logger.info("Determined that Max Event ID indexed for Partition {} is approximately {} based on index {}", partitionName, eventId, directory); + return eventId; + } catch (final IOException ioe) { + logger.warn("Unable to search Index Directory {}. Will assume that the index is incomplete and not consider this index when determining max event ID", directory, ioe); + } finally { + indexManager.returnIndexSearcher(searcher); + } + } + + return -1L; + } + + @Override + public void reindexEvents(final Map<ProvenanceEventRecord, StorageSummary> events) { + final EventIndexTask indexTask = new EventIndexTask(documentQueue, config, indexManager, directoryManager, EventIndexTask.DEFAULT_MAX_EVENTS_PER_COMMIT, eventReporter); + + File lastIndexDir = null; + long lastEventTime = -2L; + + final List<IndexableDocument> indexableDocs = new ArrayList<>(events.size()); + for (final Map.Entry<ProvenanceEventRecord, StorageSummary> entry : events.entrySet()) { + final ProvenanceEventRecord event = entry.getKey(); + final StorageSummary summary = entry.getValue(); + + for (final CachedQuery cachedQuery : cachedQueries) { + cachedQuery.update(event, summary); + } + + final Document document = eventConverter.convert(event, summary); + if (document == null) { + logger.debug("Received Provenance Event {} to index but it contained no information that should be indexed, so skipping it", event); + } else { + final File indexDir; + if (event.getEventTime() == lastEventTime) { + indexDir = lastIndexDir; + } else { + final List<File> files = 
getDirectoryManager().getDirectories(event.getEventTime(), null); + indexDir = files.isEmpty() ? null : files.get(0); + lastIndexDir = indexDir; + } + + final IndexableDocument doc = new IndexableDocument(document, summary, indexDir); + indexableDocs.add(doc); + } + } + + try { + indexTask.reIndex(indexableDocs, CommitPreference.PREVENT_COMMIT); + } catch (final IOException ioe) { + logger.error("Failed to reindex some Provenance Events", ioe); + eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to re-index some Provenance Events. " + + "Some Provenance Events may not be available for querying. See logs for more information."); + } + } + + @Override + public void commitChanges(final String partitionName) throws IOException { + final Optional<File> indexDir = directoryManager.getActiveIndexDirectory(partitionName); + if (indexDir.isPresent()) { + final EventIndexWriter eventIndexWriter = indexManager.borrowIndexWriter(indexDir.get()); + try { + eventIndexWriter.commit(); + } finally { + indexManager.returnIndexWriter(eventIndexWriter, false, false); + } + } + } + + protected void addEvent(final ProvenanceEventRecord event, final StorageSummary location) { + for (final CachedQuery cachedQuery : cachedQueries) { + cachedQuery.update(event, location); + } + + final Document document = eventConverter.convert(event, location); + if (document == null) { + logger.debug("Received Provenance Event {} to index but it contained no information that should be indexed, so skipping it", event); + } else { + final StoredDocument doc = new StoredDocument(document, location); + boolean added = false; + while (!added && !closed) { + + added = documentQueue.offer(doc); + if (!added) { + final long start = System.nanoTime(); + try { + added = documentQueue.offer(doc, 1, TimeUnit.SECONDS); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + logger.warn("Interrupted while attempting to enqueue Provenance Event for indexing; this event will 
not be indexed"); + return; + } + final long nanos = System.nanoTime() - start; + queuePauseNanos.add(new TimestampedLong(nanos)); + } + + if (added) { + final long totalEventCount = eventCount.incrementAndGet(); + if (totalEventCount % 1_000_000 == 0 && logger.isDebugEnabled()) { + incrementAndReportStats(); + } + } + } + } + } + + private void incrementAndReportStats() { + final long fiveMinutesAgo = System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(5); + final TimestampedLong nanosLastFive = queuePauseNanos.getAggregateValue(fiveMinutesAgo); + if (nanosLastFive == null) { + return; + } + + final TimestampedLong eventsLast5 = eventsIndexed.getAggregateValue(fiveMinutesAgo); + if (eventsLast5 == null) { + return; + } + + final long numEventsLast5 = eventsLast5.getValue(); + + final long millis = TimeUnit.NANOSECONDS.toMillis(nanosLastFive.getValue()); + logger.debug("In the last 5 minutes, have spent {} CPU-millis waiting to enqueue events for indexing and have indexed {} events ({} since NiFi started)", + millis, numEventsLast5, eventCount.get()); + } + + @Override + public void addEvents(final Map<ProvenanceEventRecord, StorageSummary> events) { + eventsIndexed.add(new TimestampedLong((long) events.size())); + + for (final Map.Entry<ProvenanceEventRecord, StorageSummary> entry : events.entrySet()) { + addEvent(entry.getKey(), entry.getValue()); + } + } + + + @Override + public ComputeLineageSubmission submitLineageComputation(final long eventId, final NiFiUser user, final EventAuthorizer eventAuthorizer) { + final Optional<ProvenanceEventRecord> eventOption; + try { + eventOption = eventStore.getEvent(eventId); + } catch (final Exception e) { + logger.error("Failed to retrieve Provenance Event with ID " + eventId + " to calculate data lineage due to: " + e, e); + final AsyncLineageSubmission result = new AsyncLineageSubmission(LineageComputationType.FLOWFILE_LINEAGE, eventId, Collections.<String> emptySet(), 1, user.getIdentity()); + 
result.getResult().setError("Failed to retrieve Provenance Event with ID " + eventId + ". See logs for more information."); + return result; + } + + if (!eventOption.isPresent()) { + final AsyncLineageSubmission result = new AsyncLineageSubmission(LineageComputationType.FLOWFILE_LINEAGE, eventId, Collections.<String> emptySet(), 1, user.getIdentity()); + result.getResult().setError("Could not find Provenance Event with ID " + eventId); + lineageSubmissionMap.put(result.getLineageIdentifier(), result); + return result; + } + + final ProvenanceEventRecord event = eventOption.get(); + return submitLineageComputation(Collections.singleton(event.getFlowFileUuid()), user, eventAuthorizer, LineageComputationType.FLOWFILE_LINEAGE, + eventId, event.getLineageStartDate(), Long.MAX_VALUE); + } + + + private ComputeLineageSubmission submitLineageComputation(final Collection<String> flowFileUuids, final NiFiUser user, final EventAuthorizer eventAuthorizer, + final LineageComputationType computationType, final Long eventId, final long startTimestamp, final long endTimestamp) { + + final List<File> indexDirs = directoryManager.getDirectories(startTimestamp, endTimestamp); + final AsyncLineageSubmission submission = new AsyncLineageSubmission(computationType, eventId, flowFileUuids, indexDirs.size(), user.getIdentity()); + lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); + + final BooleanQuery lineageQuery = buildLineageQuery(flowFileUuids); + final List<File> indexDirectories = directoryManager.getDirectories(startTimestamp, endTimestamp); + if (indexDirectories.isEmpty()) { + submission.getResult().update(Collections.emptyList(), 0L); + } else { + Collections.sort(indexDirectories, DirectoryUtils.OLDEST_INDEX_FIRST); + + for (final File indexDir : indexDirectories) { + queryExecutor.submit(new QueryTask(lineageQuery, submission.getResult(), MAX_LINEAGE_NODES, indexManager, indexDir, + eventStore, eventAuthorizer, 
EventTransformer.PLACEHOLDER_TRANSFORMER)); + } + } + + // Some computations will complete very quickly. In this case, we don't want to wait + // for the client to submit a second query to obtain the result. Instead, we want to just + // wait some short period of time for the computation to complete before returning the submission. + try { + submission.getResult().awaitCompletion(500, TimeUnit.MILLISECONDS); + } catch (final InterruptedException ie) { + Thread.currentThread().interrupt(); + } + + return submission; + } + + private BooleanQuery buildLineageQuery(final Collection<String> flowFileUuids) { + // Create a query for all Events related to the FlowFiles of interest. We do this by adding all ID's as + // "SHOULD" clauses and then setting the minimum required to 1. + final BooleanQuery lineageQuery; + if (flowFileUuids == null || flowFileUuids.isEmpty()) { + lineageQuery = null; + } else { + lineageQuery = new BooleanQuery(); + for (final String flowFileUuid : flowFileUuids) { + lineageQuery.add(new TermQuery(new Term(SearchableFields.FlowFileUUID.getSearchableFieldName(), flowFileUuid)), Occur.SHOULD); + } + lineageQuery.setMinimumNumberShouldMatch(1); + } + + return lineageQuery; + } + + @Override + public QuerySubmission submitQuery(final Query query, final EventAuthorizer authorizer, final String userId) { + validate(query); + + // Check if we have any cached queries first that can give us the answer + for (final CachedQuery cachedQuery : cachedQueries) { + final Optional<List<Long>> eventIdListOption = cachedQuery.evaluate(query); + if (eventIdListOption.isPresent()) { + final AsyncQuerySubmission submission = new AsyncQuerySubmission(query, 1, userId); + querySubmissionMap.put(query.getIdentifier(), submission); + + final List<Long> eventIds = eventIdListOption.get(); + + queryExecutor.submit(() -> { + List<ProvenanceEventRecord> events; + try { + events = eventStore.getEvents(eventIds, authorizer, EventTransformer.EMPTY_TRANSFORMER); + 
submission.getResult().update(events, eventIds.size()); + } catch (final Exception e) { + submission.getResult().setError("Failed to retrieve Provenance Events from store; see logs for more details"); + logger.error("Failed to retrieve Provenance Events from store", e); + } + }); + + // There are some queries that are optimized and will complete very quickly. As a result, + // we don't want to wait for the client to issue a second request, so we will give the query + // up to 500 milliseconds to complete before running. + try { + submission.getResult().awaitCompletion(500, TimeUnit.MILLISECONDS); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + } + + return submission; + } + } + + final List<File> indexDirectories = directoryManager.getDirectories( + query.getStartDate() == null ? null : query.getStartDate().getTime(), + query.getEndDate() == null ? null : query.getEndDate().getTime()); + + final AsyncQuerySubmission submission = new AsyncQuerySubmission(query, indexDirectories.size(), userId); + querySubmissionMap.put(query.getIdentifier(), submission); + + final org.apache.lucene.search.Query luceneQuery = LuceneUtil.convertQuery(query); + logger.debug("Submitting query {} with identifier {} against index directories {}", luceneQuery, query.getIdentifier(), indexDirectories); + + if (indexDirectories.isEmpty()) { + submission.getResult().update(Collections.emptyList(), 0L); + } else { + Collections.sort(indexDirectories, DirectoryUtils.NEWEST_INDEX_FIRST); + + for (final File indexDir : indexDirectories) { + queryExecutor.submit(new QueryTask(luceneQuery, submission.getResult(), query.getMaxResults(), indexManager, indexDir, + eventStore, authorizer, EventTransformer.EMPTY_TRANSFORMER)); + } + } + + // There are some queries that are optimized and will complete very quickly. 
As a result, + // we don't want to wait for the client to issue a second request, so we will give the query + // up to 500 milliseconds to complete before running. + try { + submission.getResult().awaitCompletion(500, TimeUnit.MILLISECONDS); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + } + + return submission; + } + + + @Override + public ComputeLineageSubmission submitLineageComputation(final String flowFileUuid, final NiFiUser user, final EventAuthorizer eventAuthorizer) { + return submitLineageComputation(Collections.singleton(flowFileUuid), user, eventAuthorizer, LineageComputationType.FLOWFILE_LINEAGE, null, 0L, Long.MAX_VALUE); + } + + @Override + public ComputeLineageSubmission submitExpandChildren(final long eventId, final NiFiUser user, final EventAuthorizer authorizer) { + final String userId = user.getIdentity(); + + try { + final Optional<ProvenanceEventRecord> eventOption = eventStore.getEvent(eventId); + if (!eventOption.isPresent()) { + final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.emptyList(), 1, userId); + lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); + submission.getResult().update(Collections.emptyList(), 0L); + return submission; + } + + final ProvenanceEventRecord event = eventOption.get(); + switch (event.getEventType()) { + case CLONE: + case FORK: + case JOIN: + case REPLAY: { + return submitLineageComputation(event.getChildUuids(), user, authorizer, LineageComputationType.EXPAND_CHILDREN, + eventId, event.getEventTime(), Long.MAX_VALUE); + } + default: { + final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, + eventId, Collections.<String> emptyList(), 1, userId); + + lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); + submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + 
event.getEventType() + " so its children cannot be expanded"); + return submission; + } + } + } catch (final Exception e) { + final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, + eventId, Collections.<String> emptyList(), 1, userId); + lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); + submission.getResult().setError("Failed to expand children for lineage of event with ID " + eventId + " due to: " + e); + return submission; + } + } + + @Override + public ComputeLineageSubmission submitExpandParents(final long eventId, final NiFiUser user, final EventAuthorizer authorizer) { + final String userId = user.getIdentity(); + + try { + final Optional<ProvenanceEventRecord> eventOption = eventStore.getEvent(eventId); + if (!eventOption.isPresent()) { + final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.emptyList(), 1, userId); + lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); + submission.getResult().update(Collections.emptyList(), 0L); + return submission; + } + + final ProvenanceEventRecord event = eventOption.get(); + switch (event.getEventType()) { + case JOIN: + case FORK: + case CLONE: + case REPLAY: { + return submitLineageComputation(event.getParentUuids(), user, authorizer, LineageComputationType.EXPAND_PARENTS, + eventId, event.getLineageStartDate(), event.getEventTime()); + } + default: { + final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, + eventId, Collections.<String> emptyList(), 1, userId); + + lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); + submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its parents cannot be expanded"); + return submission; + } + } + } catch (final Exception e) { + final AsyncLineageSubmission submission = 
new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, + eventId, Collections.<String> emptyList(), 1, userId); + lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); + + submission.getResult().setError("Failed to expand parents for lineage of event with ID " + eventId + " due to: " + e); + return submission; + } + } + + @Override + public AsyncLineageSubmission retrieveLineageSubmission(final String lineageIdentifier, final NiFiUser user) { + final AsyncLineageSubmission submission = lineageSubmissionMap.get(lineageIdentifier); + final String userId = submission.getSubmitterIdentity(); + + if (user == null && userId == null) { + return submission; + } + + if (user == null) { + throw new AccessDeniedException("Cannot retrieve Provenance Lineage Submission because no user id was provided"); + } + + if (userId == null || userId.equals(user.getIdentity())) { + return submission; + } + + throw new AccessDeniedException("Cannot retrieve Provenance Lineage Submission because " + user.getIdentity() + " is not the user who submitted the request"); + } + + @Override + public QuerySubmission retrieveQuerySubmission(final String queryIdentifier, final NiFiUser user) { + final QuerySubmission submission = querySubmissionMap.get(queryIdentifier); + + final String userId = submission.getSubmitterIdentity(); + + if (user == null && userId == null) { + return submission; + } + + if (user == null) { + throw new AccessDeniedException("Cannot retrieve Provenance Query Submission because no user id was provided"); + } + + if (userId == null || userId.equals(user.getIdentity())) { + return submission; + } + + throw new AccessDeniedException("Cannot retrieve Provenance Query Submission because " + user.getIdentity() + " is not the user who submitted the request"); + } + + @Override + public long getSize() { + long total = 0; + for (final File file : directoryManager.getDirectories(null, null)) { + total += DirectoryUtils.getSize(file); + } + return total; 
+ } + + private void validate(final Query query) { + final int numQueries = querySubmissionMap.size(); + if (numQueries > MAX_UNDELETED_QUERY_RESULTS) { + throw new IllegalStateException("Cannot process query because there are currently " + numQueries + " queries whose results have not " + + "been deleted due to poorly behaving clients not issuing DELETE requests. Please try again later."); + } + + if (query.getEndDate() != null && query.getStartDate() != null && query.getStartDate().getTime() > query.getEndDate().getTime()) { + throw new IllegalArgumentException("Query End Time cannot be before Query Start Time"); + } + } + + void performMaintenance() { + try { + final List<ProvenanceEventRecord> firstEvents = eventStore.getEvents(0, 1); + if (firstEvents.isEmpty()) { + return; + } + + final ProvenanceEventRecord firstEvent = firstEvents.get(0); + final long earliestEventTime = firstEvent.getEventTime(); + logger.debug("First Event Time is {} ({}) with Event ID {}; will delete any Lucene Index that is older than this", + earliestEventTime, new Date(earliestEventTime), firstEvent.getEventId()); + final List<File> indicesBeforeEarliestEvent = directoryManager.getDirectoriesBefore(earliestEventTime); + + for (final File index : indicesBeforeEarliestEvent) { + logger.debug("Index directory {} is now expired. Attempting to remove index", index); + tryDeleteIndex(index); + } + } catch (final Exception e) { + logger.error("Failed to perform background maintenance procedures", e); + eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to perform maintenance of Provenance Repository. 
See logs for more information."); + } + } + + protected boolean tryDeleteIndex(final File indexDirectory) { + final long startNanos = System.nanoTime(); + boolean removed = false; + while (!removed && System.nanoTime() - startNanos < TimeUnit.SECONDS.toNanos(MAX_DELETE_INDEX_WAIT_SECONDS)) { + removed = indexManager.removeIndex(indexDirectory); + + if (!removed) { + try { + Thread.sleep(5000L); + } catch (final InterruptedException ie) { + logger.debug("Interrupted when trying to remove index {} from IndexManager; will not remove index", indexDirectory); + Thread.currentThread().interrupt(); + return false; + } + } + } + + if (removed) { + try { + FileUtils.deleteFile(indexDirectory, true); + logger.debug("Successfully deleted directory {}", indexDirectory); + } catch (final IOException e) { + logger.warn("The Lucene Index located at " + indexDirectory + " has expired and contains no Provenance Events that still exist in the respository. " + + "However, the directory could not be deleted.", e); + } + + directoryManager.deleteDirectory(indexDirectory); + logger.info("Successfully removed expired Lucene Index {}", indexDirectory); + } else { + logger.warn("The Lucene Index located at {} has expired and contains no Provenance Events that still exist in the respository. " + + "However, the directory could not be deleted because it is still actively being used. 
Will continue to try to delete " + + "in a subsequent maintenance cycle.", indexDirectory); + } + + return removed; + } + + private class RemoveExpiredQueryResults implements Runnable { + @Override + public void run() { + try { + final Date now = new Date(); + + final Iterator<Map.Entry<String, AsyncQuerySubmission>> queryIterator = querySubmissionMap.entrySet().iterator(); + while (queryIterator.hasNext()) { + final Map.Entry<String, AsyncQuerySubmission> entry = queryIterator.next(); + + final StandardQueryResult result = entry.getValue().getResult(); + if (entry.getValue().isCanceled() || result.isFinished() && result.getExpiration().before(now)) { + queryIterator.remove(); + } + } + + final Iterator<Map.Entry<String, AsyncLineageSubmission>> lineageIterator = lineageSubmissionMap.entrySet().iterator(); + while (lineageIterator.hasNext()) { + final Map.Entry<String, AsyncLineageSubmission> entry = lineageIterator.next(); + + final StandardLineageResult result = entry.getValue().getResult(); + if (entry.getValue().isCanceled() || result.isFinished() && result.getExpiration().before(now)) { + lineageIterator.remove(); + } + } + } catch (final Exception e) { + logger.error("Failed to expire Provenance Query Results due to {}", e.toString()); + logger.error("", e); + } + } + } +}
http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/QueryTask.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/QueryTask.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/QueryTask.java new file mode 100644 index 0000000..38d3f61 --- /dev/null +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/QueryTask.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.provenance.index.lucene; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.TopDocs; +import org.apache.nifi.provenance.ProgressiveResult; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.SearchableFields; +import org.apache.nifi.provenance.authorization.EventAuthorizer; +import org.apache.nifi.provenance.authorization.EventTransformer; +import org.apache.nifi.provenance.index.EventIndexSearcher; +import org.apache.nifi.provenance.index.SearchFailedException; +import org.apache.nifi.provenance.lucene.IndexManager; +import org.apache.nifi.provenance.store.EventStore; +import org.apache.nifi.util.Tuple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class QueryTask implements Runnable { + private static final Logger logger = LoggerFactory.getLogger(QueryTask.class); + private static final Set<String> LUCENE_FIELDS_TO_LOAD = Collections.singleton(SearchableFields.Identifier.getSearchableFieldName()); + + private final Query query; + private final ProgressiveResult queryResult; + private final int maxResults; + private final IndexManager indexManager; + private final File indexDir; + private final EventStore eventStore; + private final EventAuthorizer authorizer; + private final EventTransformer transformer; + + public QueryTask(final Query query, final ProgressiveResult result, final int maxResults, final IndexManager indexManager, + final File indexDir, final EventStore eventStore, final EventAuthorizer authorizer, + final EventTransformer unauthorizedTransformer) { + this.query = query; + this.queryResult = result; + this.maxResults = 
maxResults; + this.indexManager = indexManager; + this.indexDir = indexDir; + this.eventStore = eventStore; + this.authorizer = authorizer; + this.transformer = unauthorizedTransformer; + } + + @Override + public void run() { + if (queryResult.getTotalHitCount() >= maxResults) { + logger.debug("Will not query lucene index {} because maximum results have already been obtained", indexDir); + queryResult.update(Collections.emptyList(), 0L); + return; + } + + if (queryResult.isFinished()) { + logger.debug("Will not query lucene index {} because the query is already finished", indexDir); + return; + } + + + final long borrowStart = System.nanoTime(); + final EventIndexSearcher searcher; + try { + searcher = indexManager.borrowIndexSearcher(indexDir); + } catch (final FileNotFoundException fnfe) { + // We do not consider this an error because it may well just be the case that the event index has aged off and + // been deleted or that we've just created the index and haven't yet committed the writer. So instead, we just + // update the result to indicate that this index search is complete with no results. + queryResult.update(Collections.emptyList(), 0); + + // nothing has been indexed yet, or the data has already aged off + logger.info("Attempted to search Provenance Index {} but could not find the directory or the directory did not contain a valid Lucene index. 
" + + "This usually indicates that either the index was just created and hasn't fully been initialized, or that the index was recently aged off.", indexDir); + return; + } catch (final IOException ioe) { + queryResult.setError("Failed to query index " + indexDir + "; see logs for more details"); + logger.error("Failed to query index " + indexDir, ioe); + return; + } + + try { + final long borrowMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - borrowStart); + logger.debug("Borrowing index searcher for {} took {} ms", indexDir, borrowMillis); + final long startNanos = System.nanoTime(); + + // If max number of results are retrieved, do not bother querying lucene + if (queryResult.getTotalHitCount() >= maxResults) { + logger.debug("Will not query lucene index {} because maximum results have already been obtained", indexDir); + queryResult.update(Collections.emptyList(), 0L); + return; + } + + if (queryResult.isFinished()) { + logger.debug("Will not query lucene index {} because the query is already finished", indexDir); + return; + } + + // Query lucene + final IndexReader indexReader = searcher.getIndexSearcher().getIndexReader(); + final TopDocs topDocs; + try { + topDocs = searcher.getIndexSearcher().search(query, maxResults); + } catch (final Exception e) { + logger.error("Failed to query Lucene for index " + indexDir, e); + queryResult.setError("Failed to query Lucene for index " + indexDir + " due to " + e); + return; + } finally { + final long ms = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); + logger.debug("Querying Lucene for index {} took {} ms", indexDir, ms); + } + + // If max number of results are retrieved, do not bother reading docs + if (queryResult.getTotalHitCount() >= maxResults) { + logger.debug("Will not read events from store for {} because maximum results have already been obtained", indexDir); + queryResult.update(Collections.emptyList(), 0L); + return; + } + + if (queryResult.isFinished()) { + logger.debug("Will not 
read events from store for {} because the query has already finished", indexDir); + return; + } + + final Tuple<List<ProvenanceEventRecord>, Integer> eventsAndTotalHits = readDocuments(topDocs, indexReader); + + if (eventsAndTotalHits == null) { + queryResult.update(Collections.emptyList(), 0L); + logger.info("Will not update query results for queried index {} for query {} because the maximum number of results have been reached already", + indexDir, query); + } else { + queryResult.update(eventsAndTotalHits.getKey(), eventsAndTotalHits.getValue()); + + final long searchNanos = System.nanoTime() - startNanos; + final long millis = TimeUnit.NANOSECONDS.toMillis(searchNanos); + logger.info("Successfully queried index {} for query {}; retrieved {} events with a total of {} hits in {} millis", + indexDir, query, eventsAndTotalHits.getKey().size(), eventsAndTotalHits.getValue(), millis); + } + } catch (final Exception e) { + logger.error("Failed to query events against index " + indexDir, e); + queryResult.setError("Failed to complete query due to " + e); + } finally { + indexManager.returnIndexSearcher(searcher); + } + } + + private Tuple<List<ProvenanceEventRecord>, Integer> readDocuments(final TopDocs topDocs, final IndexReader indexReader) { + // If no topDocs is supplied, just provide a Tuple that has no records and a hit count of 0. 
+ if (topDocs == null || topDocs.totalHits == 0) { + return new Tuple<>(Collections.<ProvenanceEventRecord> emptyList(), 0); + } + + final long start = System.nanoTime(); + final List<Long> eventIds = Arrays.stream(topDocs.scoreDocs) + .mapToInt(scoreDoc -> scoreDoc.doc) + .mapToObj(docId -> { + try { + return indexReader.document(docId, LUCENE_FIELDS_TO_LOAD); + } catch (final Exception e) { + throw new SearchFailedException("Failed to read Provenance Events from Event File", e); + } + }) + .map(doc -> doc.getField(SearchableFields.Identifier.getSearchableFieldName()).numericValue().longValue()) + .collect(Collectors.toList()); + + final long endConvert = System.nanoTime(); + final long ms = TimeUnit.NANOSECONDS.toMillis(endConvert - start); + logger.debug("Converting documents took {} ms", ms); + + List<ProvenanceEventRecord> events; + try { + events = eventStore.getEvents(eventIds, authorizer, transformer); + } catch (IOException e) { + throw new SearchFailedException("Unable to retrieve events from the Provenance Store", e); + } + + final long fetchEventNanos = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - endConvert); + logger.debug("Fetching {} events from Event Store took {} ms ({} events actually fetched)", eventIds.size(), fetchEventNanos, events.size()); + + final int totalHits = topDocs.totalHits; + return new Tuple<>(events, totalHits); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/StoredDocument.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/StoredDocument.java 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/StoredDocument.java new file mode 100644 index 0000000..207ba9f --- /dev/null +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/StoredDocument.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.provenance.index.lucene; + +import org.apache.lucene.document.Document; +import org.apache.nifi.provenance.serialization.StorageSummary; + +public class StoredDocument { + private final Document document; + private final StorageSummary storageSummary; + + public StoredDocument(final Document document, final StorageSummary summary) { + this.document = document; + this.storageSummary = summary; + } + + public Document getDocument() { + return document; + } + + public StorageSummary getStorageSummary() { + return storageSummary; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/CachingIndexManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/CachingIndexManager.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/CachingIndexManager.java index ddfa0db..eefcecb 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/CachingIndexManager.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/CachingIndexManager.java @@ -36,6 +36,8 @@ import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.store.Directory; import org.apache.lucene.store.FSDirectory; +import org.apache.nifi.provenance.index.EventIndexSearcher; +import org.apache.nifi.provenance.index.EventIndexWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,53 +49,61 @@ public class CachingIndexManager 
implements Closeable, IndexManager { private final Map<File, List<ActiveIndexSearcher>> activeSearchers = new HashMap<>(); - public void removeIndex(final File indexDirectory) { + @Override + public boolean removeIndex(final File indexDirectory) { final File absoluteFile = indexDirectory.getAbsoluteFile(); logger.info("Removing index {}", indexDirectory); lock.lock(); try { final IndexWriterCount count = writerCounts.remove(absoluteFile); - if ( count != null ) { + if (count != null) { try { count.close(); } catch (final IOException ioe) { logger.warn("Failed to close Index Writer {} for {}", count.getWriter(), absoluteFile); - if ( logger.isDebugEnabled() ) { + if (logger.isDebugEnabled()) { logger.warn("", ioe); } + + return false; } } final List<ActiveIndexSearcher> searcherList = activeSearchers.remove(absoluteFile); if (searcherList != null) { - for ( final ActiveIndexSearcher searcher : searcherList ) { + for (final ActiveIndexSearcher searcher : searcherList) { try { searcher.close(); } catch (final IOException ioe) { logger.warn("Failed to close Index Searcher {} for {} due to {}", - searcher.getSearcher(), absoluteFile, ioe); - if ( logger.isDebugEnabled() ) { + searcher.getSearcher(), absoluteFile, ioe); + if (logger.isDebugEnabled()) { logger.warn("", ioe); } + + return false; } } } } finally { lock.unlock(); } + + return true; } - public IndexWriter borrowIndexWriter(final File indexingDirectory) throws IOException { - final File absoluteFile = indexingDirectory.getAbsoluteFile(); - logger.trace("Borrowing index writer for {}", indexingDirectory); + @Override + public EventIndexWriter borrowIndexWriter(final File indexDirectory) throws IOException { + final File absoluteFile = indexDirectory.getAbsoluteFile(); + logger.trace("Borrowing index writer for {}", indexDirectory); lock.lock(); try { IndexWriterCount writerCount = writerCounts.remove(absoluteFile); - if ( writerCount == null ) { + if (writerCount == null) { final List<Closeable> closeables = 
new ArrayList<>(); - final Directory directory = FSDirectory.open(indexingDirectory); + final Directory directory = FSDirectory.open(indexDirectory); closeables.add(directory); try { @@ -104,10 +114,11 @@ public class CachingIndexManager implements Closeable, IndexManager { config.setWriteLockTimeout(300000L); final IndexWriter indexWriter = new IndexWriter(directory, config); - writerCount = new IndexWriterCount(indexWriter, analyzer, directory, 1); - logger.debug("Providing new index writer for {}", indexingDirectory); + final EventIndexWriter eventIndexWriter = new LuceneEventIndexWriter(indexWriter, indexDirectory); + writerCount = new IndexWriterCount(eventIndexWriter, analyzer, directory, 1); + logger.debug("Providing new index writer for {}", indexDirectory); } catch (final IOException ioe) { - for ( final Closeable closeable : closeables ) { + for (final Closeable closeable : closeables) { try { closeable.close(); } catch (final IOException ioe2) { @@ -122,16 +133,16 @@ public class CachingIndexManager implements Closeable, IndexManager { // Mark any active searchers as poisoned because we are updating the index final List<ActiveIndexSearcher> searchers = activeSearchers.get(absoluteFile); - if ( searchers != null ) { + if (searchers != null) { for (final ActiveIndexSearcher activeSearcher : searchers) { - logger.debug("Poisoning {} because it is searching {}, which is getting updated", activeSearcher, indexingDirectory); + logger.debug("Poisoning {} because it is searching {}, which is getting updated", activeSearcher, indexDirectory); activeSearcher.poison(); } } } else { - logger.debug("Providing existing index writer for {} and incrementing count to {}", indexingDirectory, writerCount.getCount() + 1); + logger.debug("Providing existing index writer for {} and incrementing count to {}", indexDirectory, writerCount.getCount() + 1); writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(), - writerCount.getAnalyzer(), 
writerCount.getDirectory(), writerCount.getCount() + 1)); + writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1)); } return writerCount.getWriter(); @@ -140,31 +151,53 @@ public class CachingIndexManager implements Closeable, IndexManager { } } - public void returnIndexWriter(final File indexingDirectory, final IndexWriter writer) { - final File absoluteFile = indexingDirectory.getAbsoluteFile(); - logger.trace("Returning Index Writer for {} to IndexManager", indexingDirectory); + + @Override + public void returnIndexWriter(final EventIndexWriter writer) { + returnIndexWriter(writer, true, true); + } + + @Override + public void returnIndexWriter(final EventIndexWriter writer, final boolean commit, final boolean isCloseable) { + final File indexDirectory = writer.getDirectory(); + final File absoluteFile = indexDirectory.getAbsoluteFile(); + logger.trace("Returning Index Writer for {} to IndexManager", indexDirectory); lock.lock(); try { - final IndexWriterCount count = writerCounts.remove(absoluteFile); + final IndexWriterCount count = writerCounts.get(absoluteFile); try { - if ( count == null ) { + if (count == null) { logger.warn("Index Writer {} was returned to IndexManager for {}, but this writer is not known. " - + "This could potentially lead to a resource leak", writer, indexingDirectory); + + "This could potentially lead to a resource leak", writer, indexDirectory); writer.close(); - } else if ( count.getCount() <= 1 ) { + } else if (count.getCount() <= 1) { // we are finished with this writer. - logger.debug("Decrementing count for Index Writer for {} to {}; Closing writer", indexingDirectory, count.getCount() - 1); - count.close(); + logger.info("Decrementing count for Index Writer for {} to {}. 
Now finished writing to this Index Directory", + indexDirectory, count.getCount() - 1); + + try { + if (commit) { + writer.commit(); + } + } finally { + if (isCloseable) { + try { + count.close(); + } finally { + writerCounts.remove(absoluteFile); + } + } + } } else { // decrement the count. - logger.debug("Decrementing count for Index Writer for {} to {}", indexingDirectory, count.getCount() - 1); + logger.debug("Decrementing count for Index Writer for {} to {}", indexDirectory, count.getCount() - 1); writerCounts.put(absoluteFile, new IndexWriterCount(count.getWriter(), count.getAnalyzer(), count.getDirectory(), count.getCount() - 1)); } } catch (final IOException ioe) { logger.warn("Failed to close Index Writer {} due to {}", writer, ioe); - if ( logger.isDebugEnabled() ) { + if (logger.isDebugEnabled()) { logger.warn("", ioe); } } @@ -174,7 +207,8 @@ public class CachingIndexManager implements Closeable, IndexManager { } - public IndexSearcher borrowIndexSearcher(final File indexDir) throws IOException { + @Override + public EventIndexSearcher borrowIndexSearcher(final File indexDir) throws IOException { final File absoluteFile = indexDir.getAbsoluteFile(); logger.trace("Borrowing index searcher for {}", indexDir); @@ -182,7 +216,7 @@ public class CachingIndexManager implements Closeable, IndexManager { try { // check if we already have a reader cached. List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile); - if ( currentlyCached == null ) { + if (currentlyCached == null) { currentlyCached = new ArrayList<>(); activeSearchers.put(absoluteFile, currentlyCached); } else { @@ -197,7 +231,7 @@ public class CachingIndexManager implements Closeable, IndexManager { // if there are no references to the reader, it will have been closed. Since there is no // isClosed() method, this is how we determine whether it's been closed or not. 
- final int refCount = searcher.getSearcher().getIndexReader().getRefCount(); + final int refCount = searcher.getSearcher().getIndexSearcher().getIndexReader().getRefCount(); if (refCount <= 0) { // if refCount == 0, then the reader has been closed, so we cannot use the searcher logger.debug("Reference count for cached Index Searcher for {} is currently {}; " @@ -216,16 +250,17 @@ public class CachingIndexManager implements Closeable, IndexManager { // if we have an Index Writer, and if so create a Reader based on the Index Writer. // This will provide us a 'near real time' index reader. final IndexWriterCount writerCount = writerCounts.remove(absoluteFile); - if ( writerCount == null ) { + if (writerCount == null) { final Directory directory = FSDirectory.open(absoluteFile); logger.debug("No Index Writer currently exists for {}; creating a cachable reader", indexDir); try { final DirectoryReader directoryReader = DirectoryReader.open(directory); final IndexSearcher searcher = new IndexSearcher(directoryReader); + final EventIndexSearcher eventIndexSearcher = new LuceneEventIndexSearcher(searcher, indexDir, directory, directoryReader); // we want to cache the searcher that we create, since it's just a reader. - final ActiveIndexSearcher cached = new ActiveIndexSearcher(searcher, absoluteFile, directoryReader, directory, true); + final ActiveIndexSearcher cached = new ActiveIndexSearcher(eventIndexSearcher, absoluteFile, directoryReader, directory, true); currentlyCached.add(cached); return cached.getSearcher(); @@ -243,22 +278,23 @@ public class CachingIndexManager implements Closeable, IndexManager { } } else { logger.debug("Index Writer currently exists for {}; creating a non-cachable reader and incrementing " - + "counter to {}", indexDir, writerCount.getCount() + 1); + + "counter to {}", indexDir, writerCount.getCount() + 1); // increment the writer count to ensure that it's kept open. 
writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(), - writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1)); + writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1)); // create a new Index Searcher from the writer so that we don't have an issue with trying // to read from a directory that's locked. If we get the "no segments* file found" with // Lucene, this indicates that an IndexWriter already has the directory open. - final IndexWriter writer = writerCount.getWriter(); - final DirectoryReader directoryReader = DirectoryReader.open(writer, false); + final EventIndexWriter writer = writerCount.getWriter(); + final DirectoryReader directoryReader = DirectoryReader.open(writer.getIndexWriter(), false); final IndexSearcher searcher = new IndexSearcher(directoryReader); + final EventIndexSearcher eventIndexSearcher = new LuceneEventIndexSearcher(searcher, indexDir, null, directoryReader); // we don't want to cache this searcher because it's based on a writer, so we want to get // new values the next time that we search. 
- final ActiveIndexSearcher activeSearcher = new ActiveIndexSearcher(searcher, absoluteFile, directoryReader, null, false); + final ActiveIndexSearcher activeSearcher = new ActiveIndexSearcher(eventIndexSearcher, absoluteFile, directoryReader, null, false); currentlyCached.add(activeSearcher); return activeSearcher.getSearcher(); @@ -269,7 +305,9 @@ public class CachingIndexManager implements Closeable, IndexManager { } - public void returnIndexSearcher(final File indexDirectory, final IndexSearcher searcher) { + @Override + public void returnIndexSearcher(final EventIndexSearcher searcher) { + final File indexDirectory = searcher.getIndexDirectory(); final File absoluteFile = indexDirectory.getAbsoluteFile(); logger.trace("Returning index searcher for {} to IndexManager", indexDirectory); @@ -277,9 +315,9 @@ public class CachingIndexManager implements Closeable, IndexManager { try { // check if we already have a reader cached. final List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile); - if ( currentlyCached == null ) { + if (currentlyCached == null) { logger.warn("Received Index Searcher for {} but no searcher was provided for that directory; this could " - + "result in a resource leak", indexDirectory); + + "result in a resource leak", indexDirectory); return; } @@ -289,20 +327,20 @@ public class CachingIndexManager implements Closeable, IndexManager { boolean activeSearcherFound = false; while (itr.hasNext()) { final ActiveIndexSearcher activeSearcher = itr.next(); - if ( activeSearcher.getSearcher().equals(searcher) ) { + if (activeSearcher.getSearcher().equals(searcher)) { activeSearcherFound = true; - if ( activeSearcher.isCache() ) { + if (activeSearcher.isCache()) { // if the searcher is poisoned, close it and remove from "pool". Otherwise, // just decrement the count. 
Note here that when we call close() it won't actually close // the underlying directory reader unless there are no more references to it - if ( activeSearcher.isPoisoned() ) { + if (activeSearcher.isPoisoned()) { itr.remove(); try { activeSearcher.close(); } catch (final IOException ioe) { logger.warn("Failed to close Index Searcher for {} due to {}", absoluteFile, ioe); - if ( logger.isDebugEnabled() ) { + if (logger.isDebugEnabled()) { logger.warn("", ioe); } } @@ -322,26 +360,26 @@ public class CachingIndexManager implements Closeable, IndexManager { // decrement the writer count because we incremented it when creating the searcher final IndexWriterCount writerCount = writerCounts.remove(absoluteFile); - if ( writerCount != null ) { - if ( writerCount.getCount() <= 1 ) { + if (writerCount != null) { + if (writerCount.getCount() <= 1) { try { logger.debug("Index searcher for {} is not cached. Writer count is " - + "decremented to {}; closing writer", indexDirectory, writerCount.getCount() - 1); + + "decremented to {}; closing writer", indexDirectory, writerCount.getCount() - 1); writerCount.close(); } catch (final IOException ioe) { logger.warn("Failed to close Index Writer for {} due to {}", absoluteFile, ioe); - if ( logger.isDebugEnabled() ) { + if (logger.isDebugEnabled()) { logger.warn("", ioe); } } } else { logger.debug("Index searcher for {} is not cached. 
Writer count is decremented " - + "to {}; leaving writer open", indexDirectory, writerCount.getCount() - 1); + + "to {}; leaving writer open", indexDirectory, writerCount.getCount() - 1); writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(), - writerCount.getAnalyzer(), writerCount.getDirectory(), - writerCount.getCount() - 1)); + writerCount.getAnalyzer(), writerCount.getDirectory(), + writerCount.getCount() - 1)); } } @@ -353,7 +391,7 @@ public class CachingIndexManager implements Closeable, IndexManager { } } catch (final IOException ioe) { logger.warn("Failed to close Index Searcher for {} due to {}", absoluteFile, ioe); - if ( logger.isDebugEnabled() ) { + if (logger.isDebugEnabled()) { logger.warn("", ioe); } } @@ -378,11 +416,11 @@ public class CachingIndexManager implements Closeable, IndexManager { try { IOException ioe = null; - for ( final IndexWriterCount count : writerCounts.values() ) { + for (final IndexWriterCount count : writerCounts.values()) { try { count.close(); } catch (final IOException e) { - if ( ioe == null ) { + if (ioe == null) { ioe = e; } else { ioe.addSuppressed(e); @@ -395,7 +433,7 @@ public class CachingIndexManager implements Closeable, IndexManager { try { searcher.close(); } catch (final IOException e) { - if ( ioe == null ) { + if (ioe == null) { ioe = e; } else { ioe.addSuppressed(e); @@ -404,7 +442,7 @@ public class CachingIndexManager implements Closeable, IndexManager { } } - if ( ioe != null ) { + if (ioe != null) { throw ioe; } } finally { @@ -415,15 +453,15 @@ public class CachingIndexManager implements Closeable, IndexManager { private static void close(final Closeable... 
closeables) throws IOException { IOException ioe = null; - for ( final Closeable closeable : closeables ) { - if ( closeable == null ) { + for (final Closeable closeable : closeables) { + if (closeable == null) { continue; } try { closeable.close(); } catch (final IOException e) { - if ( ioe == null ) { + if (ioe == null) { ioe = e; } else { ioe.addSuppressed(e); @@ -431,14 +469,14 @@ public class CachingIndexManager implements Closeable, IndexManager { } } - if ( ioe != null ) { + if (ioe != null) { throw ioe; } } private static class ActiveIndexSearcher { - private final IndexSearcher searcher; + private final EventIndexSearcher searcher; private final DirectoryReader directoryReader; private final File indexDirectory; private final Directory directory; @@ -446,8 +484,8 @@ public class CachingIndexManager implements Closeable, IndexManager { private final AtomicInteger referenceCount = new AtomicInteger(1); private volatile boolean poisoned = false; - public ActiveIndexSearcher(final IndexSearcher searcher, final File indexDirectory, final DirectoryReader directoryReader, - final Directory directory, final boolean cache) { + public ActiveIndexSearcher(final EventIndexSearcher searcher, final File indexDirectory, final DirectoryReader directoryReader, + final Directory directory, final boolean cache) { this.searcher = searcher; this.directoryReader = directoryReader; this.indexDirectory = indexDirectory; @@ -459,7 +497,7 @@ public class CachingIndexManager implements Closeable, IndexManager { return cache; } - public IndexSearcher getSearcher() { + public EventIndexSearcher getSearcher() { return searcher; } @@ -499,12 +537,12 @@ public class CachingIndexManager implements Closeable, IndexManager { private static class IndexWriterCount implements Closeable { - private final IndexWriter writer; + private final EventIndexWriter writer; private final Analyzer analyzer; private final Directory directory; private final int count; - public IndexWriterCount(final 
IndexWriter writer, final Analyzer analyzer, final Directory directory, final int count) { + public IndexWriterCount(final EventIndexWriter writer, final Analyzer analyzer, final Directory directory, final int count) { this.writer = writer; this.analyzer = analyzer; this.directory = directory; @@ -519,7 +557,7 @@ public class CachingIndexManager implements Closeable, IndexManager { return directory; } - public IndexWriter getWriter() { + public EventIndexWriter getWriter() { return writer; } http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java index 7707352..f372a2d 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java @@ -25,6 +25,7 @@ import org.apache.lucene.index.Term; import org.apache.nifi.provenance.IndexConfiguration; import org.apache.nifi.provenance.PersistentProvenanceRepository; import org.apache.nifi.provenance.expiration.ExpirationAction; +import org.apache.nifi.provenance.index.EventIndexWriter; import org.apache.nifi.provenance.serialization.RecordReader; import org.apache.nifi.provenance.serialization.RecordReaders; import org.slf4j.Logger; @@ -60,15 +61,16 @@ public class DeleteIndexAction implements 
ExpirationAction { final Term term = new Term(FieldNames.STORAGE_FILENAME, LuceneUtil.substringBefore(expiredFile.getName(), ".")); boolean deleteDir = false; - final IndexWriter writer = indexManager.borrowIndexWriter(indexingDirectory); + final EventIndexWriter writer = indexManager.borrowIndexWriter(indexingDirectory); try { - writer.deleteDocuments(term); - writer.commit(); - final int docsLeft = writer.numDocs(); + final IndexWriter indexWriter = writer.getIndexWriter(); + indexWriter.deleteDocuments(term); + indexWriter.commit(); + final int docsLeft = indexWriter.numDocs(); deleteDir = docsLeft <= 0; logger.debug("After expiring {}, there are {} docs left for index {}", expiredFile, docsLeft, indexingDirectory); } finally { - indexManager.returnIndexWriter(indexingDirectory, writer); + indexManager.returnIndexWriter(writer); } // we've confirmed that all documents have been removed. Delete the index directory. http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java index ce62152..0e96b62 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java @@ -30,25 +30,25 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import 
java.util.concurrent.atomic.AtomicInteger; +import org.apache.lucene.document.Document; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexableField; +import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.search.TopDocs; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.SearchableFields; import org.apache.nifi.provenance.StandardProvenanceEventRecord; -import org.apache.nifi.provenance.authorization.AuthorizationCheck; +import org.apache.nifi.provenance.authorization.EventAuthorizer; import org.apache.nifi.provenance.serialization.RecordReader; import org.apache.nifi.provenance.serialization.RecordReaders; import org.apache.nifi.provenance.toc.TocReader; -import org.apache.lucene.document.Document; -import org.apache.lucene.index.IndexReader; -import org.apache.lucene.index.IndexableField; -import org.apache.lucene.search.ScoreDoc; -import org.apache.lucene.search.TopDocs; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -class DocsReader { +public class DocsReader { private final Logger logger = LoggerFactory.getLogger(DocsReader.class); - public Set<ProvenanceEventRecord> read(final TopDocs topDocs, final AuthorizationCheck authCheck, final IndexReader indexReader, final Collection<Path> allProvenanceLogFiles, + public Set<ProvenanceEventRecord> read(final TopDocs topDocs, final EventAuthorizer authorizer, final IndexReader indexReader, final Collection<Path> allProvenanceLogFiles, final AtomicInteger retrievalCount, final int maxResults, final int maxAttributeChars) throws IOException { if (retrievalCount.get() >= maxResults) { return Collections.emptySet(); @@ -67,7 +67,7 @@ class DocsReader { final long readDocuments = System.nanoTime() - start; logger.debug("Reading {} Lucene Documents took {} millis", docs.size(), TimeUnit.NANOSECONDS.toMillis(readDocuments)); - return read(docs, authCheck, allProvenanceLogFiles, retrievalCount, maxResults, maxAttributeChars); + 
return read(docs, authorizer, allProvenanceLogFiles, retrievalCount, maxResults, maxAttributeChars); } @@ -106,7 +106,7 @@ class DocsReader { return record; } - public Set<ProvenanceEventRecord> read(final List<Document> docs, final AuthorizationCheck authCheck, final Collection<Path> allProvenanceLogFiles, + public Set<ProvenanceEventRecord> read(final List<Document> docs, final EventAuthorizer authorizer, final Collection<Path> allProvenanceLogFiles, final AtomicInteger retrievalCount, final int maxResults, final int maxAttributeChars) throws IOException { if (retrievalCount.get() >= maxResults) { @@ -114,38 +114,33 @@ class DocsReader { } final long start = System.nanoTime(); - - Set<ProvenanceEventRecord> matchingRecords = new LinkedHashSet<>(); - - Map<String, List<Document>> byStorageNameDocGroups = LuceneUtil.groupDocsByStorageFileName(docs); + final Set<ProvenanceEventRecord> matchingRecords = new LinkedHashSet<>(); + final Map<String, List<Document>> byStorageNameDocGroups = LuceneUtil.groupDocsByStorageFileName(docs); int eventsReadThisFile = 0; int logFileCount = 0; for (String storageFileName : byStorageNameDocGroups.keySet()) { - File provenanceEventFile = LuceneUtil.getProvenanceLogFile(storageFileName, allProvenanceLogFiles); - if (provenanceEventFile != null) { - try (RecordReader reader = RecordReaders.newRecordReader(provenanceEventFile, allProvenanceLogFiles, - maxAttributeChars)) { - - Iterator<Document> docIter = byStorageNameDocGroups.get(storageFileName).iterator(); - while (docIter.hasNext() && retrievalCount.getAndIncrement() < maxResults) { - ProvenanceEventRecord event = this.getRecord(docIter.next(), reader); - if (event != null && authCheck.isAuthorized(event)) { - matchingRecords.add(event); - eventsReadThisFile++; - } - } + final File provenanceEventFile = LuceneUtil.getProvenanceLogFile(storageFileName, allProvenanceLogFiles); + if (provenanceEventFile == null) { + logger.warn("Could not find Provenance Log File with " + + "basename 
{} in the Provenance Repository; assuming " + + "file has expired and continuing without it", storageFileName); + continue; + } - } catch (Exception e) { - logger.warn("Failed while trying to read Provenance Events. The event file '" - + provenanceEventFile.getAbsolutePath() + - "' may be missing or corrupted.", e); + try (final RecordReader reader = RecordReaders.newRecordReader(provenanceEventFile, allProvenanceLogFiles, maxAttributeChars)) { + final Iterator<Document> docIter = byStorageNameDocGroups.get(storageFileName).iterator(); + while (docIter.hasNext() && retrievalCount.getAndIncrement() < maxResults) { + final ProvenanceEventRecord event = getRecord(docIter.next(), reader); + if (event != null && authorizer.isAuthorized(event)) { + matchingRecords.add(event); + eventsReadThisFile++; + } } - } else { - logger.warn("Could not find Provenance Log File with " - + "basename {} in the Provenance Repository; assuming " - + "file has expired and continuing without it", storageFileName); + } catch (final Exception e) { + logger.warn("Failed to read Provenance Events. The event file '" + + provenanceEventFile.getAbsolutePath() + "' may be missing or corrupt.", e); } }