Repository: nifi Updated Branches: refs/heads/master 8d467f3d1 -> 96ed405d7
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nifi.provenance.index.lucene;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import org.apache.nifi.authorization.AccessDeniedException;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.provenance.RepositoryConfiguration;
import org.apache.nifi.provenance.SearchableFields;
import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.apache.nifi.provenance.authorization.EventAuthorizer;
import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
import org.apache.nifi.provenance.lineage.LineageNode;
import org.apache.nifi.provenance.lineage.LineageNodeType;
import org.apache.nifi.provenance.lineage.ProvenanceEventLineageNode;
import org.apache.nifi.provenance.lucene.IndexManager;
import org.apache.nifi.provenance.lucene.SimpleIndexManager;
import org.apache.nifi.provenance.search.Query;
import org.apache.nifi.provenance.search.QueryResult;
import org.apache.nifi.provenance.search.QuerySubmission;
import org.apache.nifi.provenance.search.SearchTerms;
import org.apache.nifi.provenance.serialization.StorageSummary;
import org.apache.nifi.provenance.store.ArrayListEventStore;
import org.apache.nifi.provenance.store.EventStore;
import org.apache.nifi.provenance.store.StorageResult;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/**
 * Unit tests for {@code LuceneEventIndex}: indexing, query/lineage submission,
 * authorization filtering/placeholders, and index-directory expiration.
 * Indexing is asynchronous, so several tests poll in a loop and rely on the
 * JUnit {@code timeout} attribute to bound the wait.
 */
public class TestLuceneEventIndex {

    // Source of monotonically increasing event IDs for the events created by this test.
    private final AtomicLong idGenerator = new AtomicLong(0L);

    @Rule
    public TestName testName = new TestName();

    @BeforeClass
    public static void setLogger() {
        System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi", "DEBUG");
    }


    @Test(timeout = 5000)
    public void testGetMinimumIdToReindex() throws InterruptedException {
        final RepositoryConfiguration repoConfig = createConfig(1);
        repoConfig.setDesiredIndexSize(1L);
        final IndexManager indexManager = new SimpleIndexManager(repoConfig);

        final ArrayListEventStore eventStore = new ArrayListEventStore();
        final LuceneEventIndex index = new LuceneEventIndex(repoConfig, indexManager, 20_000, EventReporter.NO_OP);
        index.initialize(eventStore);

        for (int i = 0; i < 50_000; i++) {
            final ProvenanceEventRecord event = createEvent("1234");
            final StorageResult storageResult = eventStore.addEvent(event);
            index.addEvents(storageResult.getStorageLocations());
        }

        // Wait until enough events have been indexed asynchronously.
        while (index.getMaxEventId("1") < 40_000L) {
            Thread.sleep(25);
        }

        final long id = index.getMinimumEventIdToReindex("1");
        assertTrue(id >= 30000L);
    }

    @Test(timeout = 5000)
    public void testUnauthorizedEventsGetPlaceholdersForLineage() throws InterruptedException {
        final RepositoryConfiguration repoConfig = createConfig(1);
        repoConfig.setDesiredIndexSize(1L);
        final IndexManager indexManager = new SimpleIndexManager(repoConfig);

        final ArrayListEventStore eventStore = new ArrayListEventStore();
        final LuceneEventIndex index = new LuceneEventIndex(repoConfig, indexManager, 3, EventReporter.NO_OP);
        index.initialize(eventStore);

        for (int i = 0; i < 3; i++) {
            final ProvenanceEventRecord event = createEvent("1234");
            final StorageResult storageResult = eventStore.addEvent(event);
            index.addEvents(storageResult.getStorageLocations());
        }

        final NiFiUser user = createUser();

        // With DENY_ALL authorization, each event should be replaced by a placeholder node.
        List<LineageNode> nodes = Collections.emptyList();
        while (nodes.size() < 3) {
            final ComputeLineageSubmission submission = index.submitLineageComputation(1L, user, EventAuthorizer.DENY_ALL);
            assertTrue(submission.getResult().awaitCompletion(5, TimeUnit.SECONDS));

            nodes = submission.getResult().getNodes();
            Thread.sleep(25L);
        }

        assertEquals(3, nodes.size());

        for (final LineageNode node : nodes) {
            assertEquals(LineageNodeType.PROVENANCE_EVENT_NODE, node.getNodeType());
            final ProvenanceEventLineageNode eventNode = (ProvenanceEventLineageNode) node;
            assertEquals(ProvenanceEventType.UNKNOWN, eventNode.getEventType());
        }
    }

    @Test(timeout = 5000)
    public void testUnauthorizedEventsGetPlaceholdersForExpandChildren() throws InterruptedException {
        final RepositoryConfiguration repoConfig = createConfig(1);
        repoConfig.setDesiredIndexSize(1L);
        final IndexManager indexManager = new SimpleIndexManager(repoConfig);

        final ArrayListEventStore eventStore = new ArrayListEventStore();
        final LuceneEventIndex index = new LuceneEventIndex(repoConfig, indexManager, 3, EventReporter.NO_OP);
        index.initialize(eventStore);

        final ProvenanceEventRecord firstEvent = createEvent("4444");

        final Map<String, String> previousAttributes = new HashMap<>();
        previousAttributes.put("uuid", "4444");
        final Map<String, String> updatedAttributes = new HashMap<>();
        updatedAttributes.put("updated", "true");
        final ProvenanceEventRecord fork = new StandardProvenanceEventRecord.Builder()
            .setEventType(ProvenanceEventType.FORK)
            .setAttributes(previousAttributes, updatedAttributes)
            .addChildFlowFile("1234")
            .setComponentId("component-1")
            .setComponentType("unit test")
            .setEventId(idGenerator.getAndIncrement())
            .setEventTime(System.currentTimeMillis())
            .setFlowFileEntryDate(System.currentTimeMillis())
            .setFlowFileUUID("4444")
            .setLineageStartDate(System.currentTimeMillis())
            .setCurrentContentClaim("container", "section", "unit-test-id", 0L, 1024L)
            .build();

        index.addEvents(eventStore.addEvent(firstEvent).getStorageLocations());
        index.addEvents(eventStore.addEvent(fork).getStorageLocations());

        for (int i = 0; i < 3; i++) {
            final ProvenanceEventRecord event = createEvent("1234");
            final StorageResult storageResult = eventStore.addEvent(event);
            index.addEvents(storageResult.getStorageLocations());
        }

        final NiFiUser user = createUser();

        // Authorizer that permits only FORK events; everything else must become a placeholder.
        final EventAuthorizer allowForkEvents = new EventAuthorizer() {
            @Override
            public boolean isAuthorized(ProvenanceEventRecord event) {
                return event.getEventType() == ProvenanceEventType.FORK;
            }

            @Override
            public void authorize(ProvenanceEventRecord event) throws AccessDeniedException {
            }
        };

        List<LineageNode> nodes = Collections.emptyList();
        while (nodes.size() < 5) {
            final ComputeLineageSubmission submission = index.submitExpandChildren(1L, user, allowForkEvents);
            assertTrue(submission.getResult().awaitCompletion(5, TimeUnit.SECONDS));

            nodes = submission.getResult().getNodes();
            Thread.sleep(25L);
        }

        assertEquals(5, nodes.size());

        assertEquals(1L, nodes.stream().filter(n -> n.getNodeType() == LineageNodeType.FLOWFILE_NODE).count());
        assertEquals(4L, nodes.stream().filter(n -> n.getNodeType() == LineageNodeType.PROVENANCE_EVENT_NODE).count());

        final Map<ProvenanceEventType, List<LineageNode>> eventMap = nodes.stream()
            .filter(n -> n.getNodeType() == LineageNodeType.PROVENANCE_EVENT_NODE)
            .collect(Collectors.groupingBy(n -> ((ProvenanceEventLineageNode) n).getEventType()));

        assertEquals(2, eventMap.size());
        assertEquals(1, eventMap.get(ProvenanceEventType.FORK).size());
        assertEquals(3, eventMap.get(ProvenanceEventType.UNKNOWN).size());
    }

    @Test(timeout = 5000)
    public void testUnauthorizedEventsGetPlaceholdersForFindParents() throws InterruptedException {
        final RepositoryConfiguration repoConfig = createConfig(1);
        repoConfig.setDesiredIndexSize(1L);
        final IndexManager indexManager = new SimpleIndexManager(repoConfig);

        final ArrayListEventStore eventStore = new ArrayListEventStore();
        final LuceneEventIndex index = new LuceneEventIndex(repoConfig, indexManager, 3, EventReporter.NO_OP);
        index.initialize(eventStore);

        final ProvenanceEventRecord firstEvent = createEvent("4444");

        final Map<String, String> previousAttributes = new HashMap<>();
        previousAttributes.put("uuid", "4444");
        final Map<String, String> updatedAttributes = new HashMap<>();
        updatedAttributes.put("updated", "true");
        final ProvenanceEventRecord join = new StandardProvenanceEventRecord.Builder()
            .setEventType(ProvenanceEventType.JOIN)
            .setAttributes(previousAttributes, updatedAttributes)
            .addParentUuid("4444")
            .addChildFlowFile("1234")
            .setComponentId("component-1")
            .setComponentType("unit test")
            .setEventId(idGenerator.getAndIncrement())
            .setEventTime(System.currentTimeMillis())
            .setFlowFileEntryDate(System.currentTimeMillis())
            .setFlowFileUUID("1234")
            .setLineageStartDate(System.currentTimeMillis())
            .setCurrentContentClaim("container", "section", "unit-test-id", 0L, 1024L)
            .build();

        index.addEvents(eventStore.addEvent(firstEvent).getStorageLocations());
        index.addEvents(eventStore.addEvent(join).getStorageLocations());

        for (int i = 0; i < 3; i++) {
            final ProvenanceEventRecord event = createEvent("1234");
            final StorageResult storageResult = eventStore.addEvent(event);
            index.addEvents(storageResult.getStorageLocations());
        }

        final NiFiUser user = createUser();

        // Authorizer that permits only JOIN events; the parent event must become a placeholder.
        final EventAuthorizer allowJoinEvents = new EventAuthorizer() {
            @Override
            public boolean isAuthorized(ProvenanceEventRecord event) {
                return event.getEventType() == ProvenanceEventType.JOIN;
            }

            @Override
            public void authorize(ProvenanceEventRecord event) throws AccessDeniedException {
            }
        };

        List<LineageNode> nodes = Collections.emptyList();
        while (nodes.size() < 2) {
            final ComputeLineageSubmission submission = index.submitExpandParents(1L, user, allowJoinEvents);
            assertTrue(submission.getResult().awaitCompletion(5, TimeUnit.SECONDS));

            nodes = submission.getResult().getNodes();
            Thread.sleep(25L);
        }

        assertEquals(2, nodes.size());

        final Map<ProvenanceEventType, List<LineageNode>> eventMap = nodes.stream()
            .filter(n -> n.getNodeType() == LineageNodeType.PROVENANCE_EVENT_NODE)
            .collect(Collectors.groupingBy(n -> ((ProvenanceEventLineageNode) n).getEventType()));

        assertEquals(2, eventMap.size());
        assertEquals(1, eventMap.get(ProvenanceEventType.JOIN).size());
        assertEquals(1, eventMap.get(ProvenanceEventType.UNKNOWN).size());

        assertEquals("4444", eventMap.get(ProvenanceEventType.UNKNOWN).get(0).getFlowFileUuid());
    }

    @Test(timeout = 5000)
    public void testUnauthorizedEventsGetFilteredForQuery() throws InterruptedException {
        final RepositoryConfiguration repoConfig = createConfig(1);
        repoConfig.setDesiredIndexSize(1L);
        final IndexManager indexManager = new SimpleIndexManager(repoConfig);

        final ArrayListEventStore eventStore = new ArrayListEventStore();
        final LuceneEventIndex index = new LuceneEventIndex(repoConfig, indexManager, 3, EventReporter.NO_OP);
        index.initialize(eventStore);

        for (int i = 0; i < 3; i++) {
            final ProvenanceEventRecord event = createEvent("1234");
            final StorageResult storageResult = eventStore.addEvent(event);
            index.addEvents(storageResult.getStorageLocations());
        }

        final Query query = new Query(UUID.randomUUID().toString());

        // Only even-numbered event IDs are authorized, so of 3 events we expect 2 matches.
        final EventAuthorizer authorizer = new EventAuthorizer() {
            @Override
            public boolean isAuthorized(ProvenanceEventRecord event) {
                return event.getEventId() % 2 == 0;
            }

            @Override
            public void authorize(ProvenanceEventRecord event) throws AccessDeniedException {
                throw new AccessDeniedException();
            }
        };

        List<ProvenanceEventRecord> events = Collections.emptyList();
        while (events.size() < 2) {
            final QuerySubmission submission = index.submitQuery(query, authorizer, "unit test");
            assertTrue(submission.getResult().awaitCompletion(5, TimeUnit.SECONDS));
            events = submission.getResult().getMatchingEvents();
            Thread.sleep(25L);
        }

        assertEquals(2, events.size());
    }


    private NiFiUser createUser() {
        return new NiFiUser() {
            @Override
            public String getIdentity() {
                return "unit test";
            }

            @Override
            public NiFiUser getChain() {
                return null;
            }

            @Override
            public boolean isAnonymous() {
                return false;
            }

            @Override
            public String getClientAddress() {
                return "127.0.0.1";
            }
        };
    }


    @Test(timeout = 5000)
    public void testExpiration() throws InterruptedException, IOException {
        final RepositoryConfiguration repoConfig = createConfig(1);
        repoConfig.setDesiredIndexSize(1L);
        final IndexManager indexManager = new SimpleIndexManager(repoConfig);

        final LuceneEventIndex index = new LuceneEventIndex(repoConfig, indexManager, 1, EventReporter.NO_OP);

        final List<ProvenanceEventRecord> events = new ArrayList<>();
        events.add(createEvent(500000L));
        events.add(createEvent());

        final EventStore eventStore = Mockito.mock(EventStore.class);
        Mockito.doAnswer(new Answer<List<ProvenanceEventRecord>>() {
            @Override
            public List<ProvenanceEventRecord> answer(final InvocationOnMock invocation) throws Throwable {
                final Long eventId = invocation.getArgumentAt(0, Long.class);
                assertEquals(0, eventId.longValue());
                assertEquals(1, invocation.getArgumentAt(1, Integer.class).intValue());
                return Collections.singletonList(events.get(0));
            }
        }).when(eventStore).getEvents(Mockito.anyLong(), Mockito.anyInt());

        index.initialize(eventStore);
        index.addEvent(events.get(0), createStorageSummary(events.get(0).getEventId()));

        // Add the first event to the index and wait for it to be indexed, since indexing is asynchronous.
        List<File> allDirectories = Collections.emptyList();
        while (allDirectories.isEmpty()) {
            allDirectories = index.getDirectoryManager().getDirectories(null, null);
        }

        events.remove(0); // Remove the first event from the store
        index.performMaintenance();
        assertEquals(1, index.getDirectoryManager().getDirectories(null, null).size());
    }

    private StorageSummary createStorageSummary(final long eventId) {
        return new StorageSummary(eventId, "1.prov", "1", 1, 2L, 2L);
    }


    @Test(timeout = 5000)
    public void addThenQueryWithEmptyQuery() throws InterruptedException {
        final RepositoryConfiguration repoConfig = createConfig();
        final IndexManager indexManager = new SimpleIndexManager(repoConfig);

        final LuceneEventIndex index = new LuceneEventIndex(repoConfig, indexManager, 1, EventReporter.NO_OP);

        final ProvenanceEventRecord event = createEvent();

        index.addEvent(event, new StorageSummary(event.getEventId(), "1.prov", "1", 1, 2L, 2L));

        final Query query = new Query(UUID.randomUUID().toString());

        final ArrayListEventStore eventStore = new ArrayListEventStore();
        eventStore.addEvent(event);
        index.initialize(eventStore);

        // We don't know how long it will take for the event to be indexed, so keep querying until
        // we get a result. The test will timeout after 5 seconds if we've still not succeeded.
        List<ProvenanceEventRecord> matchingEvents = Collections.emptyList();
        while (matchingEvents.isEmpty()) {
            final QuerySubmission submission = index.submitQuery(query, EventAuthorizer.GRANT_ALL, "unit test user");
            assertNotNull(submission);

            final QueryResult result = submission.getResult();
            assertNotNull(result);
            result.awaitCompletion(100, TimeUnit.MILLISECONDS);

            assertTrue(result.isFinished());
            assertNull(result.getError());

            matchingEvents = result.getMatchingEvents();
            assertNotNull(matchingEvents);
            Thread.sleep(100L); // avoid crushing the CPU
        }

        assertEquals(1, matchingEvents.size());
        assertEquals(event, matchingEvents.get(0));
    }

    @Test(timeout = 50000)
    public void testQuerySpecificField() throws InterruptedException {
        final RepositoryConfiguration repoConfig = createConfig();
        final IndexManager indexManager = new SimpleIndexManager(repoConfig);

        final LuceneEventIndex index = new LuceneEventIndex(repoConfig, indexManager, 2, EventReporter.NO_OP);

        // add 2 events, one of which we will query for.
        final ProvenanceEventRecord event = createEvent();
        index.addEvent(event, new StorageSummary(event.getEventId(), "1.prov", "1", 1, 2L, 2L));
        index.addEvent(createEvent(), new StorageSummary(2L, "1.prov", "1", 1, 2L, 2L));

        // Create a query that searches for the event with the FlowFile UUID equal to the first event's.
        final Query query = new Query(UUID.randomUUID().toString());
        query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.FlowFileUUID, event.getFlowFileUuid()));

        final ArrayListEventStore eventStore = new ArrayListEventStore();
        eventStore.addEvent(event);
        index.initialize(eventStore);

        // We don't know how long it will take for the event to be indexed, so keep querying until
        // we get a result. The test will time out via the @Test timeout if we never succeed.
        List<ProvenanceEventRecord> matchingEvents = Collections.emptyList();
        while (matchingEvents.isEmpty()) {
            final QuerySubmission submission = index.submitQuery(query, EventAuthorizer.GRANT_ALL, "unit test user");
            assertNotNull(submission);

            final QueryResult result = submission.getResult();
            assertNotNull(result);
            result.awaitCompletion(100, TimeUnit.MILLISECONDS);

            assertTrue(result.isFinished());
            assertNull(result.getError());

            matchingEvents = result.getMatchingEvents();
            assertNotNull(matchingEvents);
            Thread.sleep(100L); // avoid crushing the CPU
        }

        assertEquals(1, matchingEvents.size());
        assertEquals(event, matchingEvents.get(0));
    }

    private RepositoryConfiguration createConfig() {
        return createConfig(1);
    }

    // Builds a repository configuration with the given number of storage directories,
    // rooted under target/storage/<test name>/<random uuid> so runs do not collide.
    private RepositoryConfiguration createConfig(final int storageDirectoryCount) {
        final RepositoryConfiguration config = new RepositoryConfiguration();
        final String unitTestName = testName.getMethodName();
        final File storageDir = new File("target/storage/" + unitTestName + "/" + UUID.randomUUID().toString());

        for (int i = 0; i < storageDirectoryCount; i++) {
            config.addStorageDirectory(String.valueOf(i + 1), new File(storageDir, String.valueOf(i)));
        }

        config.setSearchableFields(Collections.singletonList(SearchableFields.FlowFileUUID));
        config.setSearchableAttributes(Collections.singletonList(SearchableFields.newSearchableAttribute("updated")));

        for (final File file : config.getStorageDirectories().values()) {
            assertTrue(file.exists() || file.mkdirs());
        }

        return config;
    }

    private ProvenanceEventRecord createEvent() {
        return createEvent(System.currentTimeMillis());
    }

    private ProvenanceEventRecord createEvent(final String uuid) {
        return createEvent(System.currentTimeMillis(), uuid);
    }

    private ProvenanceEventRecord createEvent(final long timestamp) {
        return createEvent(timestamp, UUID.randomUUID().toString());
    }

    // Builds a CONTENT_MODIFIED event with the given timestamp and FlowFile UUID,
    // using the shared idGenerator for a unique event ID.
    private ProvenanceEventRecord createEvent(final long timestamp, final String uuid) {
        final Map<String, String> previousAttributes = new HashMap<>();
        previousAttributes.put("uuid", uuid);
        final Map<String, String> updatedAttributes = new HashMap<>();
        updatedAttributes.put("updated", "true");

        final ProvenanceEventRecord event = new StandardProvenanceEventRecord.Builder()
            .setEventType(ProvenanceEventType.CONTENT_MODIFIED)
            .setAttributes(previousAttributes, updatedAttributes)
            .setComponentId("component-1")
            .setComponentType("unit test")
            .setEventId(idGenerator.getAndIncrement())
            .setEventTime(timestamp)
            .setFlowFileEntryDate(timestamp)
            .setFlowFileUUID(uuid)
            .setLineageStartDate(timestamp)
            .setCurrentContentClaim("container", "section", "unit-test-id", 0L, 1024L)
            .build();

        return event;
    }
}
org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.Term; import org.apache.lucene.search.BooleanClause; +import org.apache.lucene.search.BooleanClause.Occur; import org.apache.lucene.search.BooleanQuery; -import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.TermQuery; import org.apache.lucene.search.TopDocs; -import org.apache.lucene.search.BooleanClause.Occur; +import org.apache.nifi.provenance.index.EventIndexSearcher; +import org.apache.nifi.provenance.index.EventIndexWriter; import org.apache.nifi.util.file.FileUtils; import org.junit.After; import org.junit.Before; @@ -67,47 +67,47 @@ public class TestCachingIndexManager { public void test() throws IOException { // Create and IndexWriter and add a document to the index, then close the writer. // This gives us something that we can query. - final IndexWriter writer = manager.borrowIndexWriter(indexDir); + final EventIndexWriter writer = manager.borrowIndexWriter(indexDir); final Document doc = new Document(); doc.add(new StringField("unit test", "true", Store.YES)); - writer.addDocument(doc); - manager.returnIndexWriter(indexDir, writer); + writer.index(doc, 1000); + manager.returnIndexWriter(writer); // Get an Index Searcher that we can use to query the index. - final IndexSearcher cachedSearcher = manager.borrowIndexSearcher(indexDir); + final EventIndexSearcher cachedSearcher = manager.borrowIndexSearcher(indexDir); // Ensure that we get the expected results. assertCount(cachedSearcher, 1); // While we already have an Index Searcher, get a writer for the same index. // This will cause the Index Searcher to be marked as poisoned. - final IndexWriter writer2 = manager.borrowIndexWriter(indexDir); + final EventIndexWriter writer2 = manager.borrowIndexWriter(indexDir); // Obtain a new Index Searcher with the writer open. 
This Index Searcher should *NOT* // be the same as the previous searcher because the new one will be a Near-Real-Time Index Searcher // while the other is not. - final IndexSearcher nrtSearcher = manager.borrowIndexSearcher(indexDir); + final EventIndexSearcher nrtSearcher = manager.borrowIndexSearcher(indexDir); assertNotSame(cachedSearcher, nrtSearcher); // Ensure that we get the expected query results. assertCount(nrtSearcher, 1); // Return the writer, so that there is no longer an active writer for the index. - manager.returnIndexWriter(indexDir, writer2); + manager.returnIndexWriter(writer2); // Ensure that we still get the same result. assertCount(cachedSearcher, 1); - manager.returnIndexSearcher(indexDir, cachedSearcher); + manager.returnIndexSearcher(cachedSearcher); // Ensure that our near-real-time index searcher still gets the same result. assertCount(nrtSearcher, 1); - manager.returnIndexSearcher(indexDir, nrtSearcher); + manager.returnIndexSearcher(nrtSearcher); } - private void assertCount(final IndexSearcher searcher, final int count) throws IOException { + private void assertCount(final EventIndexSearcher searcher, final int count) throws IOException { final BooleanQuery query = new BooleanQuery(); query.add(new BooleanClause(new TermQuery(new Term("unit test", "true")), Occur.MUST)); - final TopDocs topDocs = searcher.search(query, count * 10); + final TopDocs topDocs = searcher.getIndexSearcher().search(query, count * 10); assertNotNull(topDocs); assertEquals(1, topDocs.totalHits); } http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/lucene/TestSimpleIndexManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/lucene/TestSimpleIndexManager.java 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/lucene/TestSimpleIndexManager.java index 834177f..05369ca 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/lucene/TestSimpleIndexManager.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/lucene/TestSimpleIndexManager.java @@ -18,18 +18,21 @@ package org.apache.nifi.provenance.lucene; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import java.io.File; import java.io.IOException; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.lucene.document.Document; import org.apache.lucene.document.Field.Store; import org.apache.lucene.document.StringField; -import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.TopDocs; +import org.apache.nifi.provenance.RepositoryConfiguration; +import org.apache.nifi.provenance.index.EventIndexSearcher; +import org.apache.nifi.provenance.index.EventIndexWriter; import org.apache.nifi.util.file.FileUtils; import org.junit.BeforeClass; import org.junit.Test; @@ -40,14 +43,13 @@ public class TestSimpleIndexManager { System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG"); } - @Test public void testMultipleWritersSimultaneouslySameIndex() throws IOException { - final SimpleIndexManager mgr = new SimpleIndexManager(); + final SimpleIndexManager mgr = new SimpleIndexManager(new RepositoryConfiguration()); final File dir = new File("target/" + UUID.randomUUID().toString()); try { - final IndexWriter writer1 = mgr.borrowIndexWriter(dir); - final IndexWriter writer2 = mgr.borrowIndexWriter(dir); + final 
EventIndexWriter writer1 = mgr.borrowIndexWriter(dir); + final EventIndexWriter writer2 = mgr.borrowIndexWriter(dir); final Document doc1 = new Document(); doc1.add(new StringField("id", "1", Store.YES)); @@ -55,18 +57,94 @@ public class TestSimpleIndexManager { final Document doc2 = new Document(); doc2.add(new StringField("id", "2", Store.YES)); - writer1.addDocument(doc1); - writer2.addDocument(doc2); - mgr.returnIndexWriter(dir, writer2); - mgr.returnIndexWriter(dir, writer1); + writer1.index(doc1, 1000); + writer2.index(doc2, 1000); + mgr.returnIndexWriter(writer2); + mgr.returnIndexWriter(writer1); - final IndexSearcher searcher = mgr.borrowIndexSearcher(dir); - final TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), 2); + final EventIndexSearcher searcher = mgr.borrowIndexSearcher(dir); + final TopDocs topDocs = searcher.getIndexSearcher().search(new MatchAllDocsQuery(), 2); assertEquals(2, topDocs.totalHits); - mgr.returnIndexSearcher(dir, searcher); + mgr.returnIndexSearcher(searcher); } finally { FileUtils.deleteFile(dir, true); } } + @Test + public void testWriterCloseIfPreviouslyMarkedCloseable() throws IOException { + final AtomicInteger closeCount = new AtomicInteger(0); + + final SimpleIndexManager mgr = new SimpleIndexManager(new RepositoryConfiguration()) { + @Override + protected void close(IndexWriterCount count) throws IOException { + closeCount.incrementAndGet(); + } + }; + + final File dir = new File("target/" + UUID.randomUUID().toString()); + + final EventIndexWriter writer1 = mgr.borrowIndexWriter(dir); + final EventIndexWriter writer2 = mgr.borrowIndexWriter(dir); + assertTrue(writer1 == writer2); + + mgr.returnIndexWriter(writer1, true, true); + assertEquals(0, closeCount.get()); + + final EventIndexWriter[] writers = new EventIndexWriter[10]; + for (int i = 0; i < writers.length; i++) { + writers[i] = mgr.borrowIndexWriter(dir); + assertTrue(writers[i] == writer1); + } + + for (int i = 0; i < writers.length; i++) { + 
mgr.returnIndexWriter(writers[i], true, false); + assertEquals(0, closeCount.get()); + assertEquals(1, mgr.getWriterCount()); + } + + // this should close the index writer even though 'false' is passed in + // because the previous call marked the writer as closeable and this is + // the last reference to the writer. + mgr.returnIndexWriter(writer2, false, false); + assertEquals(1, closeCount.get()); + assertEquals(0, mgr.getWriterCount()); + } + + @Test + public void testWriterCloseIfOnlyUser() throws IOException { + final AtomicInteger closeCount = new AtomicInteger(0); + + final SimpleIndexManager mgr = new SimpleIndexManager(new RepositoryConfiguration()) { + @Override + protected void close(IndexWriterCount count) throws IOException { + closeCount.incrementAndGet(); + } + }; + + final File dir = new File("target/" + UUID.randomUUID().toString()); + + final EventIndexWriter writer = mgr.borrowIndexWriter(dir); + mgr.returnIndexWriter(writer, true, true); + assertEquals(1, closeCount.get()); + } + + @Test + public void testWriterLeftOpenIfNotCloseable() throws IOException { + final AtomicInteger closeCount = new AtomicInteger(0); + + final SimpleIndexManager mgr = new SimpleIndexManager(new RepositoryConfiguration()) { + @Override + protected void close(IndexWriterCount count) throws IOException { + closeCount.incrementAndGet(); + } + }; + + final File dir = new File("target/" + UUID.randomUUID().toString()); + + final EventIndexWriter writer = mgr.borrowIndexWriter(dir); + mgr.returnIndexWriter(writer, true, false); + assertEquals(0, closeCount.get()); + } + } http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/ArrayListEventStore.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/ArrayListEventStore.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/ArrayListEventStore.java new file mode 100644 index 0000000..94a3699 --- /dev/null +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/ArrayListEventStore.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.provenance.store; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.authorization.EventAuthorizer; +import org.apache.nifi.provenance.authorization.EventTransformer; +import org.apache.nifi.provenance.index.EventIndex; +import org.apache.nifi.provenance.serialization.StorageSummary; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ArrayListEventStore implements EventStore { + private static final Logger logger = LoggerFactory.getLogger(ArrayListEventStore.class); + + private final List<ProvenanceEventRecord> events = new ArrayList<>(); + private final AtomicLong idGenerator = new AtomicLong(0L); + + @Override + public void close() throws IOException { + } + + @Override + public void initialize() throws IOException { + } + + public StorageResult addEvent(final ProvenanceEventRecord event) { + return addEvents(Collections.singleton(event)); + } + + @Override + public synchronized StorageResult addEvents(Iterable<ProvenanceEventRecord> events) { + final Map<ProvenanceEventRecord, StorageSummary> storageLocations = new HashMap<>(); + + for (final ProvenanceEventRecord event : events) { + this.events.add(event); + + final StorageSummary storageSummary = new StorageSummary(idGenerator.getAndIncrement(), "location", "1", 1, 0L, 0L); + storageLocations.put(event, storageSummary); + } + + return new StorageResult() { + @Override + public Map<ProvenanceEventRecord, StorageSummary> getStorageLocations() { + return storageLocations; + } + + @Override + public boolean triggeredRollover() { + return false; + } + + @Override + public Integer getEventsRolledOver() { + return null; + } + }; + } + + @Override + public long getSize() throws 
IOException { + return 0; + } + + @Override + public long getMaxEventId() { + return idGenerator.get() - 1; + } + + @Override + public synchronized Optional<ProvenanceEventRecord> getEvent(long id) throws IOException { + if (events.size() <= id) { + return Optional.empty(); + } + + return Optional.ofNullable(events.get((int) id)); + } + + @Override + public List<ProvenanceEventRecord> getEvents(long firstRecordId, int maxResults) throws IOException { + return getEvents(firstRecordId, maxResults, EventAuthorizer.GRANT_ALL, EventTransformer.EMPTY_TRANSFORMER); + } + + @Override + public List<ProvenanceEventRecord> getEvents(long firstRecordId, int maxResults, EventAuthorizer authorizer, EventTransformer transformer) throws IOException { + final List<ProvenanceEventRecord> events = new ArrayList<>(); + for (int i = 0; i < maxResults; i++) { + final Optional<ProvenanceEventRecord> eventOption = getEvent(firstRecordId + i); + if (!eventOption.isPresent()) { + break; + } + + events.add(eventOption.get()); + } + + return events; + } + + @Override + public List<ProvenanceEventRecord> getEvents(final List<Long> eventIds, final EventAuthorizer authorizer, final EventTransformer transformer) { + final List<ProvenanceEventRecord> events = new ArrayList<>(); + for (final Long eventId : eventIds) { + final Optional<ProvenanceEventRecord> eventOption; + try { + eventOption = getEvent(eventId); + } catch (final Exception e) { + logger.warn("Failed to retrieve event with ID " + eventId, e); + continue; + } + + if (!eventOption.isPresent()) { + continue; + } + + if (authorizer.isAuthorized(eventOption.get())) { + events.add(eventOption.get()); + } else { + final Optional<ProvenanceEventRecord> transformedOption = transformer.transform(eventOption.get()); + if (transformedOption.isPresent()) { + events.add(transformedOption.get()); + } + } + } + + return events; + } + + @Override + public void reindexLatestEvents(EventIndex eventIndex) { + } +} 
http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/TestEventFileManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/TestEventFileManager.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/TestEventFileManager.java new file mode 100644 index 0000000..42b8be2 --- /dev/null +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/TestEventFileManager.java @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.provenance.store; + +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.Test; + +public class TestEventFileManager { + + @Test(timeout = 5000) + public void testTwoWriteLocks() throws InterruptedException { + final EventFileManager fileManager = new EventFileManager(); + final File f1 = new File("1.prov"); + final File gz = new File("1.prov.gz"); + + final AtomicBoolean obtained = new AtomicBoolean(false); + + final Thread t1 = new Thread(new Runnable() { + @Override + public void run() { + fileManager.obtainWriteLock(f1); + + synchronized (obtained) { + obtained.set(true); + obtained.notify(); + } + + try { + Thread.sleep(500L); + } catch (InterruptedException e) { + } + fileManager.releaseWriteLock(f1); + } + }); + + t1.start(); + + final Thread t2 = new Thread(new Runnable() { + @Override + public void run() { + synchronized (obtained) { + while (!obtained.get()) { + try { + obtained.wait(); + } catch (InterruptedException e) { + } + } + } + + fileManager.obtainWriteLock(gz); + fileManager.releaseWriteLock(gz); + } + }); + + final long start = System.nanoTime(); + t2.start(); + t2.join(); + final long nanos = System.nanoTime() - start; + assertTrue(nanos > TimeUnit.MILLISECONDS.toNanos(300L)); + } + + + @Test(timeout = 5000) + public void testTwoReadLocks() throws InterruptedException { + final EventFileManager fileManager = new EventFileManager(); + final File f1 = new File("1.prov"); + final File gz = new File("1.prov.gz"); + + final AtomicBoolean obtained = new AtomicBoolean(false); + + final Thread t1 = new Thread(new Runnable() { + @Override + public void run() { + fileManager.obtainReadLock(f1); + + synchronized (obtained) { + obtained.set(true); + obtained.notify(); + } + + try { + Thread.sleep(100000L); + } catch (InterruptedException e) { + } + fileManager.releaseReadLock(f1); + } + }); + + 
t1.start(); + + final Thread t2 = new Thread(new Runnable() { + @Override + public void run() { + synchronized (obtained) { + while (!obtained.get()) { + try { + obtained.wait(); + } catch (InterruptedException e) { + } + } + } + + fileManager.obtainReadLock(gz); + fileManager.releaseReadLock(gz); + } + }); + + final long start = System.nanoTime(); + t2.start(); + t2.join(); + final long nanos = System.nanoTime() - start; + assertTrue(nanos < TimeUnit.MILLISECONDS.toNanos(500L)); + } + + + @Test(timeout = 5000) + public void testWriteThenRead() throws InterruptedException { + final EventFileManager fileManager = new EventFileManager(); + final File f1 = new File("1.prov"); + final File gz = new File("1.prov.gz"); + + final AtomicBoolean obtained = new AtomicBoolean(false); + + final Thread t1 = new Thread(new Runnable() { + @Override + public void run() { + fileManager.obtainWriteLock(f1); + + synchronized (obtained) { + obtained.set(true); + obtained.notify(); + } + + try { + Thread.sleep(500L); + } catch (InterruptedException e) { + } + fileManager.releaseWriteLock(f1); + } + }); + + t1.start(); + + final Thread t2 = new Thread(new Runnable() { + @Override + public void run() { + synchronized (obtained) { + while (!obtained.get()) { + try { + obtained.wait(); + } catch (InterruptedException e) { + } + } + } + + fileManager.obtainReadLock(gz); + fileManager.releaseReadLock(gz); + } + }); + + final long start = System.nanoTime(); + t2.start(); + t2.join(); + final long nanos = System.nanoTime() - start; + assertTrue(nanos > TimeUnit.MILLISECONDS.toNanos(300L)); + } + + + @Test(timeout = 5000) + public void testReadThenWrite() throws InterruptedException { + final EventFileManager fileManager = new EventFileManager(); + final File f1 = new File("1.prov"); + final File gz = new File("1.prov.gz"); + + final AtomicBoolean obtained = new AtomicBoolean(false); + + final Thread t1 = new Thread(new Runnable() { + @Override + public void run() { + 
fileManager.obtainReadLock(f1); + + synchronized (obtained) { + obtained.set(true); + obtained.notify(); + } + + try { + Thread.sleep(500L); + } catch (InterruptedException e) { + } + fileManager.releaseReadLock(f1); + } + }); + + t1.start(); + + final Thread t2 = new Thread(new Runnable() { + @Override + public void run() { + synchronized (obtained) { + while (!obtained.get()) { + try { + obtained.wait(); + } catch (InterruptedException e) { + } + } + } + + fileManager.obtainWriteLock(gz); + fileManager.releaseWriteLock(gz); + } + }); + + final long start = System.nanoTime(); + t2.start(); + t2.join(); + final long nanos = System.nanoTime() - start; + assertTrue(nanos > TimeUnit.MILLISECONDS.toNanos(300L)); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/TestPartitionedWriteAheadEventStore.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/TestPartitionedWriteAheadEventStore.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/TestPartitionedWriteAheadEventStore.java new file mode 100644 index 0000000..7c5e43b --- /dev/null +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/TestPartitionedWriteAheadEventStore.java @@ -0,0 +1,468 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.provenance.store; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.nifi.authorization.AccessDeniedException; +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.provenance.EventIdFirstSchemaRecordWriter; +import org.apache.nifi.provenance.IdentifierLookup; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.provenance.RepositoryConfiguration; +import org.apache.nifi.provenance.StandardProvenanceEventRecord; +import org.apache.nifi.provenance.authorization.EventAuthorizer; +import org.apache.nifi.provenance.authorization.EventTransformer; +import org.apache.nifi.provenance.serialization.RecordReaders; +import org.apache.nifi.provenance.serialization.RecordWriters; +import org.apache.nifi.provenance.serialization.StorageSummary; +import 
org.apache.nifi.provenance.toc.StandardTocWriter; +import org.apache.nifi.provenance.toc.TocUtil; +import org.apache.nifi.provenance.toc.TocWriter; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +public class TestPartitionedWriteAheadEventStore { + private static final RecordWriterFactory writerFactory = (file, idGen, compress, createToc) -> RecordWriters.newSchemaRecordWriter(file, idGen, compress, createToc); + private static final RecordReaderFactory readerFactory = (file, logs, maxChars) -> RecordReaders.newRecordReader(file, logs, maxChars); + + private final AtomicLong idGenerator = new AtomicLong(0L); + + @Rule + public TestName testName = new TestName(); + + @Before + public void resetIds() { + idGenerator.set(0L); + } + + + @Test + @Ignore + public void testPerformanceOfAccessingEvents() throws Exception { + final RecordWriterFactory recordWriterFactory = (file, idGenerator, compressed, createToc) -> { + final TocWriter tocWriter = createToc ? 
new StandardTocWriter(TocUtil.getTocFile(file), false, false) : null; + return new EventIdFirstSchemaRecordWriter(file, idGenerator, tocWriter, compressed, 1024 * 1024, IdentifierLookup.EMPTY); + }; + + final RecordReaderFactory recordReaderFactory = (file, logs, maxChars) -> RecordReaders.newRecordReader(file, logs, maxChars); + final PartitionedWriteAheadEventStore store = new PartitionedWriteAheadEventStore(createConfig(), + recordWriterFactory, recordReaderFactory, EventReporter.NO_OP, new EventFileManager()); + store.initialize(); + + assertEquals(-1, store.getMaxEventId()); + for (int i = 0; i < 100_000; i++) { + final ProvenanceEventRecord event1 = createEvent(); + store.addEvents(Collections.singleton(event1)); + } + + final List<Long> eventIdList = Arrays.asList(4L, 80L, 1024L, 40_000L, 80_000L, 99_000L); + + while (true) { + for (int i = 0; i < 100; i++) { + time(() -> store.getEvents(eventIdList, EventAuthorizer.GRANT_ALL, EventTransformer.EMPTY_TRANSFORMER), "Fetch Events"); + } + + Thread.sleep(1000L); + } + } + + private void time(final Callable<?> task, final String taskDescription) throws Exception { + final long start = System.nanoTime(); + task.call(); + final long nanos = System.nanoTime() - start; + final long millis = TimeUnit.NANOSECONDS.toMillis(nanos); + System.out.println("Took " + millis + " ms to " + taskDescription); + } + + @Test + public void testSingleWriteThenRead() throws IOException { + final PartitionedWriteAheadEventStore store = new PartitionedWriteAheadEventStore(createConfig(), writerFactory, readerFactory, EventReporter.NO_OP, new EventFileManager()); + store.initialize(); + + assertEquals(-1, store.getMaxEventId()); + final ProvenanceEventRecord event1 = createEvent(); + final StorageResult result = store.addEvents(Collections.singleton(event1)); + + final StorageSummary summary = result.getStorageLocations().values().iterator().next(); + final long eventId = summary.getEventId(); + final ProvenanceEventRecord eventWithId = 
addId(event1, eventId); + + assertEquals(0, store.getMaxEventId()); + + final ProvenanceEventRecord read = store.getEvent(eventId).get(); + assertEquals(eventWithId, read); + } + + @Test + public void testMultipleWritesThenReads() throws IOException { + final PartitionedWriteAheadEventStore store = new PartitionedWriteAheadEventStore(createConfig(), writerFactory, readerFactory, EventReporter.NO_OP, new EventFileManager()); + store.initialize(); + assertEquals(-1, store.getMaxEventId()); + + final int numEvents = 20; + final List<ProvenanceEventRecord> events = new ArrayList<>(numEvents); + for (int i = 0; i < numEvents; i++) { + final ProvenanceEventRecord event = createEvent(); + store.addEvents(Collections.singleton(event)); + assertEquals(i, store.getMaxEventId()); + + events.add(event); + } + + for (int i = 0; i < numEvents; i++) { + final ProvenanceEventRecord read = store.getEvent(i).get(); + assertEquals(events.get(i), read); + } + } + + + @Test() + public void testMultipleWritesThenGetAllInSingleRead() throws IOException { + final PartitionedWriteAheadEventStore store = new PartitionedWriteAheadEventStore(createConfig(), writerFactory, readerFactory, EventReporter.NO_OP, new EventFileManager()); + store.initialize(); + assertEquals(-1, store.getMaxEventId()); + + final int numEvents = 20; + final List<ProvenanceEventRecord> events = new ArrayList<>(numEvents); + for (int i = 0; i < numEvents; i++) { + final ProvenanceEventRecord event = createEvent(); + store.addEvents(Collections.singleton(event)); + assertEquals(i, store.getMaxEventId()); + + events.add(event); + } + + List<ProvenanceEventRecord> eventsRead = store.getEvents(0L, numEvents, null, EventTransformer.EMPTY_TRANSFORMER); + assertNotNull(eventsRead); + + assertEquals(numEvents, eventsRead.size()); + for (int i = 0; i < numEvents; i++) { + final ProvenanceEventRecord read = eventsRead.get(i); + assertEquals(events.get(i), read); + } + + eventsRead = store.getEvents(-1000, 1000, null, 
EventTransformer.EMPTY_TRANSFORMER); + assertNotNull(eventsRead); + assertTrue(eventsRead.isEmpty()); + + eventsRead = store.getEvents(10, 0, null, EventTransformer.EMPTY_TRANSFORMER); + assertNotNull(eventsRead); + assertTrue(eventsRead.isEmpty()); + + eventsRead = store.getEvents(10, 1, null, EventTransformer.EMPTY_TRANSFORMER); + assertNotNull(eventsRead); + assertFalse(eventsRead.isEmpty()); + assertEquals(1, eventsRead.size()); + assertEquals(events.get(10), eventsRead.get(0)); + + eventsRead = store.getEvents(20, 1000, null, EventTransformer.EMPTY_TRANSFORMER); + assertNotNull(eventsRead); + assertTrue(eventsRead.isEmpty()); + } + + @Test + public void testGetSize() throws IOException { + final PartitionedWriteAheadEventStore store = new PartitionedWriteAheadEventStore(createConfig(), writerFactory, readerFactory, EventReporter.NO_OP, new EventFileManager()); + store.initialize(); + + long storeSize = 0L; + final int numEvents = 20; + for (int i = 0; i < numEvents; i++) { + final ProvenanceEventRecord event = createEvent(); + store.addEvents(Collections.singleton(event)); + final long newSize = store.getSize(); + assertTrue(newSize > storeSize); + storeSize = newSize; + } + } + + @Test + public void testMaxEventIdRestored() throws IOException { + final RepositoryConfiguration config = createConfig(); + final PartitionedWriteAheadEventStore store = new PartitionedWriteAheadEventStore(config, writerFactory, readerFactory, EventReporter.NO_OP, new EventFileManager()); + store.initialize(); + + final int numEvents = 20; + for (int i = 0; i < numEvents; i++) { + final ProvenanceEventRecord event = createEvent(); + store.addEvents(Collections.singleton(event)); + } + + assertEquals(19, store.getMaxEventId()); + store.close(); + + final PartitionedWriteAheadEventStore recoveredStore = new PartitionedWriteAheadEventStore(config, writerFactory, readerFactory, EventReporter.NO_OP, new EventFileManager()); + recoveredStore.initialize(); + assertEquals(19, 
recoveredStore.getMaxEventId()); + } + + @Test + public void testGetEvent() throws IOException { + final RepositoryConfiguration config = createConfig(); + final PartitionedWriteAheadEventStore store = new PartitionedWriteAheadEventStore(config, writerFactory, readerFactory, EventReporter.NO_OP, new EventFileManager()); + store.initialize(); + + final int numEvents = 20; + final List<ProvenanceEventRecord> events = new ArrayList<>(numEvents); + for (int i = 0; i < numEvents; i++) { + final ProvenanceEventRecord event = createEvent(); + store.addEvents(Collections.singleton(event)); + events.add(event); + } + + // Ensure that each event is retrieved successfully. + for (int i = 0; i < numEvents; i++) { + final ProvenanceEventRecord event = store.getEvent(i).get(); + assertEquals(events.get(i), event); + } + + assertFalse(store.getEvent(-1L).isPresent()); + assertFalse(store.getEvent(20L).isPresent()); + } + + @Test + public void testGetEventsWithMinIdAndCount() throws IOException { + final RepositoryConfiguration config = createConfig(); + config.setMaxEventFileCount(100); + final PartitionedWriteAheadEventStore store = new PartitionedWriteAheadEventStore(config, writerFactory, readerFactory, EventReporter.NO_OP, new EventFileManager()); + store.initialize(); + + final int numEvents = 50_000; + final List<ProvenanceEventRecord> events = new ArrayList<>(numEvents); + for (int i = 0; i < numEvents; i++) { + final ProvenanceEventRecord event = createEvent(); + store.addEvents(Collections.singleton(event)); + if (i < 1000) { + events.add(event); + } + } + + assertTrue(store.getEvents(-1000L, 1000).isEmpty()); + assertEquals(events, store.getEvents(0, events.size())); + assertEquals(events, store.getEvents(-30, events.size())); + assertEquals(events.subList(10, events.size()), store.getEvents(10L, events.size() - 10)); + assertTrue(store.getEvents(numEvents, 100).isEmpty()); + } + + @Test + public void testGetEventsWithMinIdAndCountWithAuthorizer() throws IOException { + 
final RepositoryConfiguration config = createConfig(); + final PartitionedWriteAheadEventStore store = new PartitionedWriteAheadEventStore(config, writerFactory, readerFactory, EventReporter.NO_OP, new EventFileManager()); + store.initialize(); + + final int numEvents = 20; + final List<ProvenanceEventRecord> events = new ArrayList<>(numEvents); + for (int i = 0; i < numEvents; i++) { + final ProvenanceEventRecord event = createEvent(); + store.addEvents(Collections.singleton(event)); + events.add(event); + } + + final EventAuthorizer allowEventNumberedEventIds = new EventAuthorizer() { + @Override + public boolean isAuthorized(final ProvenanceEventRecord event) { + return event.getEventId() % 2 == 0L; + } + + @Override + public void authorize(ProvenanceEventRecord event) throws AccessDeniedException { + if (!isAuthorized(event)) { + throw new AccessDeniedException(); + } + } + }; + + final List<ProvenanceEventRecord> storedEvents = store.getEvents(0, 20, allowEventNumberedEventIds, EventTransformer.EMPTY_TRANSFORMER); + assertEquals(numEvents / 2, storedEvents.size()); + for (int i = 0; i < storedEvents.size(); i++) { + assertEquals(events.get(i * 2), storedEvents.get(i)); + } + } + + + @Test + public void testGetEventsWithStartOffsetAndCountWithNothingAuthorized() throws IOException { + final RepositoryConfiguration config = createConfig(); + final PartitionedWriteAheadEventStore store = new PartitionedWriteAheadEventStore(config, writerFactory, readerFactory, EventReporter.NO_OP, new EventFileManager()); + store.initialize(); + + final int numEvents = 20; + final List<ProvenanceEventRecord> events = new ArrayList<>(numEvents); + for (int i = 0; i < numEvents; i++) { + final ProvenanceEventRecord event = createEvent(); + store.addEvents(Collections.singleton(event)); + events.add(event); + } + + final EventAuthorizer allowEventNumberedEventIds = EventAuthorizer.DENY_ALL; + final List<ProvenanceEventRecord> storedEvents = store.getEvents(0, 20, 
allowEventNumberedEventIds, EventTransformer.EMPTY_TRANSFORMER); + assertTrue(storedEvents.isEmpty()); + } + + @Test + public void testGetSpecificEventIds() throws IOException { + final RepositoryConfiguration config = createConfig(); + final PartitionedWriteAheadEventStore store = new PartitionedWriteAheadEventStore(config, writerFactory, readerFactory, EventReporter.NO_OP, new EventFileManager()); + store.initialize(); + + final int numEvents = 20; + final List<ProvenanceEventRecord> events = new ArrayList<>(numEvents); + for (int i = 0; i < numEvents; i++) { + final ProvenanceEventRecord event = createEvent(); + store.addEvents(Collections.singleton(event)); + events.add(event); + } + + final EventAuthorizer allowEvenNumberedEventIds = new EventAuthorizer() { + @Override + public boolean isAuthorized(final ProvenanceEventRecord event) { + return event.getEventId() % 2 == 0L; + } + + @Override + public void authorize(ProvenanceEventRecord event) throws AccessDeniedException { + if (!isAuthorized(event)) { + throw new AccessDeniedException(); + } + } + }; + + final List<Long> evenEventIds = new ArrayList<>(); + final List<Long> oddEventIds = new ArrayList<>(); + final List<Long> allEventIds = new ArrayList<>(); + for (int i = 0; i < 20; i++) { + final Long id = Long.valueOf(i); + allEventIds.add(id); + + if (i % 2 == 0) { + evenEventIds.add(id); + } else { + oddEventIds.add(id); + } + } + + final List<ProvenanceEventRecord> storedEvents = store.getEvents(evenEventIds, allowEvenNumberedEventIds, EventTransformer.EMPTY_TRANSFORMER); + assertEquals(numEvents / 2, storedEvents.size()); + for (int i = 0; i < storedEvents.size(); i++) { + assertEquals(events.get(i * 2), storedEvents.get(i)); + } + + assertTrue(store.getEvents(oddEventIds, allowEvenNumberedEventIds, EventTransformer.EMPTY_TRANSFORMER).isEmpty()); + + final List<ProvenanceEventRecord> allStoredEvents = store.getEvents(allEventIds, EventAuthorizer.GRANT_ALL, EventTransformer.EMPTY_TRANSFORMER); + 
assertEquals(events, allStoredEvents); + } + + + @Test + public void testWriteAfterRecoveringRepo() throws IOException { + final RepositoryConfiguration config = createConfig(); + final PartitionedWriteAheadEventStore store = new PartitionedWriteAheadEventStore(config, writerFactory, readerFactory, EventReporter.NO_OP, new EventFileManager()); + store.initialize(); + + for (int i = 0; i < 4; i++) { + store.addEvents(Collections.singleton(createEvent())); + } + + store.close(); + + final PartitionedWriteAheadEventStore recoveredStore = new PartitionedWriteAheadEventStore(config, writerFactory, readerFactory, EventReporter.NO_OP, new EventFileManager()); + recoveredStore.initialize(); + + List<ProvenanceEventRecord> recoveredEvents = recoveredStore.getEvents(0, 10); + assertEquals(4, recoveredEvents.size()); + + // ensure that we can still write to the store + for (int i = 0; i < 4; i++) { + recoveredStore.addEvents(Collections.singleton(createEvent())); + } + + recoveredEvents = recoveredStore.getEvents(0, 10); + assertEquals(8, recoveredEvents.size()); + + for (int i = 0; i < 8; i++) { + assertEquals(i, recoveredEvents.get(i).getEventId()); + } + } + + + private RepositoryConfiguration createConfig() { + return createConfig(2); + } + + private RepositoryConfiguration createConfig(final int numStorageDirs) { + final RepositoryConfiguration config = new RepositoryConfiguration(); + final String unitTestName = testName.getMethodName(); + final File storageDir = new File("target/storage/" + unitTestName + "/" + UUID.randomUUID().toString()); + + for (int i = 1; i <= numStorageDirs; i++) { + config.addStorageDirectory(String.valueOf(i), new File(storageDir, String.valueOf(i))); + } + + return config; + } + + private ProvenanceEventRecord addId(final ProvenanceEventRecord event, final long eventId) { + return new StandardProvenanceEventRecord.Builder() + .fromEvent(event) + .setEventId(eventId) + .build(); + } + + + private ProvenanceEventRecord createEvent() { + final 
String uuid = UUID.randomUUID().toString(); + final Map<String, String> previousAttributes = new HashMap<>(); + previousAttributes.put("uuid", uuid); + final Map<String, String> updatedAttributes = new HashMap<>(); + updatedAttributes.put("updated", "true"); + + return new StandardProvenanceEventRecord.Builder() + .setEventType(ProvenanceEventType.CONTENT_MODIFIED) + .setAttributes(previousAttributes, updatedAttributes) + .setComponentId("component-1") + .setComponentType("unit test") + .setEventTime(System.currentTimeMillis()) + .setFlowFileEntryDate(System.currentTimeMillis()) + .setFlowFileUUID(uuid) + .setLineageStartDate(System.currentTimeMillis()) + .setCurrentContentClaim("container", "section", "unit-test-id", 0L, 1024L) + .build(); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/TestWriteAheadStorePartition.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/TestWriteAheadStorePartition.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/TestWriteAheadStorePartition.java new file mode 100644 index 0000000..3879411 --- /dev/null +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/TestWriteAheadStorePartition.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.provenance.store; + +import static org.junit.Assert.assertEquals; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; + +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.provenance.EventIdFirstSchemaRecordWriter; +import org.apache.nifi.provenance.IdentifierLookup; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.RepositoryConfiguration; +import org.apache.nifi.provenance.TestUtil; +import org.apache.nifi.provenance.index.EventIndex; +import org.apache.nifi.provenance.serialization.RecordReaders; +import org.apache.nifi.provenance.serialization.StorageSummary; +import org.apache.nifi.provenance.toc.StandardTocWriter; +import org.apache.nifi.provenance.toc.TocUtil; +import org.apache.nifi.provenance.toc.TocWriter; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +public class TestWriteAheadStorePartition { + + @Test + @SuppressWarnings("unchecked") + public void testReindex() throws IOException { + final RepositoryConfiguration repoConfig = 
createConfig(1, "testReindex"); + repoConfig.setMaxEventFileCount(5); + + final String partitionName = repoConfig.getStorageDirectories().keySet().iterator().next(); + final File storageDirectory = repoConfig.getStorageDirectories().values().iterator().next(); + + final RecordWriterFactory recordWriterFactory = (file, idGenerator, compressed, createToc) -> { + final TocWriter tocWriter = createToc ? new StandardTocWriter(TocUtil.getTocFile(file), false, false) : null; + return new EventIdFirstSchemaRecordWriter(file, idGenerator, tocWriter, compressed, 32 * 1024, IdentifierLookup.EMPTY); + }; + + final RecordReaderFactory recordReaderFactory = (file, logs, maxChars) -> RecordReaders.newRecordReader(file, logs, maxChars); + + final WriteAheadStorePartition partition = new WriteAheadStorePartition(storageDirectory, partitionName, repoConfig, recordWriterFactory, + recordReaderFactory, new LinkedBlockingQueue<>(), new AtomicLong(0L), EventReporter.NO_OP); + + for (int i = 0; i < 100; i++) { + partition.addEvents(Collections.singleton(TestUtil.createEvent())); + } + + final Map<ProvenanceEventRecord, StorageSummary> reindexedEvents = new HashMap<>(); + final EventIndex eventIndex = Mockito.mock(EventIndex.class); + Mockito.doAnswer(new Answer<Object>() { + @Override + public Object answer(final InvocationOnMock invocation) throws Throwable { + final Map<ProvenanceEventRecord, StorageSummary> events = invocation.getArgumentAt(0, Map.class); + reindexedEvents.putAll(events); + return null; + } + }).when(eventIndex).reindexEvents(Mockito.anyMap()); + + Mockito.doReturn(18L).when(eventIndex).getMinimumEventIdToReindex("1"); + partition.reindexLatestEvents(eventIndex); + + final List<Long> eventIdsReindexed = reindexedEvents.values().stream() + .map(StorageSummary::getEventId) + .sorted() + .collect(Collectors.toList()); + + assertEquals(82, eventIdsReindexed.size()); + for (int i = 0; i < eventIdsReindexed.size(); i++) { + assertEquals(18 + i, 
eventIdsReindexed.get(i).intValue()); + } + } + + private RepositoryConfiguration createConfig(final int numStorageDirs, final String testName) { + final RepositoryConfiguration config = new RepositoryConfiguration(); + final File storageDir = new File("target/storage/" + testName + "/" + UUID.randomUUID().toString()); + + for (int i = 1; i <= numStorageDirs; i++) { + config.addStorageDirectory(String.valueOf(1), new File(storageDir, String.valueOf(i))); + } + + config.setJournalCount(4); + return config; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/iterator/TestSelectiveRecordReaderEventIterator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/iterator/TestSelectiveRecordReaderEventIterator.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/iterator/TestSelectiveRecordReaderEventIterator.java new file mode 100644 index 0000000..0089f61 --- /dev/null +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/iterator/TestSelectiveRecordReaderEventIterator.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.provenance.store.iterator; + +import static org.junit.Assert.assertEquals; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.nifi.provenance.EventIdFirstSchemaRecordWriter; +import org.apache.nifi.provenance.IdentifierLookup; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.TestUtil; +import org.apache.nifi.provenance.serialization.RecordReaders; +import org.apache.nifi.provenance.serialization.RecordWriter; +import org.apache.nifi.provenance.store.RecordReaderFactory; +import org.apache.nifi.provenance.toc.StandardTocWriter; +import org.apache.nifi.provenance.toc.TocUtil; +import org.apache.nifi.provenance.toc.TocWriter; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; + + +public class TestSelectiveRecordReaderEventIterator { + + + private RecordWriter createWriter(final File file, final TocWriter tocWriter, final boolean compressed, final int uncompressedBlockSize) throws IOException { + return new EventIdFirstSchemaRecordWriter(file, new AtomicLong(0L), tocWriter, compressed, uncompressedBlockSize, IdentifierLookup.EMPTY); + } + + @Test + public void testFilterUnneededFiles() { + final File file1 = new File("1.prov"); + final File file1000 = new File("1000.prov"); + 
final File file2000 = new File("2000.prov"); + final File file3000 = new File("3000.prov"); + + // Filter out the first file. + final List<File> files = new ArrayList<>(); + files.add(file1); + files.add(file1000); + files.add(file2000); + files.add(file3000); + + List<Long> eventIds = new ArrayList<>(); + eventIds.add(1048L); + eventIds.add(2048L); + eventIds.add(3048L); + + List<File> filteredFiles = SelectiveRecordReaderEventIterator.filterUnneededFiles(files, eventIds); + assertEquals(Arrays.asList(new File[] {file1000, file2000, file3000}), filteredFiles); + + // Filter out file at end + eventIds.clear(); + eventIds.add(1L); + eventIds.add(1048L); + + filteredFiles = SelectiveRecordReaderEventIterator.filterUnneededFiles(files, eventIds); + assertEquals(Arrays.asList(new File[] {file1, file1000}), filteredFiles); + } + + @Test + @Ignore("For local testing only. Runs indefinitely") + public void testPerformanceOfRandomAccessReads() throws Exception { + final File dir = new File("target/storage/" + UUID.randomUUID().toString()); + final File journalFile = new File(dir, "/4.prov.gz"); + final File tocFile = TocUtil.getTocFile(journalFile); + + final int blockSize = 1024 * 32; + try (final RecordWriter writer = createWriter(journalFile, new StandardTocWriter(tocFile, true, false), true, blockSize)) { + writer.writeHeader(0L); + + for (int i = 0; i < 100_000; i++) { + writer.writeRecord(TestUtil.createEvent()); + } + } + + final Long[] eventIds = new Long[] { + 4L, 80L, 1024L, 1025L, 1026L, 1027L, 1028L, 1029L, 1030L, 40_000L, 80_000L, 99_000L + }; + + final RecordReaderFactory readerFactory = (file, logs, maxChars) -> RecordReaders.newRecordReader(file, logs, maxChars); + + final List<File> files = new ArrayList<>(); + files.add(new File(dir, "0.prov")); + files.add(new File(dir, "0.prov")); + files.add(new File(dir, "1.prov")); + files.add(new File(dir, "2.prov")); + files.add(new File(dir, "3.prov")); + files.add(journalFile); + files.add(new File(dir, 
"100000000.prov")); + + boolean loopForever = true; + while (loopForever) { + final long start = System.nanoTime(); + for (int i = 0; i < 1000; i++) { + final SelectiveRecordReaderEventIterator iterator = new SelectiveRecordReaderEventIterator( + Collections.singletonList(journalFile), readerFactory, Arrays.asList(eventIds), 32 * 1024); + + for (final long id : eventIds) { + time(() -> { + return iterator.nextEvent().orElse(null); + }, id); + } + } + + final long ms = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + System.out.println(ms + " ms total"); + } + } + + private void time(final Callable<ProvenanceEventRecord> task, final long id) throws Exception { + final long start = System.nanoTime(); + final ProvenanceEventRecord event = task.call(); + Assert.assertNotNull(event); + Assert.assertEquals(id, event.getEventId()); + // System.out.println(event); + final long nanos = System.nanoTime() - start; + final long millis = TimeUnit.NANOSECONDS.toMillis(nanos); + // System.out.println("Took " + millis + " ms to " + taskDescription); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java index e467676..f08fed4 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java +++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java @@ -124,7 +124,8 @@ public class VolatileProvenanceRepository implements ProvenanceRepository { } @Override - public void initialize(final EventReporter eventReporter, final Authorizer authorizer, final ProvenanceAuthorizableFactory resourceFactory) throws IOException { + public void initialize(final EventReporter eventReporter, final Authorizer authorizer, final ProvenanceAuthorizableFactory resourceFactory, + final IdentifierLookup idLookup) throws IOException { if (initialized.getAndSet(true)) { return; } @@ -542,7 +543,7 @@ public class VolatileProvenanceRepository implements ProvenanceRepository { if (event == null) { final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String>emptyList(), 1, userId); lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); - submission.getResult().update(Collections.<ProvenanceEventRecord>emptyList()); + submission.getResult().update(Collections.<ProvenanceEventRecord> emptyList(), 0L); return submission; } @@ -573,7 +574,7 @@ public class VolatileProvenanceRepository implements ProvenanceRepository { if (event == null) { final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String>emptyList(), 1, userId); lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); - submission.getResult().update(Collections.<ProvenanceEventRecord>emptyList()); + submission.getResult().update(Collections.<ProvenanceEventRecord> emptyList(), 0L); return submission; } @@ -681,7 +682,7 @@ public class VolatileProvenanceRepository implements ProvenanceRepository { @Override public void run() { final List<ProvenanceEventRecord> records = ringBuffer.getSelectedElements(filter); - 
submission.getResult().update(records); + submission.getResult().update(records, records.size()); } }