NIFI-523: Do not read all Lucene documents when we don't need to
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/a06c2537 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/a06c2537 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/a06c2537 Branch: refs/heads/NIFI-271 Commit: a06c25373fbd4103c2c9ba1ba0d75d94726700d2 Parents: 509933f Author: Mark Payne <marka...@hotmail.com> Authored: Fri Apr 17 09:13:57 2015 -0400 Committer: Mark Payne <marka...@hotmail.com> Committed: Fri Apr 17 09:13:57 2015 -0400 ---------------------------------------------------------------------- .../nifi/provenance/lucene/DocsReader.java | 20 +++++----- .../provenance/serialization/RecordReaders.java | 41 ++++++++++++++++++-- 2 files changed, 47 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a06c2537/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java index af5fe50..6446a35 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java @@ -48,18 +48,22 @@ public class DocsReader { return Collections.emptySet(); } - final List<Document> docs = new ArrayList<>(); + 
final int numDocs = Math.min(topDocs.scoreDocs.length, maxResults); + final List<Document> docs = new ArrayList<>(numDocs); - for (ScoreDoc scoreDoc : topDocs.scoreDocs) { + for (final ScoreDoc scoreDoc : topDocs.scoreDocs) { final int docId = scoreDoc.doc; final Document d = indexReader.document(docId); docs.add(d); + if ( retrievalCount.incrementAndGet() >= maxResults ) { + break; + } } - return read(docs, allProvenanceLogFiles, retrievalCount, maxResults); + return read(docs, allProvenanceLogFiles); } - public Set<ProvenanceEventRecord> read(final List<Document> docs, final Collection<Path> allProvenanceLogFiles, final AtomicInteger retrievalCount, final int maxResults) throws IOException { + public Set<ProvenanceEventRecord> read(final List<Document> docs, final Collection<Path> allProvenanceLogFiles) throws IOException { LuceneUtil.sortDocsForRetrieval(docs); RecordReader reader = null; @@ -79,9 +83,6 @@ public class DocsReader { reader.skipTo(byteOffset); final StandardProvenanceEventRecord record = reader.nextRecord(); matchingRecords.add(record); - if (retrievalCount.incrementAndGet() >= maxResults) { - break; - } } catch (final IOException e) { throw new FileNotFoundException("Could not find Provenance Log File with basename " + storageFilename + " in the Provenance Repository"); } @@ -91,7 +92,7 @@ public class DocsReader { reader.close(); } - final List<File> potentialFiles = LuceneUtil.getProvenanceLogFiles(storageFilename, allProvenanceLogFiles); + List<File> potentialFiles = LuceneUtil.getProvenanceLogFiles(storageFilename, allProvenanceLogFiles); if (potentialFiles.isEmpty()) { throw new FileNotFoundException("Could not find Provenance Log File with basename " + storageFilename + " in the Provenance Repository"); } @@ -108,9 +109,6 @@ public class DocsReader { final StandardProvenanceEventRecord record = reader.nextRecord(); matchingRecords.add(record); - if (retrievalCount.incrementAndGet() >= maxResults) { - break; - } } catch (final IOException e) 
{ throw new IOException("Failed to retrieve record from Provenance File " + file + " due to " + e, e); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a06c2537/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java index f902b92..8f06995 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java @@ -33,6 +33,8 @@ import org.apache.nifi.provenance.lucene.LuceneUtil; public class RecordReaders { public static RecordReader newRecordReader(File file, final Collection<Path> provenanceLogFiles) throws IOException { + final File originalFile = file; + if (!file.exists()) { if (provenanceLogFiles == null) { throw new FileNotFoundException(file.toString()); @@ -47,11 +49,44 @@ public class RecordReaders { } } - if (file == null || !file.exists()) { - throw new FileNotFoundException(file.toString()); + InputStream fis = null; + if ( file.exists() ) { + try { + fis = new FileInputStream(file); + } catch (final FileNotFoundException fnfe) { + fis = null; + } + } + + openStream: while ( fis == null ) { + final File dir = file.getParentFile(); + final String baseName = LuceneUtil.substringBefore(file.getName(), "."); + + // depending on which rollover 
actions have occurred, we could have 3 possibilities for the + // filename that we need. The majority of the time, we will use the extension ".prov.indexed.gz" + // because most often we are compressing on rollover and most often we have already finished + // compressing by the time that we are querying the data. + for ( final String extension : new String[] {".indexed.prov.gz", ".indexed.prov", ".prov"} ) { + file = new File(dir, baseName + extension); + if ( file.exists() ) { + try { + fis = new FileInputStream(file); + break openStream; + } catch (final FileNotFoundException fnfe) { + // file was modified by a RolloverAction after we verified that it exists but before we could + // create an InputStream for it. Start over. + fis = null; + continue openStream; + } + } + } + + break; } - final InputStream fis = new FileInputStream(file); + if ( fis == null ) { + throw new FileNotFoundException("Unable to locate file " + originalFile); + } final InputStream readableStream; if (file.getName().endsWith(".gz")) { readableStream = new BufferedInputStream(new GZIPInputStream(fis));