Repository: nifi
Updated Branches:
  refs/heads/master a2d3d0c28 -> 95b5877f5


NIFI-2600: Ensure that we do not close Index Searchers prematurely, even if 
they are poisoned

This closes #896


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/95b5877f
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/95b5877f
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/95b5877f

Branch: refs/heads/master
Commit: 95b5877f5de679da689b76917d93b3ad7fc9b890
Parents: a2d3d0c
Author: Mark Payne <[email protected]>
Authored: Thu Aug 18 15:29:34 2016 -0400
Committer: Matt Burgess <[email protected]>
Committed: Mon Aug 22 12:03:47 2016 -0400

----------------------------------------------------------------------
 .../PersistentProvenanceRepository.java         |  23 ++--
 .../nifi/provenance/lucene/IndexManager.java    |  68 ++++-------
 .../TestPersistentProvenanceRepository.java     |  84 +++++++-------
 .../provenance/lucene/TestIndexManager.java     | 114 +++++++++++++++++++
 4 files changed, 194 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/95b5877f/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
index 2d0bbcb..da3a6f5 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
@@ -1241,6 +1241,10 @@ public class PersistentProvenanceRepository implements 
ProvenanceRepository {
         }
     }
 
+    protected long getRolloverRetryMillis() {
+        return 10000L;
+    }
+
     /**
      * <p>
      * MUST be called with the write lock held.
@@ -1349,14 +1353,14 @@ public class PersistentProvenanceRepository implements 
ProvenanceRepository {
                             future.cancel(false);
 
                         } else {
-                            logger.warn("Couldn't merge journals. Will try 
again in 10 seconds. journalsToMerge: {}, storageDir: {}", journalsToMerge, 
storageDir);
+                            logger.warn("Couldn't merge journals. Will try 
again. journalsToMerge: {}, storageDir: {}", journalsToMerge, storageDir);
                         }
                     }
                 };
 
                // We are going to schedule the future to run immediately and 
then repeat at the configured retry interval (getRolloverRetryMillis()). This allows us to keep retrying if we
                // fail for some reason. When we succeed or if retries are 
exceeded, the Runnable will cancel itself.
-                future = 
rolloverExecutor.scheduleWithFixedDelay(rolloverRunnable, 0, 10, 
TimeUnit.SECONDS);
+                future = 
rolloverExecutor.scheduleWithFixedDelay(rolloverRunnable, 0, 
getRolloverRetryMillis(), TimeUnit.MILLISECONDS);
                 futureReference.set(future);
             }
 
@@ -1499,6 +1503,10 @@ public class PersistentProvenanceRepository implements 
ProvenanceRepository {
         return mergedFile;
     }
 
+    protected List<File> filterUnavailableFiles(final List<File> journalFiles) 
{
+        return journalFiles.stream().filter(file -> 
file.exists()).collect(Collectors.toList());
+    }
+
     /**
      * <p>
      * Merges all of the given Journal Files into a single, merged Provenance
@@ -1555,9 +1563,10 @@ public class PersistentProvenanceRepository implements 
ProvenanceRepository {
             }
         });
 
-        //Search for any missing files. At this point they should have been 
written to disk otherwise cannot continue
-        //missing files is most likely due to incomplete cleanup of files post 
merge
-        final long numAvailableFiles = journalFiles.size() - 
journalFiles.stream().filter(file -> !file.exists()).count();
+        // Search for any missing files. At this point they should have been 
written to disk otherwise cannot continue.
+        // Missing files is most likely due to incomplete cleanup of files 
post merge
+        final List<File> availableFiles = filterUnavailableFiles(journalFiles);
+        final int numAvailableFiles = availableFiles.size();
 
         // check if we have all of the "partial" files for the journal.
         if (numAvailableFiles > 0) {
@@ -1606,7 +1615,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceRepository {
         final File writerFile = isCompress ? new 
File(suggestedMergeFile.getParentFile(), suggestedMergeFile.getName() + ".gz") 
: suggestedMergeFile;
 
         try {
-            for (final File journalFile : journalFiles) {
+            for (final File journalFile : availableFiles) {
                 try {
                     // Use MAX_VALUE for number of chars because we don't want 
to truncate the value as we write it
                     // out. This allows us to later decide that we want more 
characters and still be able to retrieve
@@ -1842,7 +1851,7 @@ public class PersistentProvenanceRepository implements 
ProvenanceRepository {
         }
 
         // Success. Remove all of the journal files, as they're no longer 
needed, now that they've been merged.
-        for (final File journalFile : journalFiles) {
+        for (final File journalFile : availableFiles) {
             if (!journalFile.delete() && journalFile.exists()) {
                 logger.warn("Failed to remove temporary journal file {}; this 
file should be cleaned up manually", journalFile.getAbsolutePath());
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/95b5877f/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
index 6995173..3d38cac 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
@@ -187,47 +187,26 @@ public class IndexManager implements Closeable {
             } else {
                 // keep track of any searchers that have been closed so that 
we can remove them
                 // from our cache later.
-                final List<ActiveIndexSearcher> expired = new ArrayList<>();
-
-                try {
-                    for ( final ActiveIndexSearcher searcher : currentlyCached 
) {
-                        if ( searcher.isCache() ) {
-                            // if the searcher is poisoned, we want to close 
and expire it.
-                            if ( searcher.isPoisoned() ) {
-                                logger.debug("Index Searcher for {} is 
poisoned; removing cached searcher", absoluteFile);
-                                expired.add(searcher);
-                                continue;
-                            }
-
-                            // if there are no references to the reader, it 
will have been closed. Since there is no
-                            // isClosed() method, this is how we determine 
whether it's been closed or not.
-                            final int refCount = 
searcher.getSearcher().getIndexReader().getRefCount();
-                            if ( refCount <= 0 ) {
-                                // if refCount == 0, then the reader has been 
closed, so we need to discard the searcher
-                                logger.debug("Reference count for cached Index 
Searcher for {} is currently {}; "
-                                        + "removing cached searcher", 
absoluteFile, refCount);
-                                expired.add(searcher);
-                                continue;
-                            }
-
-                            final int referenceCount = 
searcher.incrementReferenceCount();
-                            logger.debug("Providing previously cached index 
searcher for {} and incrementing Reference Count to {}", indexDir, 
referenceCount);
-                            return searcher.getSearcher();
+                for (final ActiveIndexSearcher searcher : currentlyCached) {
+                    if (searcher.isCache()) {
+                        // if the searcher is poisoned, we want to close and 
expire it.
+                        if (searcher.isPoisoned()) {
+                            continue;
                         }
-                    }
-                } finally {
-                    // if we have any expired index searchers, we need to 
close them and remove them
-                    // from the cache so that we don't try to use them again 
later.
-                    for ( final ActiveIndexSearcher searcher : expired ) {
-                        try {
-                            logger.debug("Closing {}", searcher);
-                            searcher.close();
-                            logger.trace("Closed {}", searcher);
-                        } catch (final Exception e) {
-                            logger.debug("Failed to close 'expired' 
IndexSearcher {}", searcher);
+
+                        // if there are no references to the reader, it will 
have been closed. Since there is no
+                        // isClosed() method, this is how we determine whether 
it's been closed or not.
+                        final int refCount = 
searcher.getSearcher().getIndexReader().getRefCount();
+                        if (refCount <= 0) {
+                            // if refCount == 0, then the reader has been 
closed, so we cannot use the searcher
+                            logger.debug("Reference count for cached Index 
Searcher for {} is currently {}; "
+                                + "removing cached searcher", absoluteFile, 
refCount);
+                            continue;
                         }
 
-                        currentlyCached.remove(searcher);
+                        final int referenceCount = 
searcher.incrementReferenceCount();
+                        logger.debug("Providing previously cached index 
searcher for {} and incrementing Reference Count to {}", indexDir, 
referenceCount);
+                        return searcher.getSearcher();
                     }
                 }
             }
@@ -312,16 +291,14 @@ public class IndexManager implements Closeable {
                 if ( activeSearcher.getSearcher().equals(searcher) ) {
                     activeSearcherFound = true;
                     if ( activeSearcher.isCache() ) {
-                        // if the searcher is poisoned, close it and remove 
from "pool".
+                        // if the searcher is poisoned, close it and remove 
from "pool". Otherwise,
+                        // just decrement the count. Note here that when we 
call close() it won't actually close
+                        // the underlying directory reader unless there are no 
more references to it
                         if ( activeSearcher.isPoisoned() ) {
                             itr.remove();
 
                             try {
-                                logger.debug("Closing Index Searcher for {} 
because it is poisoned", indexDirectory);
-                                final boolean allReferencesClosed = 
activeSearcher.close();
-                                if (!allReferencesClosed) {
-                                    currentlyCached.add(activeSearcher);
-                                }
+                                activeSearcher.close();
                             } catch (final IOException ioe) {
                                 logger.warn("Failed to close Index Searcher 
for {} due to {}", absoluteFile, ioe);
                                 if ( logger.isDebugEnabled() ) {
@@ -384,7 +361,8 @@ public class IndexManager implements Closeable {
             }
 
             if (!activeSearcherFound) {
-                logger.error("Index Searcher {} was returned for {} but found 
no Active Searcher for it", searcher, indexDirectory);
+                logger.debug("Index Searcher {} was returned for {} but found 
no Active Searcher for it. "
+                    + "This will occur if the Index Searcher was already 
returned while being poisoned.", searcher, indexDirectory);
             }
         } finally {
             lock.unlock();

http://git-wip-us.apache.org/repos/asf/nifi/blob/95b5877f/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
index ff12691..845d6ea 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
@@ -16,6 +16,34 @@
  */
 package org.apache.nifi.provenance;
 
+import static org.apache.nifi.provenance.TestUtil.createFlowFile;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.zip.GZIPOutputStream;
+
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.core.SimpleAnalyzer;
 import org.apache.lucene.document.Document;
@@ -63,35 +91,6 @@ import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.FileFilter;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
-import java.util.zip.GZIPOutputStream;
-
-import static org.apache.nifi.provenance.TestUtil.createFlowFile;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
 public class TestPersistentProvenanceRepository {
 
     @Rule
@@ -1681,6 +1680,17 @@ public class TestPersistentProvenanceRepository {
                 retryAmount.incrementAndGet();
                 return super.mergeJournals(journalFiles, suggestedMergeFile, 
eventReporter);
             }
+
+            // Indicate that there are no files available.
+            @Override
+            protected List<File> filterUnavailableFiles(List<File> 
journalFiles) {
+                return Collections.emptyList();
+            }
+
+            @Override
+            protected long getRolloverRetryMillis() {
+                return 10L; // retry quickly.
+            }
         };
         repo.initialize(getEventReporter(), null, null);
 
@@ -1706,23 +1716,11 @@ public class TestPersistentProvenanceRepository {
                 }
             });
         }
-
-        final File storageDir = config.getStorageDirectories().get(0);
-        //trigger retry through full file deletion
-        Arrays.asList(storageDir.listFiles())
-                .stream()
-                .map(file -> new File(storageDir, "journals"))
-                .map(journalDir -> Arrays.asList(journalDir.listFiles()))
-                .flatMap(partials -> partials.stream())
-                .filter(partial -> partial.exists())
-                .forEach(file -> {
-                  file.delete();
-                });
+        exec.shutdown();
+        exec.awaitTermination(10, TimeUnit.SECONDS);
 
         repo.waitForRollover();
-
         assertEquals(5,retryAmount.get());
-
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/nifi/blob/95b5877f/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/lucene/TestIndexManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/lucene/TestIndexManager.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/lucene/TestIndexManager.java
new file mode 100644
index 0000000..afe4512
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/lucene/TestIndexManager.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.lucene;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.UUID;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.BooleanClause;
+import org.apache.lucene.search.BooleanQuery;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.search.BooleanClause.Occur;
+import org.apache.nifi.util.file.FileUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestIndexManager {
+
+    private File indexDir;
+    private IndexManager manager;
+
+    @Before
+    public void setup() {
+        
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", 
"DEBUG");
+        manager = new IndexManager();
+
+        indexDir = new File("target/testIndexManager/" + 
UUID.randomUUID().toString());
+        indexDir.mkdirs();
+    }
+
+    @After
+    public void cleanup() throws IOException {
+        manager.close();
+
+        FileUtils.deleteFiles(Collections.singleton(indexDir), true);
+    }
+
+    @Test
+    public void test() throws IOException {
+        // Create and IndexWriter and add a document to the index, then close 
the writer.
+        // This gives us something that we can query.
+        final IndexWriter writer = manager.borrowIndexWriter(indexDir);
+        final Document doc = new Document();
+        doc.add(new StringField("unit test", "true", Store.YES));
+        writer.addDocument(doc);
+        manager.returnIndexWriter(indexDir, writer);
+
+        // Get an Index Searcher that we can use to query the index.
+        final IndexSearcher cachedSearcher = 
manager.borrowIndexSearcher(indexDir);
+
+        // Ensure that we get the expected results.
+        assertCount(cachedSearcher, 1);
+
+        // While we already have an Index Searcher, get a writer for the same 
index.
+        // This will cause the Index Searcher to be marked as poisoned.
+        final IndexWriter writer2 = manager.borrowIndexWriter(indexDir);
+
+        // Obtain a new Index Searcher with the writer open. This Index 
Searcher should *NOT*
+        // be the same as the previous searcher because the new one will be a 
Near-Real-Time Index Searcher
+        // while the other is not.
+        final IndexSearcher nrtSearcher = 
manager.borrowIndexSearcher(indexDir);
+        assertNotSame(cachedSearcher, nrtSearcher);
+
+        // Ensure that we get the expected query results.
+        assertCount(nrtSearcher, 1);
+
+        // Return the writer, so that there is no longer an active writer for 
the index.
+        manager.returnIndexWriter(indexDir, writer2);
+
+        // Ensure that we still get the same result.
+        assertCount(cachedSearcher, 1);
+        manager.returnIndexSearcher(indexDir, cachedSearcher);
+
+        // Ensure that our near-real-time index searcher still gets the same 
result.
+        assertCount(nrtSearcher, 1);
+        manager.returnIndexSearcher(indexDir, nrtSearcher);
+    }
+
+    private void assertCount(final IndexSearcher searcher, final int count) 
throws IOException {
+        final BooleanQuery query = new BooleanQuery();
+        query.add(new BooleanClause(new TermQuery(new Term("unit test", 
"true")), Occur.MUST));
+        final TopDocs topDocs = searcher.search(query, count * 10);
+        assertNotNull(topDocs);
+        assertEquals(1, topDocs.totalHits);
+    }
+}

Reply via email to