http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/test/java/org/apache/nifi/pql/TestQuery.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/test/java/org/apache/nifi/pql/TestQuery.java b/nifi/nifi-commons/nifi-provenance-query-language/src/test/java/org/apache/nifi/pql/TestQuery.java new file mode 100644 index 0000000..b6012db --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/test/java/org/apache/nifi/pql/TestQuery.java @@ -0,0 +1,348 @@ +package org.apache.nifi.pql; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import org.apache.nifi.provenance.ProvenanceEventRepository; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.provenance.StandardProvenanceEventRecord; +import org.apache.nifi.provenance.VolatileProvenanceRepository; +import org.apache.nifi.provenance.query.ProvenanceResultSet; +import org.apache.nifi.util.NiFiProperties; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestQuery { + + private ProvenanceEventRepository repo; + + @Before + public void setup() { + System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, "src/test/resources/nifi.properties"); + repo = new VolatileProvenanceRepository(); + } + + + private void createRecords() throws IOException { + final Map<String, String> previousAttributes = new HashMap<>(); + previousAttributes.put("filename", "xyz"); + + final Map<String, String> updatedAttributes = new HashMap<>(); + updatedAttributes.put("filename", "xyz.txt"); + updatedAttributes.put("mime.type", "text/plain"); + + final 
StandardProvenanceEventRecord.Builder recordBuilder = new StandardProvenanceEventRecord.Builder(); + recordBuilder.setAttributes(previousAttributes, Collections.<String, String>emptyMap()) + .setComponentId("000") + .setComponentType("MyComponent") + .setEventType(ProvenanceEventType.RECEIVE) + .setFlowFileEntryDate(System.currentTimeMillis()) + .setFlowFileUUID("1234") + .setTransitUri("https://localhost:80/nifi"); + + + recordBuilder.setCurrentContentClaim("container", "section", "1", 0L, 100L); + repo.registerEvent(recordBuilder.build()); + + + recordBuilder.setAttributes(previousAttributes, updatedAttributes); + recordBuilder.setCurrentContentClaim("container", "section", "2", 0L, 1024 * 1024L); + repo.registerEvent(recordBuilder.build()); + } + + + private void createRecords(final int records, final ProvenanceEventType type, final long sleep) throws IOException { + final Map<String, String> previousAttributes = new HashMap<>(); + previousAttributes.put("filename", "xyz"); + + final Map<String, String> updatedAttributes = new HashMap<>(); + updatedAttributes.put("filename", "xyz.txt"); + updatedAttributes.put("mime.type", "text/plain"); + + final StandardProvenanceEventRecord.Builder recordBuilder = new StandardProvenanceEventRecord.Builder(); + recordBuilder.setAttributes(previousAttributes, Collections.<String, String>emptyMap()) + .setComponentType("MyComponent") + .setEventType(type) + .setFlowFileEntryDate(System.currentTimeMillis()) + .setFlowFileUUID("1234") + .setTransitUri("https://localhost:80/nifi"); + + final long now = System.currentTimeMillis(); + for (int i=0; i < records; i++) { + recordBuilder.setCurrentContentClaim("container", "section", String.valueOf(i), 0L, 100L); + final Map<String, String> attr = new HashMap<>(updatedAttributes); + attr.put("i", String.valueOf(i)); + recordBuilder.setAttributes(previousAttributes, attr); + recordBuilder.setFlowFileEntryDate(System.currentTimeMillis()); + recordBuilder.setEventTime(now + (i * sleep)); + 
recordBuilder.setComponentId(UUID.randomUUID().toString()); + + repo.registerEvent(recordBuilder.build()); + } + } + + @Test + public void testCompilationManually() { + System.out.println(ProvenanceQuery.compile("SELECT R.TransitUri FROM *")); + System.out.println(ProvenanceQuery.compile("SELECT R['filename'] FROM RECEIVE, SEND;")); + System.out.println(ProvenanceQuery.compile("SELECT Event FROM RECEIVE ORDER BY Event['filename'];")); + +// System.out.println(Query.compile("SELECT Event FROM RECEIVE WHERE ((Event.TransitUri <> 'http') OR (Event['filename'] = '1.txt')) and (Event.Size > 1000 or Event.Size between 1 AND 4);")); + + System.out.println(ProvenanceQuery.compile("SELECT SUM(Event.size) FROM RECEIVE")); + } + + + @Test + public void testSumAverage() throws IOException { + createRecords(); + dump(ProvenanceQuery.execute("SELECT Event", repo)); + + final ProvenanceQuery query = ProvenanceQuery.compile("SELECT SUM(Event.Size), AVG(Event.Size) FROM RECEIVE WHERE Event.TransitUri = 'https://localhost:80/nifi'"); + + final ProvenanceResultSet rs = query.execute(repo); + dump(rs); + + dump(ProvenanceQuery.execute("SELECT Event.TransitUri", repo)); + dump(ProvenanceQuery.execute("SELECT Event['mime.type'], Event['filename']", repo)); + dump(ProvenanceQuery.execute("SELECT Event['filename'], SUM(Event.size) GROUP BY Event['filename']", repo)); + } + + + @Test + public void testGroupBy() throws IOException { + createRecords(200000, ProvenanceEventType.RECEIVE, 0L); + createRecords(2, ProvenanceEventType.SEND, 0L); + + ProvenanceResultSet rs = ProvenanceQuery.execute("SELECT Event['filename'], COUNT(Event), Event.Type GROUP BY Event['filename'], Event.Type", repo); + dump(rs); + + rs = ProvenanceQuery.execute("SELECT Event['filename'], COUNT(Event), Event.Type GROUP BY Event['filename'], Event.Type", repo); + + int receiveRows = 0; + int sendRows = 0; + while (rs.hasNext()) { + final List<?> cols = rs.next(); + final ProvenanceEventType type = (ProvenanceEventType) 
cols.get(2); + if ( type == ProvenanceEventType.RECEIVE ) { + receiveRows++; + assertEquals("xyz.txt", cols.get(0)); + assertEquals(200000L, cols.get(1)); + } else if ( type == ProvenanceEventType.SEND ) { + sendRows++; + assertEquals("xyz.txt", cols.get(0)); + assertEquals(2L, cols.get(1)); + } else { + Assert.fail("Event type was " + type); + } + } + + assertEquals(1, receiveRows); + assertEquals(1, sendRows); + } + + + @Test + public void testAverageGroupBy() throws IOException { + createRecords(200000, ProvenanceEventType.RECEIVE, 1L); + createRecords(5000, ProvenanceEventType.SEND, 1L); + + dump(ProvenanceQuery.execute("SELECT AVG(Event.Size), Event.Type GROUP BY SECOND(Event.Time), Event.Type", repo)); + } + + + @Test + public void testSelectSeveralRecords() throws IOException { + createRecords(2000, ProvenanceEventType.SEND, 1L); + createRecords(200, ProvenanceEventType.RECEIVE, 1L); + dump(ProvenanceQuery.execute( + "SELECT SECOND(Event.Time), Event.Type, SUM(Event.Size), COUNT(Event) " + + "FROM SEND, RECEIVE " + + "GROUP BY SECOND(Event.Time), Event.Type" + , repo)); + } + + @Test + public void testNot() throws IOException { + createRecords(2000, ProvenanceEventType.SEND, 0L); + createRecords(200, ProvenanceEventType.RECEIVE, 0L); + + dump(ProvenanceQuery.execute("SELECT Event.Type, COUNT(Event) WHERE NOT(Event.Type = 'SEND')", repo)); + dump(ProvenanceQuery.execute("SELECT Event.Type, COUNT(Event) WHERE NOT(Event.Type = 'RECEIVE')", repo)); + dump(ProvenanceQuery.execute("SELECT Event.Type, COUNT(Event) WHERE NOT(NOT( Event.Type = 'SEND'))", repo)); + } + + + @Test + public void testOrderByField() throws IOException { + createRecords(2000, ProvenanceEventType.SEND, 1L); + + dump(ProvenanceQuery.execute("SELECT Event.Time, Event.ComponentId ORDER BY Event.ComponentId LIMIT 15", repo)); + dump(ProvenanceQuery.execute("SELECT Event.Time, Event.ComponentId ORDER BY Event.Time DESC LIMIT 15", repo)); + } + + + @Test + public void testOrderByGroupedField() 
throws IOException { + createRecords(2, ProvenanceEventType.SEND, 0L); + createRecords(5, ProvenanceEventType.RECEIVE, 0L); + + dump(ProvenanceQuery.execute("SELECT Event.Type, SUM(Event.Size) GROUP BY Event.Type ORDER BY SUM(Event.Size) DESC", repo)); + + ProvenanceResultSet rs = ProvenanceQuery.execute("SELECT Event.Type, SUM(Event.Size) GROUP BY Event.Type ORDER BY SUM(Event.Size) DESC", repo); + + assertTrue( rs.hasNext() ); + List<?> values = rs.next(); + assertEquals("RECEIVE", values.get(0).toString()); + assertEquals(500L, values.get(1)); + + assertTrue( rs.hasNext() ); + values = rs.next(); + assertEquals("SEND", values.get(0).toString()); + assertEquals(200L, values.get(1)); + + assertFalse( rs.hasNext() ); + + + rs = ProvenanceQuery.execute("SELECT Event.Type, SUM(Event.Size) GROUP BY Event.Type ORDER BY SUM(Event.Size) ASC", repo); + + assertTrue( rs.hasNext() ); + values = rs.next(); + assertEquals("SEND", values.get(0).toString()); + assertEquals(200L, values.get(1)); + + assertTrue( rs.hasNext() ); + values = rs.next(); + assertEquals("RECEIVE", values.get(0).toString()); + assertEquals(500L, values.get(1)); + + assertFalse( rs.hasNext() ); + } + + + @Test + public void testOrderByFieldAndGroupedValue() throws IOException { + createRecords(3, ProvenanceEventType.SEND, 0L); + createRecords(5, ProvenanceEventType.RECEIVE, 0L); + createRecords(3, ProvenanceEventType.ATTRIBUTES_MODIFIED, 0L); + + final String query = "SELECT Event.Type, SUM(Event.Size) GROUP BY Event.Type ORDER BY SUM(Event.Size) DESC, Event.Type"; + dump(ProvenanceQuery.execute(query, repo)); + + ProvenanceResultSet rs = ProvenanceQuery.execute(query, repo); + + assertTrue( rs.hasNext() ); + List<?> vals = rs.next(); + assertEquals(2, vals.size()); + assertEquals("RECEIVE", vals.get(0).toString()); + assertEquals(500L, vals.get(1)); + + assertTrue( rs.hasNext() ); + vals = rs.next(); + assertEquals(2, vals.size()); + assertEquals("ATTRIBUTES_MODIFIED", vals.get(0).toString()); + 
assertEquals(300L, vals.get(1)); + + assertTrue( rs.hasNext() ); + vals = rs.next(); + assertEquals(2, vals.size()); + assertEquals("SEND", vals.get(0).toString()); + assertEquals(300L, vals.get(1)); + + assertFalse( rs.hasNext() ); + } + + + @Test + public void testAndsOrs() throws IOException { + + final Map<String, String> previousAttributes = new HashMap<>(); + previousAttributes.put("filename", "xyz"); + + final Map<String, String> updatedAttributes = new HashMap<>(); + updatedAttributes.put("filename", "xyz.txt"); + updatedAttributes.put("mime.type", "text/plain"); + updatedAttributes.put("abc", "cba"); + updatedAttributes.put("123", "321"); + + final StandardProvenanceEventRecord.Builder recordBuilder = new StandardProvenanceEventRecord.Builder(); + recordBuilder.setAttributes(previousAttributes, Collections.<String, String>emptyMap()) + .setComponentId("000") + .setComponentType("MyComponent") + .setEventType(ProvenanceEventType.SEND) + .setFlowFileEntryDate(System.currentTimeMillis()) + .setFlowFileUUID("1234") + .setCurrentContentClaim("container", "section", "1", 0L, 100L) + .setAttributes(previousAttributes, updatedAttributes) + .setTransitUri("https://localhost:80/nifi"); + + repo.registerEvent(recordBuilder.build()); + + final String queryString = "SELECT Event " + + "WHERE " + + "( " + + " Event['filename'] = 'xyz.txt' " + + " OR " + + " Event['mime.type'] = 'ss' " + + ") " + + "AND " + + "( " + + " Event['abc'] = 'cba' " + + " OR " + + " Event['123'] = '123' " + + ")"; + System.out.println(queryString); + + final ProvenanceQuery query = ProvenanceQuery.compile(queryString); + + System.out.println(query.getWhereClause()); + + ProvenanceResultSet rs = query.execute(repo); + assertTrue(rs.hasNext()); + rs.next(); + assertFalse(rs.hasNext()); + + + + updatedAttributes.put("filename", "xxyz"); + repo = new VolatileProvenanceRepository(); + recordBuilder.setAttributes(previousAttributes, updatedAttributes); + repo.registerEvent(recordBuilder.build()); + + 
rs = query.execute(repo); + assertFalse(rs.hasNext()); + + + + updatedAttributes.put("filename", "xyz.txt"); + updatedAttributes.put("123", "123"); + repo = new VolatileProvenanceRepository(); + recordBuilder.setAttributes(previousAttributes, updatedAttributes); + repo.registerEvent(recordBuilder.build()); + + rs = query.execute(repo); + assertTrue(rs.hasNext()); + rs.next(); + assertFalse(rs.hasNext()); + + } + + private void dump(final ProvenanceResultSet rs) { + System.out.println(rs.getLabels()); + while (rs.hasNext()) { + System.out.println(rs.next()); + } + + System.out.println("\n\n\n"); + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/test/resources/nifi.properties ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/test/resources/nifi.properties b/nifi/nifi-commons/nifi-provenance-query-language/src/test/resources/nifi.properties new file mode 100644 index 0000000..9d0683d --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/test/resources/nifi.properties @@ -0,0 +1,136 @@ +# Core Properties # +nifi.version=nifi-0.0.1 +nifi.flow.configuration.file=./conf/flow.xml.gz +nifi.flow.configuration.archive.dir=./conf/archive/ +nifi.flowcontroller.autoResumeState=false +nifi.flowcontroller.graceful.shutdown.period=10 sec +nifi.flowservice.writedelay.interval=2 sec +nifi.administrative.yield.duration=30 sec + +nifi.authority.provider.configuration.file=./conf/authority-providers.xml +nifi.reporting.task.configuration.file=./conf/reporting-tasks.xml +nifi.controller.service.configuration.file=./conf/controller-services.xml +nifi.templates.directory=./conf/templates +nifi.ui.banner.text= +nifi.ui.autorefresh.interval=30 sec +nifi.nar.library.directory=./lib +nifi.nar.working.directory=./work/nar/ +nifi.documentation.working.directory=./work/docs/components + +# H2 Settings +nifi.database.directory=./database_repository +nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE + +# FlowFile Repository +nifi.flowfile.repository.directory=./flowfile_repository +nifi.flowfile.repository.partitions=256 +nifi.flowfile.repository.checkpoint.interval=2 mins +nifi.queue.swap.threshold=20000 +nifi.swap.storage.directory=./flowfile_repository/swap +nifi.swap.in.period=5 sec +nifi.swap.in.threads=1 +nifi.swap.out.period=5 sec +nifi.swap.out.threads=4 + +# Content Repository +nifi.content.claim.max.appendable.size=10 MB +nifi.content.claim.max.flow.files=100 
+nifi.content.repository.directory.default=./content_repository + + +# Volatile provenance repository properties +nifi.provenance.repository.buffer.size=250000 + + +# Provenance Repository Properties +nifi.provenance.repository.implementation=nifi.provenance.PersistentProvenanceRepository +nifi.provenance.repository.directory.default=./provenance_repository +nifi.provenance.repository.max.storage.time=24 hours +nifi.provenance.repository.max.storage.size=1 GB +nifi.provenance.repository.rollover.time=5 mins +nifi.provenance.repository.rollover.size=100 MB +nifi.provenance.repository.query.threads=2 +nifi.provenance.repository.compress.on.rollover=true +# Comma-separated list of fields. Fields that are not indexed will not be searchable. Valid fields are: +# EventType, FlowFileUUID, Filename, TransitURI, ProcessorID, AlternateIdentifierURI, ContentType, Relationship, Details +nifi.provenance.repository.indexed.fields=EventType, FlowFileUUID, Filename, ProcessorID +# FlowFile Attributes that should be indexed and made searchable +nifi.provenance.repository.indexed.attributes= +# Large values for the shard size will result in more Java heap usage when searching the Provenance Repository +# but should provide better performance +nifi.provenance.repository.index.shard.size=500 MB + +# Component Status Repository +nifi.components.status.repository.implementation=nifi.controller.status.history.VolatileComponentStatusRepository +nifi.components.status.repository.buffer.size=288 +nifi.components.status.snapshot.frequency=5 mins + +# Site to Site properties +nifi.remote.input.socket.port= +nifi.remote.input.secure=true + +# web properties # +nifi.web.war.directory=./lib +nifi.web.http.interfaces=eth6 +nifi.web.http.host= +nifi.web.http.port=8080 +nifi.web.https.interfaces= +nifi.web.https.host= +nifi.web.https.port= +nifi.web.jetty.working.directory=./work/jetty + +# security properties # +nifi.sensitive.props.key=password +nifi.sensitive.props.algorithm=PBEWITHMD5AND256BITAES-CBC-OPENSSL 
+nifi.sensitive.props.provider=BC + +nifi.security.keystore= +nifi.security.keystoreType= +nifi.security.keystorePasswd= +nifi.security.keyPasswd= +nifi.security.truststore= +nifi.security.truststoreType= +nifi.security.truststorePasswd= +nifi.security.needClientAuth= +nifi.security.user.credential.cache.duration=24 hours +nifi.security.user.authority.provider=file-provider +nifi.security.support.new.account.requests= +nifi.security.ocsp.responder.url= +nifi.security.ocsp.responder.certificate= + +# cluster common properties (cluster manager and nodes must have same values) # +nifi.cluster.protocol.heartbeat.interval=5 sec +nifi.cluster.protocol.is.secure=false +nifi.cluster.protocol.socket.timeout=30 sec +nifi.cluster.protocol.connection.handshake.timeout=45 sec +# if multicast is used, then nifi.cluster.protocol.multicast.xxx properties must be configured # +nifi.cluster.protocol.use.multicast=false +nifi.cluster.protocol.multicast.address= +nifi.cluster.protocol.multicast.port= +nifi.cluster.protocol.multicast.service.broadcast.delay=500 ms +nifi.cluster.protocol.multicast.service.locator.attempts=3 +nifi.cluster.protocol.multicast.service.locator.attempts.delay=1 sec + +# cluster node properties (only configure for cluster nodes) # +nifi.cluster.is.node=false +nifi.cluster.node.address= +nifi.cluster.node.interfaces= +nifi.cluster.node.protocol.port= +nifi.cluster.node.protocol.threads=2 +# if multicast is not used, nifi.cluster.node.unicast.xxx must have same values as nifi.cluster.manager.xxx # +nifi.cluster.node.unicast.manager.address= +nifi.cluster.node.unicast.manager.protocol.port= + +# cluster manager properties (only configure for cluster manager) # +nifi.cluster.is.manager=false +nifi.cluster.manager.address= +nifi.cluster.manager.interfaces=eth6 +nifi.cluster.manager.protocol.port= +nifi.cluster.manager.node.firewall.file= +nifi.cluster.manager.node.event.history.size=10 +nifi.cluster.manager.node.api.connection.timeout=30 sec 
+nifi.cluster.manager.node.api.read.timeout=30 sec +nifi.cluster.manager.node.api.request.threads=10 +nifi.cluster.manager.flow.retrieval.delay=5 sec +nifi.cluster.manager.protocol.threads=10 +nifi.cluster.manager.safemode.duration=0 sec http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/pom.xml b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/pom.xml index 4e9e9fb..3d5163e 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/pom.xml +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/pom.xml @@ -25,6 +25,15 @@ <artifactId>nifi-properties</artifactId> </dependency> <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-provenance-query-language</artifactId> + <version>0.0.2-incubating-SNAPSHOT</version> + </dependency> + <dependency> <groupId>org.apache.lucene</groupId> <artifactId>lucene-core</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java index ffa9676..2643787 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java @@ -25,7 +25,9 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Comparator; +import java.util.Date; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -42,6 +44,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Pattern; import org.apache.nifi.events.EventReporter; +import org.apache.nifi.pql.ProvenanceQuery; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.provenance.ProvenanceEventBuilder; import org.apache.nifi.provenance.ProvenanceEventRecord; @@ -52,6 +55,7 @@ import org.apache.nifi.provenance.StandardProvenanceEventRecord; import org.apache.nifi.provenance.StorageLocation; import org.apache.nifi.provenance.StoredProvenanceEvent; import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig; +import org.apache.nifi.provenance.journaling.exception.EventNotFoundException; import org.apache.nifi.provenance.journaling.index.EventIndexSearcher; import org.apache.nifi.provenance.journaling.index.IndexAction; import org.apache.nifi.provenance.journaling.index.IndexManager; @@ -69,6 +73,9 @@ import org.apache.nifi.provenance.journaling.query.StandardQueryManager; import org.apache.nifi.provenance.journaling.toc.StandardTocReader; import org.apache.nifi.provenance.journaling.toc.TocReader; import 
org.apache.nifi.provenance.lineage.ComputeLineageSubmission; +import org.apache.nifi.provenance.query.ProvenanceQueryResult; +import org.apache.nifi.provenance.query.ProvenanceQuerySubmission; +import org.apache.nifi.provenance.query.ProvenanceResultSet; import org.apache.nifi.provenance.search.Query; import org.apache.nifi.provenance.search.QuerySubmission; import org.apache.nifi.provenance.search.SearchableField; @@ -78,7 +85,7 @@ import org.apache.nifi.util.NiFiProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - +// TODO: read-only is not checked everywhere! public class JournalingProvenanceRepository implements ProvenanceEventRepository { public static final String WORKER_THREAD_POOL_SIZE = "nifi.provenance.repository.worker.threads"; public static final String BLOCK_SIZE = "nifi.provenance.repository.writer.block.size"; @@ -90,7 +97,7 @@ public class JournalingProvenanceRepository implements ProvenanceEventRepository // the follow member variables are effectively final. They are initialized // in the initialize method rather than the constructor because we want to ensure - // that they only not created every time that the Java Service Loader instantiates the class. + // that they are not created every time that the Java Service Loader instantiates the class. 
private ScheduledExecutorService workerExecutor; private ExecutorService queryExecutor; private ExecutorService compressionExecutor; @@ -226,7 +233,7 @@ public class JournalingProvenanceRepository implements ProvenanceEventRepository final int compressionThreads = Math.max(1, config.getCompressionThreadPoolSize()); this.compressionExecutor = Executors.newFixedThreadPool(compressionThreads, createThreadFactory("Provenance Repository Compression Thread")); - this.indexManager = new LuceneIndexManager(config, workerExecutor, queryExecutor); + this.indexManager = new LuceneIndexManager(this, config, workerExecutor, queryExecutor); this.partitionManager = new QueuingPartitionManager(indexManager, idGenerator, config, workerExecutor, compressionExecutor); this.queryManager = new StandardQueryManager(indexManager, queryExecutor, config, 10); @@ -436,6 +443,154 @@ public class JournalingProvenanceRepository implements ProvenanceEventRepository return maxId; } + ProgressAwareIterator<? extends StoredProvenanceEvent> selectMatchingEvents(final String query, final AtomicLong lastTimeProgressMade) throws IOException { + final Set<EventIndexSearcher> searchers = indexManager.getSearchers(); + final Iterator<EventIndexSearcher> searchItr = searchers.iterator(); + + return new ProgressAwareIterator<StoredProvenanceEvent>() { + private Iterator<LazyInitializedProvenanceEvent> eventItr; + private int searchersComplete = 0; + private EventIndexSearcher currentSearcher; + + @Override + public int getPercentComplete() { + return searchers.isEmpty() ? 100 : searchersComplete / searchers.size() * 100; + } + + @Override + public boolean hasNext() { + // while the event iterator has no information... + while ( eventItr == null || !eventItr.hasNext() ) { + // if there's not another searcher then we're out of events. + if ( !searchItr.hasNext() ) { + return false; + } + + // we're finished with this searcher. Close it. 
+ if ( currentSearcher != null ) { + try { + currentSearcher.close(); + } catch (final IOException ioe) { + logger.warn("Failed to close {} due to {}", currentSearcher, ioe.toString()); + if ( logger.isDebugEnabled() ) { + logger.warn("", ioe); + } + } + } + + // We have a searcher. get events from it. If there are no matches, + // then our while loop will keep going. + currentSearcher = searchItr.next(); + searchersComplete++; + + try { + eventItr = currentSearcher.select(query); + } catch (final IOException ioe) { + throw new EventNotFoundException("Could not find next event", ioe); + } + } + + // the event iterator has no events, and the search iterator has no more + // searchers. There are no more events. + return eventItr != null && eventItr.hasNext(); + } + + @Override + public StoredProvenanceEvent next() { + lastTimeProgressMade.set(System.nanoTime()); + return eventItr.next(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + + public ProvenanceResultSet query(final String query) throws IOException { + final ProvenanceQuerySubmission submission = submitQuery(query); + return submission.getResult().getResultSet(); + } + + + public ProvenanceQuerySubmission retrieveProvenanceQuerySubmission(final String queryIdentifier) { + return queryManager.retrieveProvenanceQuerySubmission(queryIdentifier); + } + + public ProvenanceQuerySubmission submitQuery(final String query) { + ProvenanceQuerySubmission submission; + final AtomicLong lastTimeProgressMade = new AtomicLong(System.nanoTime()); + final long tenMinsInNanos = TimeUnit.MINUTES.toNanos(10); + + try { + final ProgressAwareIterator<? 
extends StoredProvenanceEvent> eventItr = selectMatchingEvents(query, lastTimeProgressMade); + final ProvenanceResultSet rs = ProvenanceQuery.compile(query).evaluate(eventItr); + + submission = new JournalingRepoQuerySubmission(query, new ProvenanceQueryResult() { + @Override + public ProvenanceResultSet getResultSet() { + return rs; + } + + @Override + public Date getExpiration() { + return new Date(tenMinsInNanos + lastTimeProgressMade.get()); + } + + @Override + public String getError() { + return null; + } + + @Override + public int getPercentComplete() { + return eventItr.getPercentComplete(); + } + + @Override + public boolean isFinished() { + return eventItr.getPercentComplete() >= 100; + } + }); + } catch (final IOException ioe) { + logger.error("Failed to perform query {} due to {}", query, ioe.toString()); + if ( logger.isDebugEnabled() ) { + logger.error("", ioe); + } + + submission = new JournalingRepoQuerySubmission(query, new ProvenanceQueryResult() { + @Override + public ProvenanceResultSet getResultSet() { + return null; + } + + @Override + public Date getExpiration() { + return new Date(tenMinsInNanos + lastTimeProgressMade.get()); + } + + @Override + public String getError() { + return "Failed to perform query due to " + ioe; + } + + @Override + public int getPercentComplete() { + return 0; + } + + @Override + public boolean isFinished() { + return true; + } + }); + } + + queryManager.registerSubmission(submission); + return submission; + } + @Override public QuerySubmission submitQuery(final Query query) { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingRepoQuerySubmission.java ---------------------------------------------------------------------- diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingRepoQuerySubmission.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingRepoQuerySubmission.java new file mode 100644 index 0000000..f755bc6 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingRepoQuerySubmission.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.provenance.journaling; + +import java.util.Date; +import java.util.UUID; + +import org.apache.nifi.provenance.query.ProvenanceQueryResult; +import org.apache.nifi.provenance.query.ProvenanceQuerySubmission; + +public class JournalingRepoQuerySubmission implements ProvenanceQuerySubmission { + + private final Date submissionTime = new Date(); + private final String identifier = UUID.randomUUID().toString(); + private final ProvenanceQueryResult result; + + private final String query; + private final Runnable cancelCallback; + + private volatile boolean canceled = false; + + public JournalingRepoQuerySubmission(final String query, final ProvenanceQueryResult queryResult) { + this(query, queryResult, null); + } + + public JournalingRepoQuerySubmission(final String query, final ProvenanceQueryResult queryResult, final Runnable cancelCallback) { + this.query = query; + this.cancelCallback = cancelCallback; + this.result = queryResult; + } + + @Override + public String getQuery() { + return query; + } + + @Override + public ProvenanceQueryResult getResult() { + return result; + } + + @Override + public Date getSubmissionTime() { + return submissionTime; + } + + @Override + public String getQueryIdentifier() { + return identifier; + } + + @Override + public void cancel() { + this.canceled = true; + + if ( cancelCallback != null ) { + cancelCallback.run(); + } + } + + @Override + public boolean isCanceled() { + return canceled; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/LazyInitializedProvenanceEvent.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/LazyInitializedProvenanceEvent.java 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/LazyInitializedProvenanceEvent.java new file mode 100644 index 0000000..d0562e2 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/LazyInitializedProvenanceEvent.java @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.provenance.journaling; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.lucene.document.Document; +import org.apache.lucene.index.IndexableField; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventRepository; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.provenance.SearchableFields; +import org.apache.nifi.provenance.StorageLocation; +import org.apache.nifi.provenance.StoredProvenanceEvent; +import org.apache.nifi.provenance.journaling.exception.EventNotFoundException; +import org.apache.nifi.provenance.journaling.index.IndexedFieldNames; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LazyInitializedProvenanceEvent implements StoredProvenanceEvent { + private static final Logger logger = LoggerFactory.getLogger(LazyInitializedProvenanceEvent.class); + + private final ProvenanceEventRepository repo; + private final StorageLocation storageLocation; + private final Document doc; + private ProvenanceEventRecord fullRecord; + + public LazyInitializedProvenanceEvent(final ProvenanceEventRepository repo, final StorageLocation storageLocation, final Document document) { + this.repo = repo; + this.storageLocation = storageLocation; + this.doc = document; + } + + @Override + public long getEventId() { + return doc.getField(IndexedFieldNames.EVENT_ID).numericValue().longValue(); + } + + @Override + public StorageLocation getStorageLocation() { + return storageLocation; + } + + @Override + public long getEventTime() { + return doc.getField(SearchableFields.EventTime.getSearchableFieldName()).numericValue().longValue(); + } + + private void ensureFullyLoaded() { + if ( fullRecord != null ) { + return; + } + + final long id = getEventId(); + try { + fullRecord = repo.getEvent(id); + } catch (final 
IOException ioe) { + final String containerName = doc.get(IndexedFieldNames.CONTAINER_NAME); + final String sectionName = doc.get(IndexedFieldNames.SECTION_NAME); + final String journalId = doc.get(IndexedFieldNames.JOURNAL_ID); + + final String error = "Failed to load event with ID " + id + " from container '" + containerName + "', section '" + sectionName + "', journal '" + journalId + "' due to " + ioe; + logger.error(error); + if ( logger.isDebugEnabled() ) { + logger.error("", ioe); + } + + throw new EventNotFoundException(error); + } + } + + @Override + public long getFlowFileEntryDate() { + ensureFullyLoaded(); + return fullRecord.getFlowFileEntryDate(); + } + + @Override + public long getLineageStartDate() { + final IndexableField field = doc.getField(SearchableFields.LineageStartDate.getSearchableFieldName()); + if ( field != null ) { + return field.numericValue().longValue(); + } + + ensureFullyLoaded(); + return fullRecord.getLineageStartDate(); + } + + @Override + public Set<String> getLineageIdentifiers() { + ensureFullyLoaded(); + return fullRecord.getLineageIdentifiers(); + } + + @Override + public long getFileSize() { + final IndexableField field = doc.getField(SearchableFields.FileSize.getSearchableFieldName()); + if ( field != null ) { + return field.numericValue().longValue(); + } + + ensureFullyLoaded(); + return fullRecord.getFileSize(); + } + + @Override + public Long getPreviousFileSize() { + ensureFullyLoaded(); + return fullRecord.getPreviousFileSize(); + } + + @Override + public long getEventDuration() { + // TODO: Allow Event Duration to be indexed; it could be interesting for reporting. 
+ ensureFullyLoaded(); + return fullRecord.getEventDuration(); + } + + @Override + public ProvenanceEventType getEventType() { + final String name = doc.get(SearchableFields.EventType.getSearchableFieldName()); + return ProvenanceEventType.valueOf(name.toUpperCase()); + } + + @Override + public Map<String, String> getAttributes() { + ensureFullyLoaded(); + return fullRecord.getAttributes(); + } + + @Override + public String getAttribute(final String attributeName) { + final String attr = doc.get(attributeName); + if ( attr == null ) { + ensureFullyLoaded(); + return fullRecord.getAttribute(attributeName); + } else { + return attr; + } + } + + @Override + public Map<String, String> getPreviousAttributes() { + ensureFullyLoaded(); + return fullRecord.getPreviousAttributes(); + } + + @Override + public Map<String, String> getUpdatedAttributes() { + ensureFullyLoaded(); + return fullRecord.getUpdatedAttributes(); + } + + @Override + public String getComponentId() { + final String componentId = doc.get(SearchableFields.ComponentID.getSearchableFieldName()); + if ( componentId == null ) { + ensureFullyLoaded(); + return fullRecord.getComponentId(); + } else { + return componentId; + } + } + + @Override + public String getComponentType() { + // TODO: Make indexable. 
+ ensureFullyLoaded(); + return fullRecord.getComponentType(); + } + + @Override + public String getTransitUri() { + final String transitUri = doc.get(SearchableFields.TransitURI.getSearchableFieldName()); + if ( transitUri == null ) { + final ProvenanceEventType eventType = getEventType(); + switch (eventType) { + case RECEIVE: + case SEND: + ensureFullyLoaded(); + return fullRecord.getTransitUri(); + default: + return null; + } + } else { + return transitUri; + } + } + + @Override + public String getSourceSystemFlowFileIdentifier() { + ensureFullyLoaded(); + return fullRecord.getSourceSystemFlowFileIdentifier(); + } + + @Override + public String getFlowFileUuid() { + return doc.get(SearchableFields.FlowFileUUID.getSearchableFieldName()); + } + + @Override + public List<String> getParentUuids() { + final IndexableField[] uuids = doc.getFields(SearchableFields.FlowFileUUID.getSearchableFieldName()); + if ( uuids.length < 2 ) { + return Collections.emptyList(); + } + + switch (getEventType()) { + case JOIN: { + final List<String> parentUuids = new ArrayList<>(uuids.length - 1); + for (int i=1; i < uuids.length; i++) { + parentUuids.add(uuids[i].stringValue()); + } + return parentUuids; + } + default: + return Collections.emptyList(); + } + } + + @Override + public List<String> getChildUuids() { + final IndexableField[] uuids = doc.getFields(SearchableFields.FlowFileUUID.getSearchableFieldName()); + if ( uuids.length < 2 ) { + return Collections.emptyList(); + } + + switch (getEventType()) { + case REPLAY: + case CLONE: + case FORK: { + final List<String> childUuids = new ArrayList<>(uuids.length - 1); + for (int i=1; i < uuids.length; i++) { + childUuids.add(uuids[i].stringValue()); + } + return childUuids; + } + default: + return Collections.emptyList(); + } + } + + @Override + public String getAlternateIdentifierUri() { + final String altId = doc.get(SearchableFields.AlternateIdentifierURI.getSearchableFieldName()); + if ( altId == null && getEventType() == 
ProvenanceEventType.ADDINFO ) { + ensureFullyLoaded(); + return fullRecord.getAlternateIdentifierUri(); + } else { + return null; + } + } + + @Override + public String getDetails() { + final String details = doc.get(SearchableFields.Details.getSearchableFieldName()); + if ( details == null ) { + ensureFullyLoaded(); + return fullRecord.getDetails(); + } + return null; + } + + @Override + public String getRelationship() { + final String relationship = doc.get(SearchableFields.Relationship.getSearchableFieldName()); + if ( relationship == null ) { + ensureFullyLoaded(); + return fullRecord.getRelationship(); + } + return null; + } + + @Override + public String getSourceQueueIdentifier() { + final String queueId = doc.get(SearchableFields.SourceQueueIdentifier.getSearchableFieldName()); + if ( queueId == null ) { + ensureFullyLoaded(); + return fullRecord.getSourceQueueIdentifier(); + } + return null; + } + + @Override + public String getContentClaimSection() { + final String claimSection = doc.get(SearchableFields.ContentClaimSection.getSearchableFieldName()); + if ( claimSection == null ) { + ensureFullyLoaded(); + return fullRecord.getContentClaimSection(); + } + return null; + } + + @Override + public String getPreviousContentClaimSection() { + ensureFullyLoaded(); + return fullRecord.getPreviousContentClaimSection(); + } + + @Override + public String getContentClaimContainer() { + final String claimContainer = doc.get(SearchableFields.ContentClaimContainer.getSearchableFieldName()); + if ( claimContainer == null ) { + ensureFullyLoaded(); + return fullRecord.getContentClaimContainer(); + } + return null; + } + + @Override + public String getPreviousContentClaimContainer() { + ensureFullyLoaded(); + return fullRecord.getPreviousContentClaimContainer(); + } + + @Override + public String getContentClaimIdentifier() { + final String claimIdentifier = doc.get(SearchableFields.ContentClaimIdentifier.getSearchableFieldName()); + if ( claimIdentifier == null ) { + 
ensureFullyLoaded(); + return fullRecord.getContentClaimIdentifier(); + } + return null; + } + + @Override + public String getPreviousContentClaimIdentifier() { + ensureFullyLoaded(); + return fullRecord.getPreviousContentClaimIdentifier(); + } + + @Override + public Long getContentClaimOffset() { + final String claimOffset = doc.get(SearchableFields.ContentClaimOffset.getSearchableFieldName()); + if ( claimOffset == null ) { + ensureFullyLoaded(); + return fullRecord.getContentClaimOffset(); + } + return null; + } + + @Override + public Long getPreviousContentClaimOffset() { + ensureFullyLoaded(); + return fullRecord.getPreviousContentClaimOffset(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/ProgressAwareIterator.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/ProgressAwareIterator.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/ProgressAwareIterator.java new file mode 100644 index 0000000..c473224 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/ProgressAwareIterator.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
/**
 * An {@link Iterator} that can additionally report how far through its
 * underlying data set the iteration has progressed, allowing long-running
 * consumers (such as provenance query executions) to surface progress
 * information to callers.
 *
 * @param <T> the type of element returned by the iterator
 */
public interface ProgressAwareIterator<T> extends Iterator<T> {

    /**
     * @return the percent complete of this iteration
     *         (presumably in the range 0-100 — NOTE(review): range is implied
     *         by the name only; confirm against implementations)
     */
    int getPercentComplete();

}
/**
 * {@link RuntimeException} thrown when a provenance event that is expected to
 * exist (for example, one referenced by an index entry) cannot be retrieved
 * from the repository.
 */
public class EventNotFoundException extends RuntimeException {

    private static final long serialVersionUID = -8490783814308479930L;

    /**
     * @param message description of which event could not be found and why
     */
    public EventNotFoundException(final String message) {
        super(message);
    }

    /**
     * @param message description of which event could not be found and why
     * @param cause the underlying failure that prevented retrieval
     */
    public EventNotFoundException(final String message, final Throwable cause) {
        super(message, cause);
    }
}
java.io.IOException; import java.util.Collection; +import java.util.Iterator; import java.util.List; import org.apache.nifi.provenance.journaling.JournaledStorageLocation; +import org.apache.nifi.provenance.journaling.LazyInitializedProvenanceEvent; import org.apache.nifi.provenance.search.Query; public interface EventIndexSearcher extends Closeable { @@ -82,4 +84,22 @@ public interface EventIndexSearcher extends Closeable { * @throws IOException */ long getNumberOfEvents() throws IOException; + + /** + * Evaluates the given query against the index, returning an iterator of lazily initialized provenance events + * + * @param query + * @throws IOException + */ + Iterator<LazyInitializedProvenanceEvent> select(String query) throws IOException; + + /** + * Evaluates the given query against the index, returning an iterator of locations from which the matching + * records can be retrieved + * + * @param query + * @return + * @throws IOException + */ + Iterator<JournaledStorageLocation> selectLocations(String query) throws IOException; } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/IndexManager.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/IndexManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/IndexManager.java index 34d1b18..9e1fc39 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/IndexManager.java +++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/IndexManager.java @@ -67,6 +67,24 @@ public interface IndexManager extends Closeable { <T> Set<T> withEachIndex(IndexAction<T> action) throws IOException; /** + * Executes the given action against each index and returns a Set of results, + * where each result is obtained from performing the given action against one index + * + * @param action the action to perform + * @param autoClose whether or not the EventIndexSearcher should automatically be closed + * + * @return + * @throws IOException + */ + <T> Set<T> withEachIndex(IndexAction<T> action, boolean autoClose) throws IOException; + + /** + * Returns an Iterator of EventIndexSearchers. + * @return + */ + Set<EventIndexSearcher> getSearchers() throws IOException; + + /** * Performs the given action against each index, waiting for the action to complete * against each index before returning * http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexManager.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexManager.java index bf2a495..1f53d27 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexManager.java +++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexManager.java @@ -34,6 +34,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import org.apache.nifi.provenance.ProvenanceEventRepository; import org.apache.nifi.provenance.journaling.JournaledProvenanceEvent; import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig; import org.apache.nifi.provenance.journaling.toc.TocJournalReader; @@ -43,6 +44,7 @@ import org.slf4j.LoggerFactory; public class LuceneIndexManager implements IndexManager { private static final Logger logger = LoggerFactory.getLogger(LuceneIndexManager.class); + private final ProvenanceEventRepository repo; private final JournalingRepositoryConfig config; private final ExecutorService queryExecutor; @@ -50,7 +52,8 @@ public class LuceneIndexManager implements IndexManager { private final Map<String, AtomicLong> writerIndexes = new HashMap<>(); private final ConcurrentMap<String, IndexSize> indexSizes = new ConcurrentHashMap<>(); - public LuceneIndexManager(final JournalingRepositoryConfig config, final ScheduledExecutorService workerExecutor, final ExecutorService queryExecutor) throws IOException { + public LuceneIndexManager(final ProvenanceEventRepository repo, final JournalingRepositoryConfig config, final ScheduledExecutorService workerExecutor, final ExecutorService queryExecutor) throws IOException { + this.repo = repo; this.config = config; this.queryExecutor = queryExecutor; @@ -66,7 +69,7 @@ public class LuceneIndexManager implements IndexManager { for ( int i=0; i < config.getIndexesPerContainer(); i++ ){ final File indexDir = new File(container, "indices/" + i); - writerList.add(new LuceneIndexWriter(indexDir, config)); + writerList.add(new LuceneIndexWriter(repo, indexDir, config)); } 
workerExecutor.scheduleWithFixedDelay(new Runnable() { @@ -99,7 +102,7 @@ public class LuceneIndexManager implements IndexManager { if (config.isReadOnly()) { for (int i=0; i < config.getIndexesPerContainer(); i++) { final File indexDir = new File(containerName, "indices/" + i); - searchers.add(new LuceneIndexSearcher(indexDir)); + searchers.add(new LuceneIndexSearcher(repo, indexDir)); } } else { final List<LuceneIndexWriter> writerList = writers.get(containerName); @@ -196,8 +199,39 @@ public class LuceneIndexManager implements IndexManager { } } + + @Override + public Set<EventIndexSearcher> getSearchers() throws IOException { + final Set<EventIndexSearcher> searchers = new HashSet<>(); + + try { + final Set<String> containerNames = config.getContainers().keySet(); + for (final String containerName : containerNames) { + final EventIndexSearcher searcher = newIndexSearcher(containerName); + searchers.add(searcher); + } + + return searchers; + } catch (final Exception e) { + for ( final EventIndexSearcher searcher : searchers ) { + try { + searcher.close(); + } catch (final IOException ioe) { + e.addSuppressed(ioe); + } + } + + throw e; + } + } + @Override public <T> Set<T> withEachIndex(final IndexAction<T> action) throws IOException { + return withEachIndex(action, true); + } + + @Override + public <T> Set<T> withEachIndex(final IndexAction<T> action, final boolean autoClose) throws IOException { final Set<T> results = new HashSet<>(); final Map<String, Future<T>> futures = new HashMap<>(); final Set<String> containerNames = config.getContainers().keySet(); @@ -205,8 +239,13 @@ public class LuceneIndexManager implements IndexManager { final Callable<T> callable = new Callable<T>() { @Override public T call() throws Exception { - try (final EventIndexSearcher searcher = newIndexSearcher(containerName)) { + final EventIndexSearcher searcher = newIndexSearcher(containerName); + try { return action.perform(searcher); + } finally { + if ( autoClose ) { + 
searcher.close(); + } } } }; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java index 94bd3f8..cd57991 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java @@ -20,7 +20,9 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Iterator; import java.util.List; +import java.util.NoSuchElementException; import org.apache.lucene.document.Document; import org.apache.lucene.index.DirectoryReader; @@ -38,25 +40,38 @@ import org.apache.lucene.search.TermQuery; import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TopFieldDocs; import org.apache.lucene.store.FSDirectory; +import org.apache.nifi.pql.LuceneTranslator; +import org.apache.nifi.pql.ProvenanceQuery; +import org.apache.nifi.provenance.ProvenanceEventRepository; import org.apache.nifi.provenance.SearchableFields; import org.apache.nifi.provenance.journaling.JournaledStorageLocation; +import org.apache.nifi.provenance.journaling.LazyInitializedProvenanceEvent; +import 
org.apache.nifi.provenance.journaling.exception.EventNotFoundException; import org.apache.nifi.provenance.search.Query; +import org.apache.nifi.util.ObjectHolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class LuceneIndexSearcher implements EventIndexSearcher { + private static final Logger logger = LoggerFactory.getLogger(LuceneIndexSearcher.class); + + private final ProvenanceEventRepository repo; private final DirectoryReader reader; private final IndexSearcher searcher; private final FSDirectory fsDirectory; private final String description; - public LuceneIndexSearcher(final File indexDirectory) throws IOException { + public LuceneIndexSearcher(final ProvenanceEventRepository repo, final File indexDirectory) throws IOException { + this.repo = repo; this.fsDirectory = FSDirectory.open(indexDirectory); this.reader = DirectoryReader.open(fsDirectory); this.searcher = new IndexSearcher(reader); this.description = "LuceneIndexSearcher[indexDirectory=" + indexDirectory + "]"; } - public LuceneIndexSearcher(final DirectoryReader reader, final File indexDirectory) { + public LuceneIndexSearcher(final ProvenanceEventRepository repo, final DirectoryReader reader, final File indexDirectory) { + this.repo = repo; this.reader = reader; this.searcher = new IndexSearcher(reader); this.fsDirectory = null; @@ -80,16 +95,7 @@ public class LuceneIndexSearcher implements EventIndexSearcher { throw suppressed; } } - - private JournaledStorageLocation createLocation(final Document document) { - final String containerName = document.get(IndexedFieldNames.CONTAINER_NAME); - final String sectionName = document.get(IndexedFieldNames.SECTION_NAME); - final long journalId = document.getField(IndexedFieldNames.JOURNAL_ID).numericValue().longValue(); - final int blockIndex = document.getField(IndexedFieldNames.BLOCK_INDEX).numericValue().intValue(); - final long eventId = document.getField(IndexedFieldNames.EVENT_ID).numericValue().longValue(); - - return new 
JournaledStorageLocation(containerName, sectionName, journalId, blockIndex, eventId); - } + private List<JournaledStorageLocation> getOrderedLocations(final TopDocs topDocs) throws IOException { final ScoreDoc[] scoreDocs = topDocs.scoreDocs; @@ -103,7 +109,7 @@ public class LuceneIndexSearcher implements EventIndexSearcher { private void populateLocations(final TopDocs topDocs, final Collection<JournaledStorageLocation> locations) throws IOException { for ( final ScoreDoc scoreDoc : topDocs.scoreDocs ) { final Document document = reader.document(scoreDoc.doc); - locations.add(createLocation(document)); + locations.add(QueryUtils.createLocation(document)); } } @@ -188,4 +194,100 @@ public class LuceneIndexSearcher implements EventIndexSearcher { public long getNumberOfEvents() { return reader.numDocs(); } + + + private <T> Iterator<T> select(final String query, final DocumentTransformer<T> transformer) throws IOException { + final org.apache.lucene.search.Query luceneQuery = LuceneTranslator.toLuceneQuery(ProvenanceQuery.compile(query).getWhereClause()); + final int batchSize = 1000; + + final ObjectHolder<TopDocs> topDocsHolder = new ObjectHolder<>(null); + return new Iterator<T>() { + int fetched = 0; + int scoreDocIndex = 0; + + @Override + public boolean hasNext() { + if ( topDocsHolder.get() == null ) { + try { + topDocsHolder.set(searcher.search(luceneQuery, batchSize)); + } catch (final IOException ioe) { + throw new EventNotFoundException("Unable to obtain next record from " + LuceneIndexSearcher.this, ioe); + } + } + + final boolean hasNext = fetched < topDocsHolder.get().totalHits; + if ( !hasNext ) { + try { + LuceneIndexSearcher.this.close(); + } catch (final IOException ioe) { + logger.warn("Failed to close {} due to {}", this, ioe.toString()); + if ( logger.isDebugEnabled() ) { + logger.warn("", ioe); + } + } + } + return hasNext; + } + + @Override + public T next() { + if ( !hasNext() ) { + throw new NoSuchElementException(); + } + + TopDocs topDocs 
= topDocsHolder.get(); + ScoreDoc[] scoreDocs = topDocs.scoreDocs; + if ( scoreDocIndex >= scoreDocs.length ) { + try { + topDocs = searcher.searchAfter(scoreDocs[scoreDocs.length - 1], luceneQuery, batchSize); + topDocsHolder.set(topDocs); + scoreDocs = topDocs.scoreDocs; + scoreDocIndex = 0; + } catch (final IOException ioe) { + throw new EventNotFoundException("Unable to obtain next record from " + LuceneIndexSearcher.this, ioe); + } + } + + final ScoreDoc scoreDoc = scoreDocs[scoreDocIndex++]; + final Document document; + try { + document = searcher.doc(scoreDoc.doc); + } catch (final IOException ioe) { + throw new EventNotFoundException("Unable to obtain next record from " + LuceneIndexSearcher.this, ioe); + } + fetched++; + + return transformer.transform(document); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + + @Override + public Iterator<LazyInitializedProvenanceEvent> select(final String query) throws IOException { + return select(query, new DocumentTransformer<LazyInitializedProvenanceEvent>() { + @Override + public LazyInitializedProvenanceEvent transform(final Document document) { + return new LazyInitializedProvenanceEvent(repo, QueryUtils.createLocation(document), document); + } + }); + } + + @Override + public Iterator<JournaledStorageLocation> selectLocations(final String query) throws IOException { + return select(query, new DocumentTransformer<JournaledStorageLocation>() { + @Override + public JournaledStorageLocation transform(final Document document) { + return QueryUtils.createLocation(document); + } + }); + } + + private static interface DocumentTransformer<T> { + T transform(Document document); + } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexWriter.java 
---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexWriter.java index d1b2c0e..1f84891 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexWriter.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexWriter.java @@ -50,6 +50,7 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.store.FSDirectory; import org.apache.lucene.util.Version; import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.provenance.ProvenanceEventRepository; import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.provenance.SearchableFields; import org.apache.nifi.provenance.journaling.JournaledProvenanceEvent; @@ -60,22 +61,22 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class LuceneIndexWriter implements EventIndexWriter { + private static final Store STORE_FIELDS = Store.YES; private static final Logger logger = LoggerFactory.getLogger(LuceneIndexWriter.class); - @SuppressWarnings("unused") - private final JournalingRepositoryConfig config; private final Set<SearchableField> nonAttributeSearchableFields; private final Set<SearchableField> attributeSearchableFields; private final File indexDir; + private final ProvenanceEventRepository repo; private final Directory directory; private final Analyzer analyzer; private final IndexWriter indexWriter; private final AtomicLong indexMaxId = new 
AtomicLong(-1L); - public LuceneIndexWriter(final File indexDir, final JournalingRepositoryConfig config) throws IOException { + public LuceneIndexWriter(final ProvenanceEventRepository repo, final File indexDir, final JournalingRepositoryConfig config) throws IOException { + this.repo = repo; this.indexDir = indexDir; - this.config = config; attributeSearchableFields = Collections.unmodifiableSet(new HashSet<>(config.getSearchableAttributes())); nonAttributeSearchableFields = Collections.unmodifiableSet(new HashSet<>(config.getSearchableFields())); @@ -96,7 +97,7 @@ public class LuceneIndexWriter implements EventIndexWriter { logger.trace("Creating index searcher for {}", indexWriter); final DirectoryReader reader = DirectoryReader.open(indexWriter, false); - return new LuceneIndexSearcher(reader, indexDir); + return new LuceneIndexSearcher(repo, reader, indexDir); } @Override @@ -143,30 +144,30 @@ public class LuceneIndexWriter implements EventIndexWriter { final Map<String, String> attributes = event.getAttributes(); final Document doc = new Document(); - addField(doc, SearchableFields.FlowFileUUID, event.getFlowFileUuid(), Store.NO); - addField(doc, SearchableFields.Filename, attributes.get(CoreAttributes.FILENAME.key()), Store.NO); - addField(doc, SearchableFields.ComponentID, event.getComponentId(), Store.NO); - addField(doc, SearchableFields.AlternateIdentifierURI, event.getAlternateIdentifierUri(), Store.NO); - addField(doc, SearchableFields.EventType, event.getEventType().name(), Store.NO); - addField(doc, SearchableFields.Relationship, event.getRelationship(), Store.NO); - addField(doc, SearchableFields.Details, event.getDetails(), Store.NO); - addField(doc, SearchableFields.ContentClaimSection, event.getContentClaimSection(), Store.NO); - addField(doc, SearchableFields.ContentClaimContainer, event.getContentClaimContainer(), Store.NO); - addField(doc, SearchableFields.ContentClaimIdentifier, event.getContentClaimIdentifier(), Store.NO); - addField(doc, 
SearchableFields.SourceQueueIdentifier, event.getSourceQueueIdentifier(), Store.NO); + addField(doc, SearchableFields.FlowFileUUID, event.getFlowFileUuid(), STORE_FIELDS); + addField(doc, SearchableFields.Filename, attributes.get(CoreAttributes.FILENAME.key()), STORE_FIELDS); + addField(doc, SearchableFields.ComponentID, event.getComponentId(), STORE_FIELDS); + addField(doc, SearchableFields.AlternateIdentifierURI, event.getAlternateIdentifierUri(), STORE_FIELDS); + addField(doc, SearchableFields.EventType, event.getEventType().name(), STORE_FIELDS); + addField(doc, SearchableFields.Relationship, event.getRelationship(), STORE_FIELDS); + addField(doc, SearchableFields.Details, event.getDetails(), STORE_FIELDS); + addField(doc, SearchableFields.ContentClaimSection, event.getContentClaimSection(), STORE_FIELDS); + addField(doc, SearchableFields.ContentClaimContainer, event.getContentClaimContainer(), STORE_FIELDS); + addField(doc, SearchableFields.ContentClaimIdentifier, event.getContentClaimIdentifier(), STORE_FIELDS); + addField(doc, SearchableFields.SourceQueueIdentifier, event.getSourceQueueIdentifier(), STORE_FIELDS); if (nonAttributeSearchableFields.contains(SearchableFields.TransitURI)) { - addField(doc, SearchableFields.TransitURI, event.getTransitUri(), Store.NO); + addField(doc, SearchableFields.TransitURI, event.getTransitUri(), STORE_FIELDS); } for (final SearchableField searchableField : attributeSearchableFields) { - addField(doc, searchableField, attributes.get(searchableField.getSearchableFieldName()), Store.NO); + addField(doc, searchableField, attributes.get(searchableField.getSearchableFieldName()), STORE_FIELDS); } // Index the fields that we always index (unless there's nothing else to index at all) - doc.add(new LongField(SearchableFields.LineageStartDate.getSearchableFieldName(), event.getLineageStartDate(), Store.NO)); - doc.add(new LongField(SearchableFields.EventTime.getSearchableFieldName(), event.getEventTime(), Store.NO)); - doc.add(new 
LongField(SearchableFields.FileSize.getSearchableFieldName(), event.getFileSize(), Store.NO)); + doc.add(new LongField(SearchableFields.LineageStartDate.getSearchableFieldName(), event.getLineageStartDate(), STORE_FIELDS)); + doc.add(new LongField(SearchableFields.EventTime.getSearchableFieldName(), event.getEventTime(), STORE_FIELDS)); + doc.add(new LongField(SearchableFields.FileSize.getSearchableFieldName(), event.getFileSize(), STORE_FIELDS)); final JournaledStorageLocation location = event.getStorageLocation(); doc.add(new StringField(IndexedFieldNames.CONTAINER_NAME, location.getContainerName(), Store.YES)); @@ -176,20 +177,20 @@ public class LuceneIndexWriter implements EventIndexWriter { doc.add(new LongField(IndexedFieldNames.EVENT_ID, location.getEventId(), Store.YES)); for (final String lineageIdentifier : event.getLineageIdentifiers()) { - addField(doc, SearchableFields.LineageIdentifier, lineageIdentifier, Store.NO); + addField(doc, SearchableFields.LineageIdentifier, lineageIdentifier, STORE_FIELDS); } // If it's event is a FORK, or JOIN, add the FlowFileUUID for all child/parent UUIDs. 
if (event.getEventType() == ProvenanceEventType.FORK || event.getEventType() == ProvenanceEventType.CLONE || event.getEventType() == ProvenanceEventType.REPLAY) { for (final String uuid : event.getChildUuids()) { if (!uuid.equals(event.getFlowFileUuid())) { - addField(doc, SearchableFields.FlowFileUUID, uuid, Store.NO); + addField(doc, SearchableFields.FlowFileUUID, uuid, STORE_FIELDS); } } } else if (event.getEventType() == ProvenanceEventType.JOIN) { for (final String uuid : event.getParentUuids()) { if (!uuid.equals(event.getFlowFileUuid())) { - addField(doc, SearchableFields.FlowFileUUID, uuid, Store.NO); + addField(doc, SearchableFields.FlowFileUUID, uuid, STORE_FIELDS); } } } else if (event.getEventType() == ProvenanceEventType.RECEIVE && event.getSourceSystemFlowFileIdentifier() != null) { @@ -205,7 +206,7 @@ public class LuceneIndexWriter implements EventIndexWriter { } if (sourceFlowFileUUID != null) { - addField(doc, SearchableFields.FlowFileUUID, sourceFlowFileUUID, Store.NO); + addField(doc, SearchableFields.FlowFileUUID, sourceFlowFileUUID, STORE_FIELDS); } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/MultiIndexSearcher.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/MultiIndexSearcher.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/MultiIndexSearcher.java index 4accf50..b555407 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/MultiIndexSearcher.java +++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/MultiIndexSearcher.java @@ -21,11 +21,15 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Comparator; +import java.util.Iterator; import java.util.List; import org.apache.nifi.provenance.journaling.JournaledStorageLocation; +import org.apache.nifi.provenance.journaling.LazyInitializedProvenanceEvent; import org.apache.nifi.provenance.search.Query; +import com.google.common.collect.Iterators; + public class MultiIndexSearcher implements EventIndexSearcher { private final List<EventIndexSearcher> searchers; @@ -160,4 +164,28 @@ public class MultiIndexSearcher implements EventIndexSearcher { public String toString() { return searchers.toString(); } + + @Override + public Iterator<LazyInitializedProvenanceEvent> select(final String query) throws IOException { + final List<Iterator<LazyInitializedProvenanceEvent>> iterators = new ArrayList<>(searchers.size()); + + for ( final EventIndexSearcher searcher : searchers ) { + final Iterator<LazyInitializedProvenanceEvent> itr = searcher.select(query); + iterators.add(itr); + } + + return Iterators.concat(iterators.iterator()); + } + + @Override + public Iterator<JournaledStorageLocation> selectLocations(final String query) throws IOException { + final List<Iterator<JournaledStorageLocation>> iterators = new ArrayList<>(searchers.size()); + + for ( final EventIndexSearcher searcher : searchers ) { + final Iterator<JournaledStorageLocation> itr = searcher.selectLocations(query); + iterators.add(itr); + } + + return Iterators.concat(iterators.iterator()); + } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/QueryUtils.java 
---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/QueryUtils.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/QueryUtils.java index d8dd5eb..aa516ab 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/QueryUtils.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/QueryUtils.java @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.lucene.document.Document; import org.apache.lucene.index.Term; import org.apache.lucene.search.BooleanClause; import org.apache.lucene.search.BooleanQuery; @@ -41,6 +42,18 @@ import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig; import org.apache.nifi.provenance.search.SearchTerm; public class QueryUtils { + + public static JournaledStorageLocation createLocation(final Document document) { + final String containerName = document.get(IndexedFieldNames.CONTAINER_NAME); + final String sectionName = document.get(IndexedFieldNames.SECTION_NAME); + final long journalId = document.getField(IndexedFieldNames.JOURNAL_ID).numericValue().longValue(); + final int blockIndex = document.getField(IndexedFieldNames.BLOCK_INDEX).numericValue().intValue(); + final long eventId = document.getField(IndexedFieldNames.EVENT_ID).numericValue().longValue(); + + return new JournaledStorageLocation(containerName, sectionName, journalId, blockIndex, eventId); + } + + public static org.apache.lucene.search.Query convertQueryToLucene(final 
org.apache.nifi.provenance.search.Query query) { if (query.getStartDate() == null && query.getEndDate() == null && query.getSearchTerms().isEmpty()) { return new MatchAllDocsQuery();
