Repository: nifi Updated Branches: refs/heads/master a2d3d0c28 -> 95b5877f5
NIFI-2600: Ensure that we do not close Index Searchers prematurely, even if they are poisoned This closes #896 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/95b5877f Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/95b5877f Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/95b5877f Branch: refs/heads/master Commit: 95b5877f5de679da689b76917d93b3ad7fc9b890 Parents: a2d3d0c Author: Mark Payne <[email protected]> Authored: Thu Aug 18 15:29:34 2016 -0400 Committer: Matt Burgess <[email protected]> Committed: Mon Aug 22 12:03:47 2016 -0400 ---------------------------------------------------------------------- .../PersistentProvenanceRepository.java | 23 ++-- .../nifi/provenance/lucene/IndexManager.java | 68 ++++------- .../TestPersistentProvenanceRepository.java | 84 +++++++------- .../provenance/lucene/TestIndexManager.java | 114 +++++++++++++++++++ 4 files changed, 194 insertions(+), 95 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/95b5877f/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java index 2d0bbcb..da3a6f5 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java +++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java @@ -1241,6 +1241,10 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { } } + protected long getRolloverRetryMillis() { + return 10000L; + } + /** * <p> * MUST be called with the write lock held. @@ -1349,14 +1353,14 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { future.cancel(false); } else { - logger.warn("Couldn't merge journals. Will try again in 10 seconds. journalsToMerge: {}, storageDir: {}", journalsToMerge, storageDir); + logger.warn("Couldn't merge journals. Will try again. journalsToMerge: {}, storageDir: {}", journalsToMerge, storageDir); } } }; // We are going to schedule the future to run immediately and then repeat every 10 seconds. This allows us to keep retrying if we // fail for some reason. When we succeed or if retries are exceeded, the Runnable will cancel itself. - future = rolloverExecutor.scheduleWithFixedDelay(rolloverRunnable, 0, 10, TimeUnit.SECONDS); + future = rolloverExecutor.scheduleWithFixedDelay(rolloverRunnable, 0, getRolloverRetryMillis(), TimeUnit.MILLISECONDS); futureReference.set(future); } @@ -1499,6 +1503,10 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { return mergedFile; } + protected List<File> filterUnavailableFiles(final List<File> journalFiles) { + return journalFiles.stream().filter(file -> file.exists()).collect(Collectors.toList()); + } + /** * <p> * Merges all of the given Journal Files into a single, merged Provenance @@ -1555,9 +1563,10 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { } }); - //Search for any missing files. 
At this point they should have been written to disk otherwise cannot continue - //missing files is most likely due to incomplete cleanup of files post merge - final long numAvailableFiles = journalFiles.size() - journalFiles.stream().filter(file -> !file.exists()).count(); + // Search for any missing files. At this point they should have been written to disk; otherwise we cannot continue. + // Missing files are most likely due to incomplete cleanup of files post merge + final List<File> availableFiles = filterUnavailableFiles(journalFiles); + final int numAvailableFiles = availableFiles.size(); // check if we have all of the "partial" files for the journal. if (numAvailableFiles > 0) { @@ -1606,7 +1615,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { final File writerFile = isCompress ? new File(suggestedMergeFile.getParentFile(), suggestedMergeFile.getName() + ".gz") : suggestedMergeFile; try { - for (final File journalFile : journalFiles) { + for (final File journalFile : availableFiles) { try { // Use MAX_VALUE for number of chars because we don't want to truncate the value as we write it // out. This allows us to later decide that we want more characters and still be able to retrieve @@ -1842,7 +1851,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { } // Success. Remove all of the journal files, as they're no longer needed, now that they've been merged. 
- for (final File journalFile : journalFiles) { + for (final File journalFile : availableFiles) { if (!journalFile.delete() && journalFile.exists()) { logger.warn("Failed to remove temporary journal file {}; this file should be cleaned up manually", journalFile.getAbsolutePath()); http://git-wip-us.apache.org/repos/asf/nifi/blob/95b5877f/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java index 6995173..3d38cac 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java @@ -187,47 +187,26 @@ public class IndexManager implements Closeable { } else { // keep track of any searchers that have been closed so that we can remove them // from our cache later. - final List<ActiveIndexSearcher> expired = new ArrayList<>(); - - try { - for ( final ActiveIndexSearcher searcher : currentlyCached ) { - if ( searcher.isCache() ) { - // if the searcher is poisoned, we want to close and expire it. - if ( searcher.isPoisoned() ) { - logger.debug("Index Searcher for {} is poisoned; removing cached searcher", absoluteFile); - expired.add(searcher); - continue; - } - - // if there are no references to the reader, it will have been closed. 
Since there is no - // isClosed() method, this is how we determine whether it's been closed or not. - final int refCount = searcher.getSearcher().getIndexReader().getRefCount(); - if ( refCount <= 0 ) { - // if refCount == 0, then the reader has been closed, so we need to discard the searcher - logger.debug("Reference count for cached Index Searcher for {} is currently {}; " - + "removing cached searcher", absoluteFile, refCount); - expired.add(searcher); - continue; - } - - final int referenceCount = searcher.incrementReferenceCount(); - logger.debug("Providing previously cached index searcher for {} and incrementing Reference Count to {}", indexDir, referenceCount); - return searcher.getSearcher(); + for (final ActiveIndexSearcher searcher : currentlyCached) { + if (searcher.isCache()) { + // if the searcher is poisoned, skip it; it is closed and removed from the cache when it is returned. + if (searcher.isPoisoned()) { + continue; } - } - } finally { - // if we have any expired index searchers, we need to close them and remove them - // from the cache so that we don't try to use them again later. - for ( final ActiveIndexSearcher searcher : expired ) { - try { - logger.debug("Closing {}", searcher); - searcher.close(); - logger.trace("Closed {}", searcher); - } catch (final Exception e) { - logger.debug("Failed to close 'expired' IndexSearcher {}", searcher); + + // if there are no references to the reader, it will have been closed.
+ final int refCount = searcher.getSearcher().getIndexReader().getRefCount(); + if (refCount <= 0) { + // if refCount == 0, then the reader has been closed, so we cannot use the searcher + logger.debug("Reference count for cached Index Searcher for {} is currently {}; " + + "removing cached searcher", absoluteFile, refCount); + continue; } - currentlyCached.remove(searcher); + final int referenceCount = searcher.incrementReferenceCount(); + logger.debug("Providing previously cached index searcher for {} and incrementing Reference Count to {}", indexDir, referenceCount); + return searcher.getSearcher(); } } } @@ -312,16 +291,14 @@ public class IndexManager implements Closeable { if ( activeSearcher.getSearcher().equals(searcher) ) { activeSearcherFound = true; if ( activeSearcher.isCache() ) { - // if the searcher is poisoned, close it and remove from "pool". + // if the searcher is poisoned, close it and remove from "pool". Otherwise, + // just decrement the count. Note here that when we call close() it won't actually close + // the underlying directory reader unless there are no more references to it if ( activeSearcher.isPoisoned() ) { itr.remove(); try { - logger.debug("Closing Index Searcher for {} because it is poisoned", indexDirectory); - final boolean allReferencesClosed = activeSearcher.close(); - if (!allReferencesClosed) { - currentlyCached.add(activeSearcher); - } + activeSearcher.close(); } catch (final IOException ioe) { logger.warn("Failed to close Index Searcher for {} due to {}", absoluteFile, ioe); if ( logger.isDebugEnabled() ) { @@ -384,7 +361,8 @@ public class IndexManager implements Closeable { } if (!activeSearcherFound) { - logger.error("Index Searcher {} was returned for {} but found no Active Searcher for it", searcher, indexDirectory); + logger.debug("Index Searcher {} was returned for {} but found no Active Searcher for it. 
" + + "This will occur if the Index Searcher was already returned while being poisoned.", searcher, indexDirectory); } } finally { lock.unlock(); http://git-wip-us.apache.org/repos/asf/nifi/blob/95b5877f/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java index ff12691..845d6ea 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java @@ -16,6 +16,34 @@ */ package org.apache.nifi.provenance; +import static org.apache.nifi.provenance.TestUtil.createFlowFile; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +import java.io.File; +import java.io.FileFilter; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; 
+import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.zip.GZIPOutputStream; + import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.analysis.core.SimpleAnalyzer; import org.apache.lucene.document.Document; @@ -63,35 +91,6 @@ import org.mockito.stubbing.Answer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.FileFilter; -import java.io.FileOutputStream; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.stream.Collectors; -import java.util.zip.GZIPOutputStream; - -import static org.apache.nifi.provenance.TestUtil.createFlowFile; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; - public class TestPersistentProvenanceRepository { @Rule @@ -1681,6 +1680,17 @@ public class TestPersistentProvenanceRepository { retryAmount.incrementAndGet(); return super.mergeJournals(journalFiles, suggestedMergeFile, eventReporter); } + + // Indicate that there are no files available. + @Override + protected List<File> filterUnavailableFiles(List<File> journalFiles) { + return Collections.emptyList(); + } + + @Override + protected long getRolloverRetryMillis() { + return 10L; // retry quickly. 
+ } }; repo.initialize(getEventReporter(), null, null); @@ -1706,23 +1716,11 @@ public class TestPersistentProvenanceRepository { } }); } - - final File storageDir = config.getStorageDirectories().get(0); - //trigger retry through full file deletion - Arrays.asList(storageDir.listFiles()) - .stream() - .map(file -> new File(storageDir, "journals")) - .map(journalDir -> Arrays.asList(journalDir.listFiles())) - .flatMap(partials -> partials.stream()) - .filter(partial -> partial.exists()) - .forEach(file -> { - file.delete(); - }); + exec.shutdown(); + exec.awaitTermination(10, TimeUnit.SECONDS); repo.waitForRollover(); - assertEquals(5,retryAmount.get()); - } @Test http://git-wip-us.apache.org/repos/asf/nifi/blob/95b5877f/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/lucene/TestIndexManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/lucene/TestIndexManager.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/lucene/TestIndexManager.java new file mode 100644 index 0000000..afe4512 --- /dev/null +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/lucene/TestIndexManager.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.provenance.lucene; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.UUID; + +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field.Store; +import org.apache.lucene.document.StringField; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.Term; +import org.apache.lucene.search.BooleanClause; +import org.apache.lucene.search.BooleanQuery; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.TermQuery; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.search.BooleanClause.Occur; +import org.apache.nifi.util.file.FileUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestIndexManager { + + private File indexDir; + private IndexManager manager; + + @Before + public void setup() { + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG"); + manager = new IndexManager(); + + indexDir = new File("target/testIndexManager/" + UUID.randomUUID().toString()); + indexDir.mkdirs(); + } + + @After + public void cleanup() throws IOException { + manager.close(); + + FileUtils.deleteFiles(Collections.singleton(indexDir), true); + } + + @Test + public void test() throws IOException { + // Create an IndexWriter and add a document to the index, then return the writer to the manager.
+ // This gives us something that we can query. + final IndexWriter writer = manager.borrowIndexWriter(indexDir); + final Document doc = new Document(); + doc.add(new StringField("unit test", "true", Store.YES)); + writer.addDocument(doc); + manager.returnIndexWriter(indexDir, writer); + + // Get an Index Searcher that we can use to query the index. + final IndexSearcher cachedSearcher = manager.borrowIndexSearcher(indexDir); + + // Ensure that we get the expected results. + assertCount(cachedSearcher, 1); + + // While we already have an Index Searcher, get a writer for the same index. + // This will cause the Index Searcher to be marked as poisoned. + final IndexWriter writer2 = manager.borrowIndexWriter(indexDir); + + // Obtain a new Index Searcher with the writer open. This Index Searcher should *NOT* + // be the same as the previous searcher because the new one will be a Near-Real-Time Index Searcher + // while the other is not. + final IndexSearcher nrtSearcher = manager.borrowIndexSearcher(indexDir); + assertNotSame(cachedSearcher, nrtSearcher); + + // Ensure that we get the expected query results. + assertCount(nrtSearcher, 1); + + // Return the writer, so that there is no longer an active writer for the index. + manager.returnIndexWriter(indexDir, writer2); + + // Ensure that we still get the same result. + assertCount(cachedSearcher, 1); + manager.returnIndexSearcher(indexDir, cachedSearcher); + + // Ensure that our near-real-time index searcher still gets the same result. + assertCount(nrtSearcher, 1); + manager.returnIndexSearcher(indexDir, nrtSearcher); + } + + private void assertCount(final IndexSearcher searcher, final int count) throws IOException { + final BooleanQuery query = new BooleanQuery(); + query.add(new BooleanClause(new TermQuery(new Term("unit test", "true")), Occur.MUST)); + final TopDocs topDocs = searcher.search(query, Math.max(1, count * 10)); // numHits must be > 0 even when count == 0 + assertNotNull(topDocs); + assertEquals(count, topDocs.totalHits); // compare against the requested count, not a hard-coded 1 + } +}
