Repository: incubator-nifi
Updated Branches:
  refs/heads/develop c12778f17 -> b760505bf


NIFI-596: If an IndexWriter is opened for the same directory as an IndexReader,
mark that IndexReader as poisoned and stop using it

NIFI-595: Delete the associated .toc (Table-of-Contents) file when expiring a
Provenance event file

NIFI-597: Increment the counter of retrieved documents only after the record
has actually been read


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/b760505b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/b760505b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/b760505b

Branch: refs/heads/develop
Commit: b760505bf388848d5329a658bb429d4ef34afcf7
Parents: c12778f
Author: Mark Payne <marka...@hotmail.com>
Authored: Wed May 6 15:43:20 2015 -0400
Committer: Mark Payne <marka...@hotmail.com>
Committed: Fri May 8 12:02:39 2015 -0400

----------------------------------------------------------------------
 .../expiration/FileRemovalAction.java           | 28 +++++++--
 .../nifi/provenance/lucene/DocsReader.java      | 25 +++++---
 .../nifi/provenance/lucene/IndexManager.java    | 61 +++++++++++++++++---
 3 files changed, 92 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b760505b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/expiration/FileRemovalAction.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/expiration/FileRemovalAction.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/expiration/FileRemovalAction.java
index 1b4bafe..2333079 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/expiration/FileRemovalAction.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/expiration/FileRemovalAction.java
@@ -20,7 +20,7 @@ import java.io.File;
 import java.io.IOException;
 
 import org.apache.nifi.provenance.lucene.DeleteIndexAction;
-
+import org.apache.nifi.provenance.toc.TocUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -30,16 +30,32 @@ public class FileRemovalAction implements ExpirationAction {
 
     @Override
     public File execute(final File expiredFile) throws IOException {
+        final boolean removed = remove(expiredFile);
+        if (removed) {
+            logger.info("Removed expired Provenance Event file {}", 
expiredFile);
+        } else {
+            logger.warn("Failed to remove old Provenance Event file {}; this 
file should be cleaned up manually", expiredFile);
+        }
+
+        final File tocFile = TocUtil.getTocFile(expiredFile);
+        if (remove(tocFile)) {
+            logger.info("Removed expired Provenance Table-of-Contents file 
{}", tocFile);
+        } else {
+            logger.warn("Failed to remove old Provenance Table-of-Contents 
file {}; this file should be cleaned up manually", expiredFile);
+        }
+
+        return removed ? null : expiredFile;
+    }
+
+    private boolean remove(final File file) {
         boolean removed = false;
         for (int i = 0; i < 10 && !removed; i++) {
-            if ((removed = expiredFile.delete())) {
-                logger.info("Removed expired Provenance Event file {}", 
expiredFile);
-                return null;
+            if (removed = file.delete()) {
+                return true;
             }
         }
 
-        logger.warn("Failed to remove old Provenance Event file {}", 
expiredFile);
-        return expiredFile;
+        return false;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b760505b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
index 98137fb..02fd5c3 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
@@ -64,14 +64,11 @@ public class DocsReader {
             final int docId = scoreDoc.doc;
             final Document d = indexReader.document(docId);
             docs.add(d);
-            if ( retrievalCount.incrementAndGet() >= maxResults ) {
-                break;
-            }
         }
 
         final long readDocuments = System.nanoTime() - start;
         logger.debug("Reading {} Lucene Documents took {} millis", 
docs.size(), TimeUnit.NANOSECONDS.toMillis(readDocuments));
-        return read(docs, allProvenanceLogFiles);
+        return read(docs, allProvenanceLogFiles, retrievalCount, maxResults);
     }
 
 
@@ -88,7 +85,7 @@ public class DocsReader {
 
 
     private ProvenanceEventRecord getRecord(final Document d, final 
RecordReader reader) throws IOException {
-        IndexableField blockField = d.getField(FieldNames.BLOCK_INDEX);
+        final IndexableField blockField = d.getField(FieldNames.BLOCK_INDEX);
         if ( blockField == null ) {
             reader.skipTo(getByteOffset(d, reader));
         } else {
@@ -97,7 +94,7 @@ public class DocsReader {
 
         StandardProvenanceEventRecord record;
         while ( (record = reader.nextRecord()) != null) {
-            IndexableField idField = 
d.getField(SearchableFields.Identifier.getSearchableFieldName());
+            final IndexableField idField = 
d.getField(SearchableFields.Identifier.getSearchableFieldName());
             if ( idField == null || idField.numericValue().longValue() == 
record.getEventId() ) {
                 break;
             }
@@ -111,7 +108,11 @@ public class DocsReader {
     }
 
 
-    public Set<ProvenanceEventRecord> read(final List<Document> docs, final 
Collection<Path> allProvenanceLogFiles) throws IOException {
+    public Set<ProvenanceEventRecord> read(final List<Document> docs, final 
Collection<Path> allProvenanceLogFiles, final AtomicInteger retrievalCount, 
final int maxResults) throws IOException {
+        if (retrievalCount.get() >= maxResults) {
+            return Collections.emptySet();
+        }
+
         LuceneUtil.sortDocsForRetrieval(docs);
 
         RecordReader reader = null;
@@ -133,6 +134,10 @@ public class DocsReader {
                 try {
                     if (reader != null && 
storageFilename.equals(lastStorageFilename)) {
                         matchingRecords.add(getRecord(d, reader));
+
+                        if ( retrievalCount.incrementAndGet() >= maxResults ) {
+                            break;
+                        }
                     } else {
                         logger.debug("Opening log file {}", storageFilename);
 
@@ -141,7 +146,7 @@ public class DocsReader {
                             reader.close();
                         }
 
-                        List<File> potentialFiles = 
LuceneUtil.getProvenanceLogFiles(storageFilename, allProvenanceLogFiles);
+                        final List<File> potentialFiles = 
LuceneUtil.getProvenanceLogFiles(storageFilename, allProvenanceLogFiles);
                         if (potentialFiles.isEmpty()) {
                             logger.warn("Could not find Provenance Log File 
with basename {} in the "
                                     + "Provenance Repository; assuming file 
has expired and continuing without it", storageFilename);
@@ -158,6 +163,10 @@ public class DocsReader {
                             try {
                                 reader = RecordReaders.newRecordReader(file, 
allProvenanceLogFiles);
                                 matchingRecords.add(getRecord(d, reader));
+
+                                if ( retrievalCount.incrementAndGet() >= 
maxResults ) {
+                                    break;
+                                }
                             } catch (final IOException e) {
                                 throw new IOException("Failed to retrieve 
record " + d + " from Provenance File " + file + " due to " + e, e);
                             }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b760505b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
index 9c3ec31..31f31a5 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
@@ -119,6 +119,14 @@ public class IndexManager implements Closeable {
                 }
 
                 writerCounts.put(absoluteFile, writerCount);
+
+                // Mark any active searchers as poisoned because we are 
updating the index
+                final List<ActiveIndexSearcher> searchers = 
activeSearchers.get(absoluteFile);
+                if ( searchers != null ) {
+                    for (final ActiveIndexSearcher activeSearcher : searchers) 
{
+                        activeSearcher.poison();
+                    }
+                }
             } else {
                 logger.debug("Providing existing index writer for {} and 
incrementing count to {}", indexingDirectory, writerCount.getCount() + 1);
                 writerCounts.put(absoluteFile, new 
IndexWriterCount(writerCount.getWriter(),
@@ -137,7 +145,7 @@ public class IndexManager implements Closeable {
 
         lock.lock();
         try {
-            IndexWriterCount count = writerCounts.remove(absoluteFile);
+            final IndexWriterCount count = writerCounts.remove(absoluteFile);
 
             try {
                 if ( count == null ) {
@@ -184,6 +192,15 @@ public class IndexManager implements Closeable {
                 try {
                     for ( final ActiveIndexSearcher searcher : currentlyCached 
) {
                         if ( searcher.isCache() ) {
+                            // if the searcher is poisoned, we want to close 
and expire it.
+                            if ( searcher.isPoisoned() ) {
+                                logger.debug("Index Searcher for {} is 
poisoned; removing cached searcher", absoluteFile);
+                                expired.add(searcher);
+                                continue;
+                            }
+
+                            // if there are no references to the reader, it 
will have been closed. Since there is no
+                            // isClosed() method, this is how we determine 
whether it's been closed or not.
                             final int refCount = 
searcher.getSearcher().getIndexReader().getRefCount();
                             if ( refCount <= 0 ) {
                                 // if refCount == 0, then the reader has been 
closed, so we need to discard the searcher
@@ -212,7 +229,7 @@ public class IndexManager implements Closeable {
                 }
             }
 
-            IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
+            final IndexWriterCount writerCount = 
writerCounts.remove(absoluteFile);
             if ( writerCount == null ) {
                 final Directory directory = FSDirectory.open(absoluteFile);
                 logger.debug("No Index Writer currently exists for {}; 
creating a cachable reader", indexDir);
@@ -270,21 +287,40 @@ public class IndexManager implements Closeable {
         lock.lock();
         try {
             // check if we already have a reader cached.
-            List<ActiveIndexSearcher> currentlyCached = 
activeSearchers.get(absoluteFile);
+            final List<ActiveIndexSearcher> currentlyCached = 
activeSearchers.get(absoluteFile);
             if ( currentlyCached == null ) {
                 logger.warn("Received Index Searcher for {} but no searcher 
was provided for that directory; this could "
                         + "result in a resource leak", indexDirectory);
                 return;
             }
 
+            // Check if the given searcher is in our list. We use an Iterator 
to do this so that if we
+            // find it we can call remove() on the iterator if need be.
             final Iterator<ActiveIndexSearcher> itr = 
currentlyCached.iterator();
             while (itr.hasNext()) {
                 final ActiveIndexSearcher activeSearcher = itr.next();
                 if ( activeSearcher.getSearcher().equals(searcher) ) {
                     if ( activeSearcher.isCache() ) {
-                        // the searcher is cached. Just leave it open.
-                        logger.debug("Index searcher for {} is cached; leaving 
open", indexDirectory);
-                        return;
+                        // if the searcher is poisoned, close it and remove 
from "pool".
+                        if ( activeSearcher.isPoisoned() ) {
+                            itr.remove();
+
+                            try {
+                                logger.debug("Closing Index Searcher for {} 
because it is poisoned", indexDirectory);
+                                activeSearcher.close();
+                            } catch (final IOException ioe) {
+                                logger.warn("Failed to close Index Searcher 
for {} due to {}", absoluteFile, ioe);
+                                if ( logger.isDebugEnabled() ) {
+                                    logger.warn("", ioe);
+                                }
+                            }
+
+                            return;
+                        } else {
+                            // the searcher is cached. Just leave it open.
+                            logger.debug("Index searcher for {} is cached; 
leaving open", indexDirectory);
+                            return;
+                        }
                     } else {
                         // searcher is not cached. It was created from a 
writer, and we want
                         // the newest updates the next time that we get a 
searcher, so we will
@@ -405,9 +441,10 @@ public class IndexManager implements Closeable {
         private final DirectoryReader directoryReader;
         private final Directory directory;
         private final boolean cache;
+        private boolean poisoned = false;
 
-        public ActiveIndexSearcher(IndexSearcher searcher, DirectoryReader 
directoryReader,
-                Directory directory, final boolean cache) {
+        public ActiveIndexSearcher(final IndexSearcher searcher, final 
DirectoryReader directoryReader,
+                final Directory directory, final boolean cache) {
             this.searcher = searcher;
             this.directoryReader = directoryReader;
             this.directory = directory;
@@ -422,6 +459,14 @@ public class IndexManager implements Closeable {
             return searcher;
         }
 
+        public boolean isPoisoned() {
+            return poisoned;
+        }
+
+        public void poison() {
+            this.poisoned = true;
+        }
+
         @Override
         public void close() throws IOException {
             IndexManager.close(directoryReader, directory);

Reply via email to