http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java index 00f4617..48d8e09 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; +import java.io.DataOutputStream; import java.io.File; import java.io.FileFilter; import java.io.FileInputStream; @@ -65,6 +66,8 @@ import org.apache.nifi.authorization.AccessDeniedException; import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.events.EventReporter; import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.provenance.index.EventIndexSearcher; +import org.apache.nifi.provenance.index.EventIndexWriter; import org.apache.nifi.provenance.lineage.EventNode; import org.apache.nifi.provenance.lineage.Lineage; import org.apache.nifi.provenance.lineage.LineageEdge; @@ -83,7 +86,6 @@ import org.apache.nifi.provenance.serialization.RecordReaders; import org.apache.nifi.provenance.serialization.RecordWriter; import org.apache.nifi.provenance.serialization.RecordWriters; 
import org.apache.nifi.reporting.Severity; -import org.apache.nifi.stream.io.DataOutputStream; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.file.FileUtils; import org.junit.After; @@ -120,7 +122,7 @@ public class TestPersistentProvenanceRepository { private static RepositoryConfiguration createConfiguration() { config = new RepositoryConfiguration(); - config.addStorageDirectory(new File("target/storage/" + UUID.randomUUID().toString())); + config.addStorageDirectory("1", new File("target/storage/" + UUID.randomUUID().toString())); config.setCompressOnRollover(true); config.setMaxEventFileLife(2000L, TimeUnit.SECONDS); config.setCompressionBlockBytes(100); @@ -152,14 +154,15 @@ public class TestPersistentProvenanceRepository { final File tempRecordFile = tempFolder.newFile("record.tmp"); System.out.println("findJournalSizes position 0 = " + tempRecordFile.length()); - final RecordWriter writer = RecordWriters.newSchemaRecordWriter(tempRecordFile, false, false); + final AtomicLong idGenerator = new AtomicLong(0L); + final RecordWriter writer = RecordWriters.newSchemaRecordWriter(tempRecordFile, idGenerator, false, false); writer.writeHeader(12345L); writer.flush(); headerSize = Long.valueOf(tempRecordFile.length()).intValue(); - writer.writeRecord(record, 12345L); + writer.writeRecord(record); writer.flush(); recordSize = Long.valueOf(tempRecordFile.length()).intValue() - headerSize; - writer.writeRecord(record2, 23456L); + writer.writeRecord(record2); writer.flush(); recordSize2 = Long.valueOf(tempRecordFile.length()).intValue() - headerSize - recordSize; writer.close(); @@ -187,34 +190,45 @@ public class TestPersistentProvenanceRepository { @After public void closeRepo() throws IOException { - if (repo != null) { - try { - repo.close(); - } catch (final IOException ioe) { - } + if (repo == null) { + return; } + try { + repo.close(); + } catch (final IOException ioe) { + } + + // Delete all of the storage files. 
We do this in order to clean up the tons of files that + // we create but also to ensure that we have closed all of the file handles. If we leave any + // streams open, for instance, this will throw an IOException, causing our unit test to fail. if (config != null) { - // Delete all of the storage files. We do this in order to clean up the tons of files that - // we create but also to ensure that we have closed all of the file handles. If we leave any - // streams open, for instance, this will throw an IOException, causing our unit test to fail. - for (final File storageDir : config.getStorageDirectories()) { - if (storageDir.exists()) { - int i; - for (i = 0; i < 3; i++) { - try { - System.out.println("file: " + storageDir.toString() + " exists=" + storageDir.exists()); - FileUtils.deleteFile(storageDir, true); - break; - } catch (final IOException ioe) { - // if there is a virus scanner, etc. running in the background we may not be able to - // delete the file. Wait a sec and try again. - if (i == 2) { - throw ioe; - } else { - try { - Thread.sleep(1000L); - } catch (final InterruptedException ie) { + for (final File storageDir : config.getStorageDirectories().values()) { + int i; + for (i = 0; i < 3; i++) { + try { + FileUtils.deleteFile(storageDir, true); + break; + } catch (final IOException ioe) { + // if there is a virus scanner, etc. running in the background we may not be able to + // delete the file. Wait a sec and try again. + if (i == 2) { + throw ioe; + } else { + try { + System.out.println("file: " + storageDir.toString() + " exists=" + storageDir.exists()); + FileUtils.deleteFile(storageDir, true); + break; + } catch (final IOException ioe2) { + // if there is a virus scanner, etc. running in the background we may not be able to + // delete the file. Wait a sec and try again. 
+ if (i == 2) { + throw ioe2; + } else { + try { + Thread.sleep(1000L); + } catch (final InterruptedException ie) { + } } } } @@ -240,7 +254,7 @@ public class TestPersistentProvenanceRepository { config.setJournalCount(10); config.setQueryThreadPoolSize(10); repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); - repo.initialize(getEventReporter(), null, null); + repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY); final Map<String, String> attributes = new HashMap<>(); attributes.put("abc", "xyz"); @@ -288,7 +302,7 @@ public class TestPersistentProvenanceRepository { System.out.println("Closing and re-initializing"); repo.close(); repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); - repo.initialize(getEventReporter(), null, null); + repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY); System.out.println("Re-initialized"); final long fetchStart = System.nanoTime(); @@ -311,7 +325,7 @@ public class TestPersistentProvenanceRepository { return "2000 millis"; } else if (key.equals(NiFiProperties.PROVENANCE_REPO_DIRECTORY_PREFIX + ".default")) { createConfiguration(); - return config.getStorageDirectories().get(0).getAbsolutePath(); + return config.getStorageDirectories().values().iterator().next().getAbsolutePath(); } else { return null; } @@ -340,8 +354,8 @@ public class TestPersistentProvenanceRepository { @Test public void constructorConfig() throws IOException { - RepositoryConfiguration configuration = createTestableRepositoryConfiguration(properties); - TestablePersistentProvenanceRepository tppr = new TestablePersistentProvenanceRepository(configuration, 20000); + RepositoryConfiguration configuration = RepositoryConfiguration.create(properties); + new TestablePersistentProvenanceRepository(configuration, 20000); } @Test @@ -350,7 +364,7 @@ public class TestPersistentProvenanceRepository { config.setMaxEventFileCapacity(1L); config.setMaxEventFileLife(1, TimeUnit.SECONDS); 
repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); - repo.initialize(getEventReporter(), null, null); + repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY); final Map<String, String> attributes = new HashMap<>(); attributes.put("abc", "xyz"); @@ -376,7 +390,7 @@ public class TestPersistentProvenanceRepository { Thread.sleep(500L); // Give the repo time to shutdown (i.e., close all file handles, etc.) repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); - repo.initialize(getEventReporter(), null, null); + repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY); final List<ProvenanceEventRecord> recoveredRecords = repo.getEvents(0L, 12); assertEquals(10, recoveredRecords.size()); @@ -399,7 +413,7 @@ public class TestPersistentProvenanceRepository { config.setMaxEventFileLife(2, TimeUnit.SECONDS); config.setSearchableFields(searchableFields); repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); - repo.initialize(getEventReporter(), null, null); + repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY); final Map<String, String> attributes = new HashMap<>(); attributes.put("abc", "xyz"); @@ -454,7 +468,7 @@ public class TestPersistentProvenanceRepository { config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields())); config.setSearchableAttributes(SearchableFieldParser.extractSearchableFields("immense", false)); repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); - repo.initialize(getEventReporter(), null, null); + repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY); int immenseAttrSize = 33000; // must be greater than 32766 for a meaningful test StringBuilder immenseBldr = new StringBuilder(immenseAttrSize); @@ -498,7 +512,7 @@ public class TestPersistentProvenanceRepository { config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS); config.setSearchableFields(new 
ArrayList<>(SearchableFields.getStandardFields())); repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); - repo.initialize(getEventReporter(), null, null); + repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY); final String uuid = "00000000-0000-0000-0000-000000000000"; final Map<String, String> attributes = new HashMap<>(); @@ -542,7 +556,7 @@ public class TestPersistentProvenanceRepository { config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS); config.setCompressOnRollover(true); repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); - repo.initialize(getEventReporter(), null, null); + repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY); final String uuid = "00000000-0000-0000-0000-000000000000"; final Map<String, String> attributes = new HashMap<>(); @@ -565,7 +579,7 @@ public class TestPersistentProvenanceRepository { } repo.waitForRollover(); - final File storageDir = config.getStorageDirectories().get(0); + final File storageDir = config.getStorageDirectories().values().iterator().next(); final File compressedLogFile = new File(storageDir, "0.prov.gz"); assertTrue(compressedLogFile.exists()); } @@ -580,7 +594,7 @@ public class TestPersistentProvenanceRepository { config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields())); repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); - repo.initialize(getEventReporter(), null, null); + repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY); final String uuid = "10000000-0000-0000-0000-000000000000"; final Map<String, String> attributes = new HashMap<>(); @@ -653,8 +667,8 @@ public class TestPersistentProvenanceRepository { final AtomicInteger indexSearcherCount = new AtomicInteger(0); @Override - public IndexSearcher borrowIndexSearcher(File indexDir) throws IOException { - final IndexSearcher searcher = mgr.borrowIndexSearcher(indexDir); + public EventIndexSearcher 
borrowIndexSearcher(File indexDir) throws IOException { + final EventIndexSearcher searcher = mgr.borrowIndexSearcher(indexDir); final int idx = indexSearcherCount.incrementAndGet(); obtainIndexSearcherLatch.countDown(); @@ -677,7 +691,7 @@ public class TestPersistentProvenanceRepository { } @Override - public IndexWriter borrowIndexWriter(File indexingDirectory) throws IOException { + public EventIndexWriter borrowIndexWriter(File indexingDirectory) throws IOException { return mgr.borrowIndexWriter(indexingDirectory); } @@ -687,18 +701,19 @@ public class TestPersistentProvenanceRepository { } @Override - public void removeIndex(File indexDirectory) { + public boolean removeIndex(File indexDirectory) { mgr.removeIndex(indexDirectory); + return true; } @Override - public void returnIndexSearcher(File indexDirectory, IndexSearcher searcher) { - mgr.returnIndexSearcher(indexDirectory, searcher); + public void returnIndexSearcher(EventIndexSearcher searcher) { + mgr.returnIndexSearcher(searcher); } @Override - public void returnIndexWriter(File indexingDirectory, IndexWriter writer) { - mgr.returnIndexWriter(indexingDirectory, writer); + public void returnIndexWriter(EventIndexWriter writer) { + mgr.returnIndexWriter(writer); } }; } @@ -707,7 +722,7 @@ public class TestPersistentProvenanceRepository { } }; - repo.initialize(getEventReporter(), null, null); + repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY); final String uuid = "10000000-0000-0000-0000-000000000000"; final Map<String, String> attributes = new HashMap<>(); @@ -790,7 +805,7 @@ public class TestPersistentProvenanceRepository { @Test public void testIndexAndCompressOnRolloverAndSubsequentSearchMultipleStorageDirs() throws IOException, InterruptedException, ParseException { final RepositoryConfiguration config = createConfiguration(); - config.addStorageDirectory(new File("target/storage/" + UUID.randomUUID().toString())); + config.addStorageDirectory("2", new File("target/storage/" + 
UUID.randomUUID().toString())); config.setMaxRecordLife(30, TimeUnit.SECONDS); config.setMaxStorageCapacity(1024L * 1024L); config.setMaxEventFileLife(1, TimeUnit.SECONDS); @@ -798,7 +813,7 @@ public class TestPersistentProvenanceRepository { config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields())); repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); - repo.initialize(getEventReporter(), null, null); + repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY); final String uuid = "00000000-0000-0000-0000-000000000000"; final Map<String, String> attributes = new HashMap<>(); @@ -885,7 +900,7 @@ public class TestPersistentProvenanceRepository { config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields())); repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); - repo.initialize(getEventReporter(), null, null); + repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY); final String uuid = "00000000-0000-0000-0000-000000000000"; final Map<String, String> attributes = new HashMap<>(); @@ -941,7 +956,7 @@ public class TestPersistentProvenanceRepository { config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields())); repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); - repo.initialize(getEventReporter(), null, null); + repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY); final String uuid = "00000000-0000-0000-0000-000000000001"; final Map<String, String> attributes = new HashMap<>(); @@ -996,7 +1011,7 @@ public class TestPersistentProvenanceRepository { config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields())); repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); - repo.initialize(getEventReporter(), null, null); + repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY); final String uuid = 
"00000000-0000-0000-0000-000000000001"; final Map<String, String> attributes = new HashMap<>(); @@ -1055,7 +1070,7 @@ public class TestPersistentProvenanceRepository { config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields())); repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); - repo.initialize(getEventReporter(), null, null); + repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY); final String childId = "00000000-0000-0000-0000-000000000000"; @@ -1105,7 +1120,7 @@ public class TestPersistentProvenanceRepository { config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields())); repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); - repo.initialize(getEventReporter(), null, null); + repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY); final String childId = "00000000-0000-0000-0000-000000000000"; @@ -1152,7 +1167,7 @@ public class TestPersistentProvenanceRepository { final RepositoryConfiguration config = createConfiguration(); config.setMaxEventFileLife(1, TimeUnit.SECONDS); repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); - repo.initialize(getEventReporter(), null, null); + repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY); final String uuid = "00000000-0000-0000-0000-000000000000"; final Map<String, String> attributes = new HashMap<>(); @@ -1178,7 +1193,7 @@ public class TestPersistentProvenanceRepository { repo.close(); final PersistentProvenanceRepository secondRepo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); - secondRepo.initialize(getEventReporter(), null, null); + secondRepo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY); try { final ProvenanceEventRecord event11 = builder.build(); @@ -1239,7 +1254,7 @@ public class TestPersistentProvenanceRepository { config.setDesiredIndexSize(10); repo = new PersistentProvenanceRepository(config, 
DEFAULT_ROLLOVER_MILLIS); - repo.initialize(getEventReporter(), null, null); + repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY); String uuid = UUID.randomUUID().toString(); for (int i = 0; i < 20; i++) { @@ -1253,7 +1268,7 @@ public class TestPersistentProvenanceRepository { } } repo.waitForRollover(); - File eventFile = new File(config.getStorageDirectories().get(0), "10.prov.gz"); + File eventFile = new File(config.getStorageDirectories().values().iterator().next(), "10.prov.gz"); assertTrue(eventFile.delete()); return eventFile; } @@ -1270,7 +1285,7 @@ public class TestPersistentProvenanceRepository { config.setDesiredIndexSize(10); // force new index to be created for each rollover repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); - repo.initialize(getEventReporter(), null, null); + repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY); final String uuid = "00000000-0000-0000-0000-000000000000"; final Map<String, String> attributes = new HashMap<>(); @@ -1298,7 +1313,7 @@ public class TestPersistentProvenanceRepository { Thread.sleep(2000L); final FileFilter indexFileFilter = file -> file.getName().startsWith("index"); - final int numIndexDirs = config.getStorageDirectories().get(0).listFiles(indexFileFilter).length; + final int numIndexDirs = config.getStorageDirectories().values().iterator().next().listFiles(indexFileFilter).length; assertEquals(1, numIndexDirs); // add more records so that we will create a new index @@ -1324,7 +1339,7 @@ public class TestPersistentProvenanceRepository { assertEquals(20, result.getMatchingEvents().size()); // Ensure index directories exists - File[] indexDirs = config.getStorageDirectories().get(0).listFiles(indexFileFilter); + File[] indexDirs = config.getStorageDirectories().values().iterator().next().listFiles(indexFileFilter); assertEquals(2, indexDirs.length); // expire old events and indexes @@ -1337,7 +1352,7 @@ public class 
TestPersistentProvenanceRepository { assertEquals(10, newRecordSet.getMatchingEvents().size()); // Ensure that one index directory is gone - indexDirs = config.getStorageDirectories().get(0).listFiles(indexFileFilter); + indexDirs = config.getStorageDirectories().values().iterator().next().listFiles(indexFileFilter); assertEquals(1, indexDirs.length); } @@ -1354,12 +1369,12 @@ public class TestPersistentProvenanceRepository { final AccessDeniedException expectedException = new AccessDeniedException("Unit Test - Intentionally Thrown"); repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) { @Override - protected void authorize(ProvenanceEventRecord event, NiFiUser user) { + public void authorize(ProvenanceEventRecord event, NiFiUser user) { throw expectedException; } }; - repo.initialize(getEventReporter(), null, null); + repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY); final String uuid = "00000000-0000-0000-0000-000000000000"; final Map<String, String> attributes = new HashMap<>(); @@ -1409,7 +1424,7 @@ public class TestPersistentProvenanceRepository { } }; - repo.initialize(getEventReporter(), null, null); + repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY); final String uuid = "00000000-0000-0000-0000-000000000000"; final Map<String, String> attributes = new HashMap<>(); @@ -1461,7 +1476,7 @@ public class TestPersistentProvenanceRepository { } }; - repo.initialize(getEventReporter(), null, null); + repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY); final String uuid = "00000000-0000-0000-0000-000000000000"; final Map<String, String> attributes = new HashMap<>(); @@ -1522,7 +1537,7 @@ public class TestPersistentProvenanceRepository { } }; - repo.initialize(getEventReporter(), null, null); + repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY); final String uuid = "00000000-0000-0000-0000-000000000000"; final Map<String, String> attributes = new HashMap<>(); 
@@ -1641,7 +1656,7 @@ public class TestPersistentProvenanceRepository { return journalCountRef.get(); } }; - repo.initialize(getEventReporter(), null, null); + repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY); final Map<String, String> attributes = new HashMap<>(); final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder(); @@ -1697,7 +1712,7 @@ public class TestPersistentProvenanceRepository { config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS); config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields())); repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); - repo.initialize(getEventReporter(), null, null); + repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY); final String uuid = "00000000-0000-0000-0000-000000000000"; final Map<String, String> attributes = new HashMap<>(); @@ -1732,7 +1747,7 @@ public class TestPersistentProvenanceRepository { final List<File> indexDirs = indexConfig.getIndexDirectories(); final String query = "uuid:00000000-0000-0000-0000-0000000000* AND NOT filename:file-?"; - final List<Document> results = runQuery(indexDirs.get(0), config.getStorageDirectories(), query); + final List<Document> results = runQuery(indexDirs.get(0), new ArrayList<>(config.getStorageDirectories().values()), query); assertEquals(6, results.size()); } @@ -1786,7 +1801,7 @@ public class TestPersistentProvenanceRepository { final RepositoryConfiguration config = createConfiguration(); config.setMaxEventFileLife(3, TimeUnit.SECONDS); repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); - repo.initialize(getEventReporter(), null, null); + repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY); final Map<String, String> attributes = new HashMap<>(); @@ -1813,7 +1828,7 @@ public class TestPersistentProvenanceRepository { repo.waitForRollover(); - final File storageDir = config.getStorageDirectories().get(0); + final 
File storageDir = config.getStorageDirectories().values().iterator().next(); long counter = 0; for (final File file : storageDir.listFiles()) { if (file.isFile()) { @@ -1853,7 +1868,7 @@ public class TestPersistentProvenanceRepository { final RepositoryConfiguration config = createConfiguration(); config.setMaxEventFileLife(3, TimeUnit.SECONDS); TestablePersistentProvenanceRepository testRepo = new TestablePersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); - testRepo.initialize(getEventReporter(), null, null); + testRepo.initialize(getEventReporter(), null, null, null); final Map<String, String> attributes = new HashMap<>(); @@ -1897,7 +1912,7 @@ public class TestPersistentProvenanceRepository { + "that the record wasn't completely written to the file. This journal will be skipped.", reportedEvents.get(reportedEvents.size() - 1).getMessage()); - final File storageDir = config.getStorageDirectories().get(0); + final File storageDir = config.getStorageDirectories().values().iterator().next(); assertTrue(checkJournalRecords(storageDir, false) < 10000); } @@ -1906,7 +1921,7 @@ public class TestPersistentProvenanceRepository { final RepositoryConfiguration config = createConfiguration(); config.setMaxEventFileLife(3, TimeUnit.SECONDS); TestablePersistentProvenanceRepository testRepo = new TestablePersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); - testRepo.initialize(getEventReporter(), null, null); + testRepo.initialize(getEventReporter(), null, null, null); final Map<String, String> attributes = new HashMap<>(); @@ -1951,7 +1966,7 @@ public class TestPersistentProvenanceRepository { + "be skipped.", reportedEvents.get(reportedEvents.size() - 1).getMessage()); - final File storageDir = config.getStorageDirectories().get(0); + final File storageDir = config.getStorageDirectories().values().iterator().next(); assertTrue(checkJournalRecords(storageDir, false) < 10000); } @@ -1960,7 +1975,7 @@ public class TestPersistentProvenanceRepository { 
final RepositoryConfiguration config = createConfiguration(); config.setMaxEventFileLife(3, TimeUnit.SECONDS); TestablePersistentProvenanceRepository testRepo = new TestablePersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); - testRepo.initialize(getEventReporter(), null, null); + testRepo.initialize(getEventReporter(), null, null, null); final Map<String, String> attributes = new HashMap<>(); @@ -1997,7 +2012,7 @@ public class TestPersistentProvenanceRepository { assertEquals("mergeJournals() should not error on empty journal", 0, reportedEvents.size()); - final File storageDir = config.getStorageDirectories().get(0); + final File storageDir = config.getStorageDirectories().values().iterator().next(); assertEquals(config.getJournalCount() - 1, checkJournalRecords(storageDir, true)); } @@ -2025,7 +2040,7 @@ public class TestPersistentProvenanceRepository { return 10L; // retry quickly. } }; - repo.initialize(getEventReporter(), null, null); + repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY); final Map<String, String> attributes = new HashMap<>(); @@ -2062,7 +2077,7 @@ public class TestPersistentProvenanceRepository { config.setMaxAttributeChars(50); config.setMaxEventFileLife(3, TimeUnit.SECONDS); repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); - repo.initialize(getEventReporter(), null, null); + repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY); final String maxLengthChars = "12345678901234567890123456789012345678901234567890"; final Map<String, String> attributes = new HashMap<>(); @@ -2108,7 +2123,7 @@ public class TestPersistentProvenanceRepository { repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) { @Override protected synchronized IndexingAction createIndexingAction() { - return new IndexingAction(repo) { + return new IndexingAction(config.getSearchableFields(), config.getSearchableAttributes()) { @Override public void 
index(StandardProvenanceEventRecord record, IndexWriter indexWriter, Integer blockIndex) throws IOException { final int count = indexedEventCount.incrementAndGet(); @@ -2121,7 +2136,7 @@ public class TestPersistentProvenanceRepository { }; } }; - repo.initialize(getEventReporter(), null, null); + repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY); final Map<String, String> attributes = new HashMap<>(); attributes.put("uuid", "12345678-0000-0000-0000-012345678912"); @@ -2169,7 +2184,7 @@ public class TestPersistentProvenanceRepository { }; // initialize with our event reporter - repo.initialize(getEventReporter(), null, null); + repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY); // create some events in the journal files. final Map<String, String> attributes = new HashMap<>(); @@ -2219,10 +2234,12 @@ public class TestPersistentProvenanceRepository { this.message = message; } + @SuppressWarnings("unused") public String getCategory() { return category; } + @SuppressWarnings("unused") public String getMessage() { return message; }
http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestSchemaRecordReaderWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestSchemaRecordReaderWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestSchemaRecordReaderWriter.java index aed690b..2eb353e 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestSchemaRecordReaderWriter.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestSchemaRecordReaderWriter.java @@ -24,6 +24,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; import java.io.File; import java.io.FileInputStream; import java.io.IOException; @@ -34,11 +35,13 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; +import org.apache.nifi.provenance.schema.EventFieldNames; import org.apache.nifi.provenance.schema.EventRecord; -import org.apache.nifi.provenance.schema.EventRecordFields; import org.apache.nifi.provenance.schema.ProvenanceEventSchema; import org.apache.nifi.provenance.serialization.RecordReader; import org.apache.nifi.provenance.serialization.RecordWriter; @@ -55,14 +58,14 @@ import org.apache.nifi.repository.schema.RecordField; import 
org.apache.nifi.repository.schema.RecordSchema; import org.apache.nifi.repository.schema.Repetition; import org.apache.nifi.repository.schema.SimpleRecordField; -import org.apache.nifi.stream.io.DataOutputStream; import org.apache.nifi.stream.io.NullOutputStream; +import org.junit.Assert; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; public class TestSchemaRecordReaderWriter extends AbstractTestRecordReaderWriter { - + private final AtomicLong idGenerator = new AtomicLong(0L); private File journalFile; private File tocFile; @@ -70,6 +73,58 @@ public class TestSchemaRecordReaderWriter extends AbstractTestRecordReaderWriter public void setup() { journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testFieldAddedToSchema"); tocFile = TocUtil.getTocFile(journalFile); + idGenerator.set(0L); + } + + + @Test + @Ignore("runs forever for performance analysis/profiling") + public void testPerformanceOfRandomAccessReads() throws Exception { + journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testPerformanceOfRandomAccessReads.gz"); + tocFile = TocUtil.getTocFile(journalFile); + + try (final RecordWriter writer = createWriter(journalFile, new StandardTocWriter(tocFile, true, false), true, 1024 * 32)) { + writer.writeHeader(0L); + + for (int i = 0; i < 100_000; i++) { + writer.writeRecord(createEvent()); + } + } + + final long[] eventIds = new long[] { + 4, 80, 1024, 1025, 1026, 1027, 1028, 1029, 1030, 40_000, 80_000, 99_000 + }; + + boolean loopForever = true; + while (loopForever) { + final long start = System.nanoTime(); + for (int i = 0; i < 1000; i++) { + try (final InputStream in = new FileInputStream(journalFile); + final RecordReader reader = createReader(in, journalFile.getName(), new StandardTocReader(tocFile), 32 * 1024)) { + + for (final long id : eventIds) { + time(() -> { + reader.skipToEvent(id); + return reader.nextRecord(); + }, id); + } + } + } + + final long ms = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + System.out.println(ms + " ms total"); + } + } + + private void time(final Callable<StandardProvenanceEventRecord> task, final long id) throws Exception { + final long start = System.nanoTime(); + final StandardProvenanceEventRecord event = task.call(); + Assert.assertNotNull(event); + Assert.assertEquals(id, event.getEventId()); + // System.out.println(event); + final long nanos = System.nanoTime() - start; + final long millis = TimeUnit.NANOSECONDS.toMillis(nanos); + // System.out.println("Took " + millis + " ms to " + taskDescription); } @@ -83,8 +138,8 @@ public class TestSchemaRecordReaderWriter extends AbstractTestRecordReaderWriter try (final ByteArraySchemaRecordWriter writer = createSchemaWriter(schemaModifier, toAdd)) { writer.writeHeader(1L); - writer.writeRecord(createEvent(), 3L); - writer.writeRecord(createEvent(), 3L); + writer.writeRecord(createEvent()); + writer.writeRecord(createEvent()); } try (final InputStream in = new FileInputStream(journalFile); @@ -94,7 +149,6 @@ public class TestSchemaRecordReaderWriter extends AbstractTestRecordReaderWriter for (int i = 0; i < 2; i++) { final StandardProvenanceEventRecord event = reader.nextRecord(); assertNotNull(event); - assertEquals(3L, event.getEventId()); assertEquals("1234", event.getComponentId()); assertEquals(ProvenanceEventType.RECEIVE, event.getEventType()); @@ -111,14 +165,14 @@ public class TestSchemaRecordReaderWriter extends AbstractTestRecordReaderWriter // Create a schema that has the fields modified final RecordSchema schemaV1 = ProvenanceEventSchema.PROVENANCE_EVENT_SCHEMA_V1; final List<RecordField> fields = new ArrayList<>(schemaV1.getFields()); - fields.remove(new SimpleRecordField(EventRecordFields.Names.UPDATED_ATTRIBUTES, FieldType.STRING, Repetition.EXACTLY_ONE)); - fields.remove(new SimpleRecordField(EventRecordFields.Names.PREVIOUS_ATTRIBUTES, FieldType.STRING, Repetition.EXACTLY_ONE)); + fields.remove(new 
SimpleRecordField(EventFieldNames.UPDATED_ATTRIBUTES, FieldType.STRING, Repetition.EXACTLY_ONE)); + fields.remove(new SimpleRecordField(EventFieldNames.PREVIOUS_ATTRIBUTES, FieldType.STRING, Repetition.EXACTLY_ONE)); final RecordSchema recordSchema = new RecordSchema(fields); // Create a record writer whose schema does not contain updated attributes or previous attributes. // This means that we must also override the method that writes out attributes so that we are able // to avoid actually writing them out. - final ByteArraySchemaRecordWriter writer = new ByteArraySchemaRecordWriter(journalFile, tocWriter, false, 0) { + final ByteArraySchemaRecordWriter writer = new ByteArraySchemaRecordWriter(journalFile, idGenerator, tocWriter, false, 0) { @Override public void writeHeader(long firstEventId, DataOutputStream out) throws IOException { final ByteArrayOutputStream baos = new ByteArrayOutputStream(); @@ -130,15 +184,15 @@ public class TestSchemaRecordReaderWriter extends AbstractTestRecordReaderWriter @Override protected Record createRecord(final ProvenanceEventRecord event, final long eventId) { - final RecordSchema contentClaimSchema = new RecordSchema(recordSchema.getField(EventRecordFields.Names.CONTENT_CLAIM).getSubFields()); + final RecordSchema contentClaimSchema = new RecordSchema(recordSchema.getField(EventFieldNames.CONTENT_CLAIM).getSubFields()); return new EventRecord(event, eventId, recordSchema, contentClaimSchema); } }; try { writer.writeHeader(1L); - writer.writeRecord(createEvent(), 3L); - writer.writeRecord(createEvent(), 3L); + writer.writeRecord(createEvent()); + writer.writeRecord(createEvent()); } finally { writer.close(); } @@ -154,7 +208,6 @@ public class TestSchemaRecordReaderWriter extends AbstractTestRecordReaderWriter for (int i = 0; i < 2; i++) { final StandardProvenanceEventRecord event = reader.nextRecord(); assertNotNull(event); - assertEquals(3L, event.getEventId()); assertEquals(ProvenanceEventType.RECEIVE, event.getEventType()); // 
We will still have a Map<String, String> for updated attributes because the @@ -175,7 +228,7 @@ public class TestSchemaRecordReaderWriter extends AbstractTestRecordReaderWriter try (final ByteArraySchemaRecordWriter writer = createSchemaWriter(schemaModifier, toAdd)) { writer.writeHeader(1L); - writer.writeRecord(createEvent(), 3L); + writer.writeRecord(createEvent()); } try (final InputStream in = new FileInputStream(journalFile); @@ -207,9 +260,9 @@ public class TestSchemaRecordReaderWriter extends AbstractTestRecordReaderWriter fieldModifier.accept(fields); final RecordSchema recordSchema = new RecordSchema(fields); - final RecordSchema contentClaimSchema = new RecordSchema(recordSchema.getField(EventRecordFields.Names.CONTENT_CLAIM).getSubFields()); + final RecordSchema contentClaimSchema = new RecordSchema(recordSchema.getField(EventFieldNames.CONTENT_CLAIM).getSubFields()); - final ByteArraySchemaRecordWriter writer = new ByteArraySchemaRecordWriter(journalFile, tocWriter, false, 0) { + final ByteArraySchemaRecordWriter writer = new ByteArraySchemaRecordWriter(journalFile, idGenerator, tocWriter, false, 0) { @Override public void writeHeader(long firstEventId, DataOutputStream out) throws IOException { final ByteArrayOutputStream baos = new ByteArrayOutputStream(); @@ -250,14 +303,13 @@ public class TestSchemaRecordReaderWriter extends AbstractTestRecordReaderWriter final int numEvents = 10_000_000; final long startNanos = System.nanoTime(); try (final OutputStream nullOut = new NullOutputStream(); - final RecordWriter writer = new ByteArraySchemaRecordWriter(nullOut, tocWriter, false, 0)) { + final RecordWriter writer = new ByteArraySchemaRecordWriter(nullOut, "out", idGenerator, tocWriter, false, 0)) { writer.writeHeader(0L); for (int i = 0; i < numEvents; i++) { - writer.writeRecord(event, i); + writer.writeRecord(event); } - } final long nanos = System.nanoTime() - startNanos; @@ -280,7 +332,7 @@ public class TestSchemaRecordReaderWriter extends 
AbstractTestRecordReaderWriter try (final ByteArrayOutputStream headerOut = new ByteArrayOutputStream(); final DataOutputStream out = new DataOutputStream(headerOut)) { - final RecordWriter schemaWriter = new ByteArraySchemaRecordWriter(out, null, false, 0); + final RecordWriter schemaWriter = new ByteArraySchemaRecordWriter(out, "out", idGenerator, null, false, 0); schemaWriter.writeHeader(1L); header = headerOut.toByteArray(); @@ -288,12 +340,12 @@ public class TestSchemaRecordReaderWriter extends AbstractTestRecordReaderWriter final byte[] serializedRecord; try (final ByteArrayOutputStream headerOut = new ByteArrayOutputStream(); - final RecordWriter writer = new ByteArraySchemaRecordWriter(headerOut, null, false, 0)) { + final RecordWriter writer = new ByteArraySchemaRecordWriter(headerOut, "out", idGenerator, null, false, 0)) { writer.writeHeader(1L); headerOut.reset(); - writer.writeRecord(event, 1L); + writer.writeRecord(event); writer.flush(); serializedRecord = headerOut.toByteArray(); } @@ -322,7 +374,7 @@ public class TestSchemaRecordReaderWriter extends AbstractTestRecordReaderWriter @Override protected RecordWriter createWriter(File file, TocWriter tocWriter, boolean compressed, int uncompressedBlockSize) throws IOException { - return new ByteArraySchemaRecordWriter(file, tocWriter, compressed, uncompressedBlockSize); + return new ByteArraySchemaRecordWriter(file, idGenerator, tocWriter, compressed, uncompressedBlockSize); } @@ -331,11 +383,4 @@ public class TestSchemaRecordReaderWriter extends AbstractTestRecordReaderWriter final ByteArraySchemaRecordReader reader = new ByteArraySchemaRecordReader(in, journalFilename, tocReader, maxAttributeSize); return reader; } - - private static interface WriteRecordInterceptor { - void writeRawRecord(ProvenanceEventRecord event, long recordIdentifier, DataOutputStream out) throws IOException; - } - - private static WriteRecordInterceptor NOP_INTERCEPTOR = (event, id, out) -> {}; - private static 
WriteRecordInterceptor WRITE_DUMMY_STRING_INTERCEPTOR = (event, id, out) -> out.writeUTF("hello"); } http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java index dfa37e4..27002c8 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java @@ -16,7 +16,11 @@ */ package org.apache.nifi.provenance; +import static org.apache.nifi.provenance.TestUtil.createFlowFile; +import static org.junit.Assert.assertTrue; + import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; import java.io.File; import java.io.IOException; import java.io.InputStream; @@ -25,6 +29,7 @@ import java.util.HashMap; import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.provenance.serialization.RecordReader; @@ -32,16 +37,18 @@ import org.apache.nifi.provenance.serialization.RecordWriter; import org.apache.nifi.provenance.toc.NopTocWriter; import org.apache.nifi.provenance.toc.TocReader; import org.apache.nifi.provenance.toc.TocWriter; -import 
org.apache.nifi.stream.io.DataOutputStream; import org.apache.nifi.stream.io.NullOutputStream; +import org.junit.Before; import org.junit.Ignore; import org.junit.Test; -import static org.apache.nifi.provenance.TestUtil.createFlowFile; -import static org.junit.Assert.assertTrue; - public class TestStandardRecordReaderWriter extends AbstractTestRecordReaderWriter { + private AtomicLong idGenerator = new AtomicLong(0L); + @Before + public void resetIds() { + idGenerator.set(0L); + } @Test @Ignore("For local testing only") @@ -56,12 +63,12 @@ public class TestStandardRecordReaderWriter extends AbstractTestRecordReaderWrit final int numEvents = 10_000_000; final long startNanos = System.nanoTime(); try (final OutputStream nullOut = new NullOutputStream(); - final RecordWriter writer = new StandardRecordWriter(nullOut, tocWriter, false, 100000)) { + final RecordWriter writer = new StandardRecordWriter(nullOut, "devnull", idGenerator, tocWriter, false, 100000)) { writer.writeHeader(0L); for (int i = 0; i < numEvents; i++) { - writer.writeRecord(event, i); + writer.writeRecord(event); } } @@ -90,12 +97,12 @@ public class TestStandardRecordReaderWriter extends AbstractTestRecordReaderWrit final byte[] serializedRecord; try (final ByteArrayOutputStream headerOut = new ByteArrayOutputStream(); - final StandardRecordWriter writer = new StandardRecordWriter(headerOut, null, false, 0)) { + final StandardRecordWriter writer = new StandardRecordWriter(headerOut, "devnull", idGenerator, null, false, 0)) { writer.writeHeader(1L); headerOut.reset(); - writer.writeRecord(event, 1L); + writer.writeRecord(event); writer.flush(); serializedRecord = headerOut.toByteArray(); } @@ -142,18 +149,18 @@ public class TestStandardRecordReaderWriter extends AbstractTestRecordReaderWrit } try (final ByteArrayOutputStream recordOut = new ByteArrayOutputStream(); - final StandardRecordWriter writer = new StandardRecordWriter(recordOut, null, false, 0)) { + final StandardRecordWriter writer = new 
StandardRecordWriter(recordOut, "devnull", idGenerator, null, false, 0)) { writer.writeHeader(1L); recordOut.reset(); - writer.writeRecord(record, 1L); + writer.writeRecord(record); } } @Override protected RecordWriter createWriter(File file, TocWriter tocWriter, boolean compressed, int uncompressedBlockSize) throws IOException { - return new StandardRecordWriter(file, tocWriter, compressed, uncompressedBlockSize); + return new StandardRecordWriter(file, idGenerator, tocWriter, compressed, uncompressedBlockSize); } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java index 514a43e..224ee71 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java @@ -18,6 +18,7 @@ package org.apache.nifi.provenance; import java.util.HashMap; import java.util.Map; +import java.util.UUID; import org.apache.nifi.flowfile.FlowFile; @@ -82,4 +83,22 @@ public class TestUtil { } }; } + + public static ProvenanceEventRecord createEvent() { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("filename", "1.txt"); + attributes.put("uuid", UUID.randomUUID().toString()); + + final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder(); + 
builder.setEventTime(System.currentTimeMillis()); + builder.setEventType(ProvenanceEventType.RECEIVE); + builder.setTransitUri("nifi://unit-test"); + builder.fromFlowFile(createFlowFile(3L, 3000L, attributes)); + builder.setComponentId("1234"); + builder.setComponentType("dummy processor"); + final ProvenanceEventRecord record = builder.build(); + + return record; + } + } http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/index/lucene/TestEventIndexTask.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/index/lucene/TestEventIndexTask.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/index/lucene/TestEventIndexTask.java new file mode 100644 index 0000000..4c58b13 --- /dev/null +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/index/lucene/TestEventIndexTask.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.provenance.index.lucene; + +import static org.junit.Assert.assertEquals; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field.Store; +import org.apache.lucene.document.LongField; +import org.apache.lucene.index.IndexWriter; +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.provenance.RepositoryConfiguration; +import org.apache.nifi.provenance.SearchableFields; +import org.apache.nifi.provenance.index.EventIndexWriter; +import org.apache.nifi.provenance.lucene.IndexManager; +import org.apache.nifi.provenance.lucene.LuceneEventIndexWriter; +import org.apache.nifi.provenance.serialization.StorageSummary; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; + +public class TestEventIndexTask { + + @BeforeClass + public static void setupClass() { + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi", "DEBUG"); + } + + @Test(timeout = 5000) + public void testIndexWriterCommittedWhenAppropriate() throws IOException, InterruptedException { + final BlockingQueue<StoredDocument> docQueue = new LinkedBlockingQueue<>(); + final RepositoryConfiguration repoConfig = new RepositoryConfiguration(); + final File storageDir = new File("target/storage/TestEventIndexTask/1"); + repoConfig.addStorageDirectory("1", storageDir); + + final AtomicInteger commitCount = new AtomicInteger(0); + + // Mock out an IndexWriter and keep track of the number of events that are indexed. 
+ final IndexWriter indexWriter = Mockito.mock(IndexWriter.class); + final EventIndexWriter eventIndexWriter = new LuceneEventIndexWriter(indexWriter, storageDir); + + final IndexManager indexManager = Mockito.mock(IndexManager.class); + Mockito.when(indexManager.borrowIndexWriter(Mockito.any(File.class))).thenReturn(eventIndexWriter); + + final IndexDirectoryManager directoryManager = new IndexDirectoryManager(repoConfig); + + // Create an EventIndexTask and override the commit(IndexWriter) method so that we can keep track of how + // many times the index writer gets committed. + final EventIndexTask task = new EventIndexTask(docQueue, repoConfig, indexManager, directoryManager, 201, EventReporter.NO_OP) { + @Override + protected void commit(EventIndexWriter indexWriter) throws IOException { + commitCount.incrementAndGet(); + } + }; + + // Create 4 threads, each one a daemon thread running the EventIndexTask + for (int i = 0; i < 4; i++) { + final Thread t = new Thread(task); + t.setDaemon(true); + t.start(); + } + + assertEquals(0, commitCount.get()); + + // Index 100 documents with a storage filename of "0.0.prov" + for (int i = 0; i < 100; i++) { + final Document document = new Document(); + document.add(new LongField(SearchableFields.EventTime.getSearchableFieldName(), System.currentTimeMillis(), Store.NO)); + + final StorageSummary location = new StorageSummary(1L, "0.0.prov", "1", 0, 1000L, 1000L); + final StoredDocument storedDoc = new StoredDocument(document, location); + docQueue.add(storedDoc); + } + assertEquals(0, commitCount.get()); + + // Index 100 documents + for (int i = 0; i < 100; i++) { + final Document document = new Document(); + document.add(new LongField(SearchableFields.EventTime.getSearchableFieldName(), System.currentTimeMillis(), Store.NO)); + + final StorageSummary location = new StorageSummary(1L, "0.0.prov", "1", 0, 1000L, 1000L); + final StoredDocument storedDoc = new StoredDocument(document, location); + docQueue.add(storedDoc); + } 
+ + // Wait until we've indexed all 200 events + while (eventIndexWriter.getEventsIndexed() < 200) { + Thread.sleep(10L); + } + + // Wait a bit and make sure that we still haven't committed the index writer. + Thread.sleep(100L); + assertEquals(0, commitCount.get()); + + // Add another document. + final Document document = new Document(); + document.add(new LongField(SearchableFields.EventTime.getSearchableFieldName(), System.currentTimeMillis(), Store.NO)); + final StorageSummary location = new StorageSummary(1L, "0.0.prov", "1", 0, 1000L, 1000L); + + StoredDocument storedDoc = new StoredDocument(document, location); + docQueue.add(storedDoc); + + // Wait until index writer is committed. + while (commitCount.get() == 0) { + Thread.sleep(10L); + } + assertEquals(1, commitCount.get()); + + // Add one more document. The assertion below expects the commit count to stay at 1, i.e. the writer is not committed again. + storedDoc = new StoredDocument(document, location); + docQueue.add(storedDoc); + Thread.sleep(100L); + assertEquals(1, commitCount.get()); + + // Add yet another document. Index writer should not be committed again.
+ storedDoc = new StoredDocument(document, location); + docQueue.add(storedDoc); + Thread.sleep(100L); + assertEquals(1, commitCount.get()); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/index/lucene/TestIndexDirectoryManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/index/lucene/TestIndexDirectoryManager.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/index/lucene/TestIndexDirectoryManager.java new file mode 100644 index 0000000..3f3c422 --- /dev/null +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/index/lucene/TestIndexDirectoryManager.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.provenance.index.lucene; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import org.apache.nifi.provenance.RepositoryConfiguration; +import org.junit.Test; + +public class TestIndexDirectoryManager { + + @Test + public void testGetDirectoriesIncludesMatchingTimestampPlusOne() { + final List<IndexLocation> locations = new ArrayList<>(); + locations.add(createLocation(999L)); + locations.add(createLocation(1002L)); + locations.add(createLocation(1005L)); + + final List<File> directories = IndexDirectoryManager.getDirectories(1000L, 1001L, locations); + assertEquals(2, directories.size()); + assertTrue(directories.contains(new File("index-999"))); + assertTrue(directories.contains(new File("index-1002"))); + } + + @Test + public void testGetDirectoriesOnlyObtainsDirectoriesForDesiredPartition() { + final RepositoryConfiguration config = createConfig(2); + + final File storageDir1 = config.getStorageDirectories().get("1"); + final File storageDir2 = config.getStorageDirectories().get("2"); + + final File index1 = new File(storageDir1, "index-1"); + final File index2 = new File(storageDir1, "index-2"); + final File index3 = new File(storageDir2, "index-3"); + final File index4 = new File(storageDir2, "index-4"); + + final File[] allIndices = new File[] {index1, index2, index3, index4}; + for (final File file : allIndices) { + assertTrue(file.mkdirs() || file.exists()); + } + + try { + final IndexDirectoryManager mgr = new IndexDirectoryManager(config); + mgr.initialize(); + + final List<File> indexes1 = mgr.getDirectories(0L, Long.MAX_VALUE, "1"); + final List<File> indexes2 = mgr.getDirectories(0L, Long.MAX_VALUE, "2"); + + assertEquals(2, indexes1.size()); + assertTrue(indexes1.contains(index1)); + assertTrue(indexes1.contains(index2)); + + assertEquals(2, indexes2.size()); + 
assertTrue(indexes2.contains(index3)); + assertTrue(indexes2.contains(index4)); + } finally { + for (final File file : allIndices) { + file.delete(); + } + } + } + + + private IndexLocation createLocation(final long timestamp) { + return createLocation(timestamp, "1"); + } + + private IndexLocation createLocation(final long timestamp, final String partitionName) { + return new IndexLocation(new File("index-" + timestamp), timestamp, partitionName, 1024 * 1024L); + } + + private RepositoryConfiguration createConfig(final int partitions) { + final RepositoryConfiguration repoConfig = new RepositoryConfiguration(); + for (int i = 1; i <= partitions; i++) { + repoConfig.addStorageDirectory(String.valueOf(i), new File("target/storage/testIndexDirectoryManager/" + UUID.randomUUID() + "/" + i)); + } + return repoConfig; + } +}