http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/test/java/org/apache/nifi/pql/TestQuery.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/test/java/org/apache/nifi/pql/TestQuery.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/test/java/org/apache/nifi/pql/TestQuery.java
new file mode 100644
index 0000000..b6012db
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/test/java/org/apache/nifi/pql/TestQuery.java
@@ -0,0 +1,348 @@
+package org.apache.nifi.pql;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.nifi.provenance.ProvenanceEventRepository;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.provenance.StandardProvenanceEventRecord;
+import org.apache.nifi.provenance.VolatileProvenanceRepository;
+import org.apache.nifi.provenance.query.ProvenanceResultSet;
+import org.apache.nifi.util.NiFiProperties;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestQuery {
+
+       private ProvenanceEventRepository repo;
+       
+       @Before
+       public void setup() {
+               System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, 
"src/test/resources/nifi.properties");
+               repo = new VolatileProvenanceRepository();
+       }
+       
+       
+       private void createRecords() throws IOException {
+               final Map<String, String> previousAttributes = new HashMap<>();
+               previousAttributes.put("filename", "xyz");
+               
+               final Map<String, String> updatedAttributes = new HashMap<>();
+               updatedAttributes.put("filename", "xyz.txt");
+               updatedAttributes.put("mime.type", "text/plain");
+               
+               final StandardProvenanceEventRecord.Builder recordBuilder = new 
StandardProvenanceEventRecord.Builder();
+               recordBuilder.setAttributes(previousAttributes, 
Collections.<String, String>emptyMap())
+                       .setComponentId("000")
+                       .setComponentType("MyComponent")
+                       .setEventType(ProvenanceEventType.RECEIVE)
+                       .setFlowFileEntryDate(System.currentTimeMillis())
+                       .setFlowFileUUID("1234")
+                       .setTransitUri("https://localhost:80/nifi";);
+               
+               
+               recordBuilder.setCurrentContentClaim("container", "section", 
"1", 0L, 100L);
+               repo.registerEvent(recordBuilder.build());
+               
+               
+               recordBuilder.setAttributes(previousAttributes, 
updatedAttributes);
+               recordBuilder.setCurrentContentClaim("container", "section", 
"2", 0L, 1024 * 1024L);
+               repo.registerEvent(recordBuilder.build());
+       }
+       
+       
+       private void createRecords(final int records, final ProvenanceEventType 
type, final long sleep) throws IOException {
+               final Map<String, String> previousAttributes = new HashMap<>();
+               previousAttributes.put("filename", "xyz");
+               
+               final Map<String, String> updatedAttributes = new HashMap<>();
+               updatedAttributes.put("filename", "xyz.txt");
+               updatedAttributes.put("mime.type", "text/plain");
+               
+               final StandardProvenanceEventRecord.Builder recordBuilder = new 
StandardProvenanceEventRecord.Builder();
+               recordBuilder.setAttributes(previousAttributes, 
Collections.<String, String>emptyMap())
+                       .setComponentType("MyComponent")
+                       .setEventType(type)
+                       .setFlowFileEntryDate(System.currentTimeMillis())
+                       .setFlowFileUUID("1234")
+                       .setTransitUri("https://localhost:80/nifi";);
+               
+               final long now = System.currentTimeMillis();
+               for (int i=0; i < records; i++) {
+                       recordBuilder.setCurrentContentClaim("container", 
"section", String.valueOf(i), 0L, 100L);
+                       final Map<String, String> attr = new 
HashMap<>(updatedAttributes);
+                       attr.put("i", String.valueOf(i));
+                       recordBuilder.setAttributes(previousAttributes, attr);
+                       
recordBuilder.setFlowFileEntryDate(System.currentTimeMillis());
+                       recordBuilder.setEventTime(now + (i * sleep));
+                       
recordBuilder.setComponentId(UUID.randomUUID().toString());
+                       
+                       repo.registerEvent(recordBuilder.build());
+               }
+       }
+       
+       @Test
+       public void testCompilationManually() {
+               System.out.println(ProvenanceQuery.compile("SELECT R.TransitUri 
FROM *"));
+               System.out.println(ProvenanceQuery.compile("SELECT 
R['filename'] FROM RECEIVE, SEND;"));
+               System.out.println(ProvenanceQuery.compile("SELECT Event FROM 
RECEIVE ORDER BY Event['filename'];"));
+               
+//             System.out.println(Query.compile("SELECT Event FROM RECEIVE 
WHERE ((Event.TransitUri <> 'http') OR (Event['filename'] = '1.txt')) and 
(Event.Size > 1000 or Event.Size between 1 AND 4);"));
+               
+               System.out.println(ProvenanceQuery.compile("SELECT 
SUM(Event.size) FROM RECEIVE"));
+       }
+       
+       
+       @Test
+       public void testSumAverage() throws IOException {
+               createRecords();
+               dump(ProvenanceQuery.execute("SELECT Event", repo));
+               
+               final ProvenanceQuery query = ProvenanceQuery.compile("SELECT 
SUM(Event.Size), AVG(Event.Size) FROM RECEIVE WHERE Event.TransitUri = 
'https://localhost:80/nifi'");
+               
+               final ProvenanceResultSet rs = query.execute(repo);
+               dump(rs);
+               
+               dump(ProvenanceQuery.execute("SELECT Event.TransitUri", repo));
+               dump(ProvenanceQuery.execute("SELECT Event['mime.type'], 
Event['filename']", repo));
+               dump(ProvenanceQuery.execute("SELECT Event['filename'], 
SUM(Event.size) GROUP BY Event['filename']", repo));
+       }
+       
+       
+       @Test
+       public void testGroupBy() throws IOException {
+               createRecords(200000, ProvenanceEventType.RECEIVE, 0L);
+               createRecords(2, ProvenanceEventType.SEND, 0L);
+               
+               ProvenanceResultSet rs = ProvenanceQuery.execute("SELECT 
Event['filename'], COUNT(Event), Event.Type GROUP BY Event['filename'], 
Event.Type", repo);
+               dump(rs);
+               
+               rs = ProvenanceQuery.execute("SELECT Event['filename'], 
COUNT(Event), Event.Type GROUP BY Event['filename'], Event.Type", repo);
+               
+               int receiveRows = 0;
+               int sendRows = 0;
+               while (rs.hasNext()) {
+                   final List<?> cols = rs.next();
+                   final ProvenanceEventType type = (ProvenanceEventType) 
cols.get(2);
+                   if ( type == ProvenanceEventType.RECEIVE ) {
+                       receiveRows++;
+                       assertEquals("xyz.txt", cols.get(0));
+                       assertEquals(200000L, cols.get(1));
+                   } else if ( type == ProvenanceEventType.SEND ) {
+                       sendRows++;
+                       assertEquals("xyz.txt", cols.get(0));
+                       assertEquals(2L, cols.get(1));
+                   } else {
+                       Assert.fail("Event type was " + type);
+                   }
+               }
+               
+               assertEquals(1, receiveRows);
+               assertEquals(1, sendRows);
+       }
+       
+       
+       @Test
+    public void testAverageGroupBy() throws IOException {
+        createRecords(200000, ProvenanceEventType.RECEIVE, 1L);
+        createRecords(5000, ProvenanceEventType.SEND, 1L);
+        
+        dump(ProvenanceQuery.execute("SELECT AVG(Event.Size), Event.Type GROUP 
BY SECOND(Event.Time), Event.Type", repo));
+    }
+       
+       
+       @Test
+       public void testSelectSeveralRecords() throws IOException {
+               createRecords(2000, ProvenanceEventType.SEND, 1L);
+               createRecords(200, ProvenanceEventType.RECEIVE, 1L);
+               dump(ProvenanceQuery.execute(
+                                 "SELECT SECOND(Event.Time), Event.Type, 
SUM(Event.Size), COUNT(Event) "
+                               + "FROM SEND, RECEIVE "
+                               + "GROUP BY SECOND(Event.Time), Event.Type"
+                               , repo));
+       }
+
+       @Test
+       public void testNot() throws IOException {
+               createRecords(2000, ProvenanceEventType.SEND, 0L);
+               createRecords(200, ProvenanceEventType.RECEIVE, 0L);
+
+               dump(ProvenanceQuery.execute("SELECT Event.Type, COUNT(Event) 
WHERE NOT(Event.Type = 'SEND')", repo));
+               dump(ProvenanceQuery.execute("SELECT Event.Type, COUNT(Event) 
WHERE NOT(Event.Type = 'RECEIVE')", repo));
+               dump(ProvenanceQuery.execute("SELECT Event.Type, COUNT(Event) 
WHERE NOT(NOT( Event.Type = 'SEND'))", repo));
+       }
+       
+       
+       @Test
+       public void testOrderByField() throws IOException {
+               createRecords(2000, ProvenanceEventType.SEND, 1L);
+               
+               dump(ProvenanceQuery.execute("SELECT Event.Time, 
Event.ComponentId ORDER BY Event.ComponentId LIMIT 15", repo));
+               dump(ProvenanceQuery.execute("SELECT Event.Time, 
Event.ComponentId ORDER BY Event.Time DESC LIMIT 15", repo));
+       }
+       
+
+       @Test
+       public void testOrderByGroupedField() throws IOException {
+               createRecords(2, ProvenanceEventType.SEND, 0L);
+               createRecords(5, ProvenanceEventType.RECEIVE, 0L);
+               
+               dump(ProvenanceQuery.execute("SELECT Event.Type, 
SUM(Event.Size) GROUP BY Event.Type ORDER BY SUM(Event.Size) DESC", repo));
+               
+               ProvenanceResultSet rs = ProvenanceQuery.execute("SELECT 
Event.Type, SUM(Event.Size) GROUP BY Event.Type ORDER BY SUM(Event.Size) DESC", 
repo);
+               
+               assertTrue( rs.hasNext() );
+               List<?> values = rs.next();
+               assertEquals("RECEIVE", values.get(0).toString());
+               assertEquals(500L, values.get(1));
+               
+               assertTrue( rs.hasNext() );
+               values = rs.next();
+               assertEquals("SEND", values.get(0).toString());
+               assertEquals(200L, values.get(1));
+               
+               assertFalse( rs.hasNext() );
+               
+               
+               rs = ProvenanceQuery.execute("SELECT Event.Type, 
SUM(Event.Size) GROUP BY Event.Type ORDER BY SUM(Event.Size) ASC", repo);
+               
+               assertTrue( rs.hasNext() );
+               values = rs.next();
+               assertEquals("SEND", values.get(0).toString());
+               assertEquals(200L, values.get(1));
+               
+               assertTrue( rs.hasNext() );
+               values = rs.next();
+               assertEquals("RECEIVE", values.get(0).toString());
+               assertEquals(500L, values.get(1));
+               
+               assertFalse( rs.hasNext() );
+       }
+       
+       
+       @Test
+       public void testOrderByFieldAndGroupedValue() throws IOException {
+               createRecords(3, ProvenanceEventType.SEND, 0L);
+               createRecords(5, ProvenanceEventType.RECEIVE, 0L);
+               createRecords(3, ProvenanceEventType.ATTRIBUTES_MODIFIED, 0L);
+               
+               final String query = "SELECT Event.Type, SUM(Event.Size) GROUP 
BY Event.Type ORDER BY SUM(Event.Size) DESC, Event.Type";
+               dump(ProvenanceQuery.execute(query, repo));
+               
+               ProvenanceResultSet rs = ProvenanceQuery.execute(query, repo);
+               
+               assertTrue( rs.hasNext() );
+               List<?> vals = rs.next();
+               assertEquals(2, vals.size());
+               assertEquals("RECEIVE", vals.get(0).toString());
+               assertEquals(500L, vals.get(1));
+               
+               assertTrue( rs.hasNext() );
+               vals = rs.next();
+               assertEquals(2, vals.size());
+               assertEquals("ATTRIBUTES_MODIFIED", vals.get(0).toString());
+               assertEquals(300L, vals.get(1));
+               
+               assertTrue( rs.hasNext() );
+               vals = rs.next();
+               assertEquals(2, vals.size());
+               assertEquals("SEND", vals.get(0).toString());
+               assertEquals(300L, vals.get(1));
+               
+               assertFalse( rs.hasNext() );
+       }
+       
+       
+       @Test
+       public void testAndsOrs() throws IOException {
+
+               final Map<String, String> previousAttributes = new HashMap<>();
+               previousAttributes.put("filename", "xyz");
+               
+               final Map<String, String> updatedAttributes = new HashMap<>();
+               updatedAttributes.put("filename", "xyz.txt");
+               updatedAttributes.put("mime.type", "text/plain");
+               updatedAttributes.put("abc", "cba");
+               updatedAttributes.put("123", "321");
+               
+               final StandardProvenanceEventRecord.Builder recordBuilder = new 
StandardProvenanceEventRecord.Builder();
+               recordBuilder.setAttributes(previousAttributes, 
Collections.<String, String>emptyMap())
+                       .setComponentId("000")
+                       .setComponentType("MyComponent")
+                       .setEventType(ProvenanceEventType.SEND)
+                       .setFlowFileEntryDate(System.currentTimeMillis())
+                       .setFlowFileUUID("1234")
+                       .setCurrentContentClaim("container", "section", "1", 
0L, 100L)
+                       .setAttributes(previousAttributes, updatedAttributes)
+                       .setTransitUri("https://localhost:80/nifi";);
+
+               repo.registerEvent(recordBuilder.build());
+               
+               final String queryString = "SELECT Event "
+                               + "WHERE "
+                               + "( "
+                               + "      Event['filename'] = 'xyz.txt' "
+                               + "             OR "
+                               + "  Event['mime.type'] = 'ss' "
+                               + ") "
+                               + "AND "
+                               + "( "
+                               + "  Event['abc'] = 'cba' "
+                               + "             OR "
+                               + "      Event['123'] = '123' "
+                               + ")";
+               System.out.println(queryString);
+               
+               final ProvenanceQuery query = 
ProvenanceQuery.compile(queryString);
+               
+               System.out.println(query.getWhereClause());
+               
+               ProvenanceResultSet rs = query.execute(repo);
+               assertTrue(rs.hasNext());
+               rs.next();
+               assertFalse(rs.hasNext());
+               
+               
+               
+               updatedAttributes.put("filename", "xxyz");
+               repo = new VolatileProvenanceRepository();
+               recordBuilder.setAttributes(previousAttributes, 
updatedAttributes);
+               repo.registerEvent(recordBuilder.build());
+               
+               rs = query.execute(repo);
+               assertFalse(rs.hasNext());
+
+
+               
+               updatedAttributes.put("filename", "xyz.txt");
+               updatedAttributes.put("123", "123");
+               repo = new VolatileProvenanceRepository();
+               recordBuilder.setAttributes(previousAttributes, 
updatedAttributes);
+               repo.registerEvent(recordBuilder.build());
+
+               rs = query.execute(repo);
+               assertTrue(rs.hasNext());
+               rs.next();
+               assertFalse(rs.hasNext());
+               
+       }
+       
+       private void dump(final ProvenanceResultSet rs) {
+               System.out.println(rs.getLabels());
+               while (rs.hasNext()) {
+                       System.out.println(rs.next());
+               }
+               
+               System.out.println("\n\n\n");
+       }
+       
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/test/resources/nifi.properties
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/test/resources/nifi.properties
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/test/resources/nifi.properties
new file mode 100644
index 0000000..9d0683d
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/test/resources/nifi.properties
@@ -0,0 +1,136 @@
+# Core Properties #
+nifi.version=nifi-0.0.1
+nifi.flow.configuration.file=./conf/flow.xml.gz
+nifi.flow.configuration.archive.dir=./conf/archive/
+nifi.flowcontroller.autoResumeState=false
+nifi.flowcontroller.graceful.shutdown.period=10 sec
+nifi.flowservice.writedelay.interval=2 sec
+nifi.administrative.yield.duration=30 sec
+
+nifi.authority.provider.configuration.file=./conf/authority-providers.xml
+nifi.reporting.task.configuration.file=./conf/reporting-tasks.xml
+nifi.controller.service.configuration.file=./conf/controller-services.xml
+nifi.templates.directory=./conf/templates
+nifi.ui.banner.text=
+nifi.ui.autorefresh.interval=30 sec
+nifi.nar.library.directory=./lib
+nifi.nar.working.directory=./work/nar/
+nifi.documentation.working.directory=./work/docs/components
+
+# H2 Settings
+nifi.database.directory=./database_repository
+nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
+
+# FlowFile Repository
+nifi.flowfile.repository.directory=./flowfile_repository
+nifi.flowfile.repository.partitions=256
+nifi.flowfile.repository.checkpoint.interval=2 mins
+nifi.queue.swap.threshold=20000
+nifi.swap.storage.directory=./flowfile_repository/swap
+nifi.swap.in.period=5 sec
+nifi.swap.in.threads=1
+nifi.swap.out.period=5 sec
+nifi.swap.out.threads=4
+
+# Content Repository
+nifi.content.claim.max.appendable.size=10 MB
+nifi.content.claim.max.flow.files=100
+nifi.content.repository.directory.default=./content_repository
+
+
+# Volatile Provenance Repository properties
+nifi.provenance.repository.buffer.size=250000
+
+
+# Provenance Repository Properties
+nifi.provenance.repository.implementation=nifi.provenance.PersistentProvenanceRepository
+nifi.provenance.repository.directory.default=./provenance_repository
+nifi.provenance.repository.max.storage.time=24 hours
+nifi.provenance.repository.max.storage.size=1 GB
+nifi.provenance.repository.rollover.time=5 mins
+nifi.provenance.repository.rollover.size=100 MB
+nifi.provenance.repository.query.threads=2
+nifi.provenance.repository.compress.on.rollover=true
+# Comma-separated list of fields. Fields that are not indexed will not be 
searchable. Valid fields are: 
+# EventType, FlowFileUUID, Filename, TransitURI, ProcessorID, 
AlternateIdentifierURI, ContentType, Relationship, Details
+nifi.provenance.repository.indexed.fields=EventType, FlowFileUUID, Filename, 
ProcessorID
+# FlowFile Attributes that should be indexed and made searchable
+nifi.provenance.repository.indexed.attributes=
+# Large values for the shard size will result in more Java heap usage when 
searching the Provenance Repository
+# but should provide better performance
+nifi.provenance.repository.index.shard.size=500 MB
+
+# Component Status Repository
+nifi.components.status.repository.implementation=nifi.controller.status.history.VolatileComponentStatusRepository
+nifi.components.status.repository.buffer.size=288
+nifi.components.status.snapshot.frequency=5 mins
+
+# Site to Site properties
+nifi.remote.input.socket.port=
+nifi.remote.input.secure=true
+
+# web properties #
+nifi.web.war.directory=./lib
+nifi.web.http.interfaces=eth6
+nifi.web.http.host=
+nifi.web.http.port=8080
+nifi.web.https.interfaces=
+nifi.web.https.host=
+nifi.web.https.port=
+nifi.web.jetty.working.directory=./work/jetty
+
+# security properties #
+nifi.sensitive.props.key=password
+nifi.sensitive.props.algorithm=PBEWITHMD5AND256BITAES-CBC-OPENSSL
+nifi.sensitive.props.provider=BC
+
+nifi.security.keystore=
+nifi.security.keystoreType=
+nifi.security.keystorePasswd=
+nifi.security.keyPasswd=
+nifi.security.truststore=
+nifi.security.truststoreType=
+nifi.security.truststorePasswd=
+nifi.security.needClientAuth=
+nifi.security.user.credential.cache.duration=24 hours
+nifi.security.user.authority.provider=file-provider
+nifi.security.support.new.account.requests=
+nifi.security.ocsp.responder.url=
+nifi.security.ocsp.responder.certificate=
+
+# cluster common properties (cluster manager and nodes must have same values) #
+nifi.cluster.protocol.heartbeat.interval=5 sec
+nifi.cluster.protocol.is.secure=false
+nifi.cluster.protocol.socket.timeout=30 sec
+nifi.cluster.protocol.connection.handshake.timeout=45 sec
+# if multicast is used, then nifi.cluster.protocol.multicast.xxx properties 
must be configured #
+nifi.cluster.protocol.use.multicast=false
+nifi.cluster.protocol.multicast.address=
+nifi.cluster.protocol.multicast.port=
+nifi.cluster.protocol.multicast.service.broadcast.delay=500 ms
+nifi.cluster.protocol.multicast.service.locator.attempts=3
+nifi.cluster.protocol.multicast.service.locator.attempts.delay=1 sec
+
+# cluster node properties (only configure for cluster nodes) #
+nifi.cluster.is.node=false
+nifi.cluster.node.address=
+nifi.cluster.node.interfaces=
+nifi.cluster.node.protocol.port=
+nifi.cluster.node.protocol.threads=2
+# if multicast is not used, nifi.cluster.node.unicast.xxx must have same 
values as nifi.cluster.manager.xxx #
+nifi.cluster.node.unicast.manager.address=
+nifi.cluster.node.unicast.manager.protocol.port=
+
+# cluster manager properties (only configure for cluster manager) #
+nifi.cluster.is.manager=false
+nifi.cluster.manager.address=
+nifi.cluster.manager.interfaces=eth6
+nifi.cluster.manager.protocol.port=
+nifi.cluster.manager.node.firewall.file=
+nifi.cluster.manager.node.event.history.size=10
+nifi.cluster.manager.node.api.connection.timeout=30 sec
+nifi.cluster.manager.node.api.read.timeout=30 sec
+nifi.cluster.manager.node.api.request.threads=10
+nifi.cluster.manager.flow.retrieval.delay=5 sec
+nifi.cluster.manager.protocol.threads=10
+nifi.cluster.manager.safemode.duration=0 sec

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/pom.xml
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/pom.xml
index 4e9e9fb..3d5163e 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/pom.xml
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/pom.xml
@@ -25,6 +25,15 @@
                        <artifactId>nifi-properties</artifactId>
                </dependency>
                <dependency>
+                       <groupId>com.google.guava</groupId>
+                       <artifactId>guava</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.nifi</groupId>
+                       <artifactId>nifi-provenance-query-language</artifactId>
+                       <version>0.0.2-incubating-SNAPSHOT</version>
+               </dependency>
+               <dependency>
                        <groupId>org.apache.lucene</groupId>
                        <artifactId>lucene-core</artifactId>
                </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java
index ffa9676..2643787 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java
@@ -25,7 +25,9 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.Date;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -42,6 +44,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.regex.Pattern;
 
 import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.pql.ProvenanceQuery;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.provenance.ProvenanceEventBuilder;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
@@ -52,6 +55,7 @@ import 
org.apache.nifi.provenance.StandardProvenanceEventRecord;
 import org.apache.nifi.provenance.StorageLocation;
 import org.apache.nifi.provenance.StoredProvenanceEvent;
 import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig;
+import org.apache.nifi.provenance.journaling.exception.EventNotFoundException;
 import org.apache.nifi.provenance.journaling.index.EventIndexSearcher;
 import org.apache.nifi.provenance.journaling.index.IndexAction;
 import org.apache.nifi.provenance.journaling.index.IndexManager;
@@ -69,6 +73,9 @@ import 
org.apache.nifi.provenance.journaling.query.StandardQueryManager;
 import org.apache.nifi.provenance.journaling.toc.StandardTocReader;
 import org.apache.nifi.provenance.journaling.toc.TocReader;
 import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
+import org.apache.nifi.provenance.query.ProvenanceQueryResult;
+import org.apache.nifi.provenance.query.ProvenanceQuerySubmission;
+import org.apache.nifi.provenance.query.ProvenanceResultSet;
 import org.apache.nifi.provenance.search.Query;
 import org.apache.nifi.provenance.search.QuerySubmission;
 import org.apache.nifi.provenance.search.SearchableField;
@@ -78,7 +85,7 @@ import org.apache.nifi.util.NiFiProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
+// TODO: read-only is not checked everywhere!
 public class JournalingProvenanceRepository implements 
ProvenanceEventRepository {
     public static final String WORKER_THREAD_POOL_SIZE = 
"nifi.provenance.repository.worker.threads";
     public static final String BLOCK_SIZE = 
"nifi.provenance.repository.writer.block.size";
@@ -90,7 +97,7 @@ public class JournalingProvenanceRepository implements 
ProvenanceEventRepository
     
     // the follow member variables are effectively final. They are initialized
     // in the initialize method rather than the constructor because we want to 
ensure
-    // that they only not created every time that the Java Service Loader 
instantiates the class.
+    // that they are not created every time that the Java Service Loader 
instantiates the class.
     private ScheduledExecutorService workerExecutor;
     private ExecutorService queryExecutor;
     private ExecutorService compressionExecutor;
@@ -226,7 +233,7 @@ public class JournalingProvenanceRepository implements 
ProvenanceEventRepository
         final int compressionThreads = Math.max(1, 
config.getCompressionThreadPoolSize());
         this.compressionExecutor = 
Executors.newFixedThreadPool(compressionThreads, 
createThreadFactory("Provenance Repository Compression Thread"));
         
-        this.indexManager = new LuceneIndexManager(config, workerExecutor, 
queryExecutor);
+        this.indexManager = new LuceneIndexManager(this, config, 
workerExecutor, queryExecutor);
         this.partitionManager = new QueuingPartitionManager(indexManager, 
idGenerator, config, workerExecutor, compressionExecutor);
         this.queryManager = new StandardQueryManager(indexManager, 
queryExecutor, config, 10);
         
@@ -436,6 +443,154 @@ public class JournalingProvenanceRepository implements 
ProvenanceEventRepository
         return maxId;
     }
 
+    ProgressAwareIterator<? extends StoredProvenanceEvent> 
selectMatchingEvents(final String query, final AtomicLong lastTimeProgressMade) 
throws IOException {
+        final Set<EventIndexSearcher> searchers = indexManager.getSearchers();
+        final Iterator<EventIndexSearcher> searchItr = searchers.iterator();
+        
+        return new ProgressAwareIterator<StoredProvenanceEvent>() {
+            private Iterator<LazyInitializedProvenanceEvent> eventItr;
+            private int searchersComplete = 0;
+            private EventIndexSearcher currentSearcher;
+            
+            @Override
+            public int getPercentComplete() {
+                return searchers.isEmpty() ? 100 : searchersComplete / 
searchers.size() * 100;
+            }
+            
+            @Override
+            public boolean hasNext() {
+                // while the event iterator has no information...
+                while ( eventItr == null || !eventItr.hasNext() ) {
+                    // if there's not another searcher then we're out of 
events.
+                    if ( !searchItr.hasNext() ) {
+                        return false;
+                    }
+                    
+                    // we're finished with this searcher. Close it.
+                    if ( currentSearcher != null ) {
+                        try {
+                            currentSearcher.close();
+                        } catch (final IOException ioe) {
+                            logger.warn("Failed to close {} due to {}", 
currentSearcher, ioe.toString());
+                            if ( logger.isDebugEnabled() ) {
+                                logger.warn("", ioe);
+                            }
+                        }
+                    }
+                    
+                    // We have a searcher. get events from it. If there are no 
matches,
+                    // then our while loop will keep going.
+                    currentSearcher = searchItr.next();
+                    searchersComplete++;
+                    
+                    try {
+                        eventItr = currentSearcher.select(query);
+                    } catch (final IOException ioe) {
+                        throw new EventNotFoundException("Could not find next 
event", ioe);
+                    }
+                }
+                
+                // the event iterator has no events, and the search iterator 
has no more
+                // searchers. There are no more events.
+                return eventItr != null && eventItr.hasNext();
+            }
+
+            @Override
+            public StoredProvenanceEvent next() {
+                lastTimeProgressMade.set(System.nanoTime());
+                return eventItr.next();
+            }
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException();
+            }
+        };
+    }
+    
+    public ProvenanceResultSet query(final String query) throws IOException {
+        final ProvenanceQuerySubmission submission = submitQuery(query);
+        return submission.getResult().getResultSet();
+    }
+    
+    
    /**
     * Retrieves the previously registered query submission with the given identifier,
     * delegating to the query manager.
     *
     * @param queryIdentifier the identifier of the submission, as returned by
     *            {@code ProvenanceQuerySubmission#getQueryIdentifier()}
     * @return whatever the query manager holds for that identifier
     */
    public ProvenanceQuerySubmission retrieveProvenanceQuerySubmission(final String queryIdentifier) {
        return queryManager.retrieveProvenanceQuerySubmission(queryIdentifier);
    }
+    
+    public ProvenanceQuerySubmission submitQuery(final String query) {
+        ProvenanceQuerySubmission submission;
+        final AtomicLong lastTimeProgressMade = new 
AtomicLong(System.nanoTime());
+        final long tenMinsInNanos = TimeUnit.MINUTES.toNanos(10);
+        
+        try {
+            final ProgressAwareIterator<? extends StoredProvenanceEvent> 
eventItr = selectMatchingEvents(query, lastTimeProgressMade);
+            final ProvenanceResultSet rs = 
ProvenanceQuery.compile(query).evaluate(eventItr);
+            
+            submission = new JournalingRepoQuerySubmission(query, new 
ProvenanceQueryResult() {
+                @Override
+                public ProvenanceResultSet getResultSet() {
+                    return rs;
+                }
+
+                @Override
+                public Date getExpiration() {
+                    return new Date(tenMinsInNanos + 
lastTimeProgressMade.get());
+                }
+
+                @Override
+                public String getError() {
+                    return null;
+                }
+
+                @Override
+                public int getPercentComplete() {
+                    return eventItr.getPercentComplete();
+                }
+
+                @Override
+                public boolean isFinished() {
+                    return eventItr.getPercentComplete() >= 100;
+                }
+            });
+        } catch (final IOException ioe) {
+            logger.error("Failed to perform query {} due to {}", query, 
ioe.toString());
+            if ( logger.isDebugEnabled() ) {
+                logger.error("", ioe);
+            }
+            
+            submission = new JournalingRepoQuerySubmission(query, new 
ProvenanceQueryResult() {
+                @Override
+                public ProvenanceResultSet getResultSet() {
+                    return null;
+                }
+
+                @Override
+                public Date getExpiration() {
+                    return new Date(tenMinsInNanos + 
lastTimeProgressMade.get());
+                }
+
+                @Override
+                public String getError() {
+                    return "Failed to perform query due to " + ioe;
+                }
+
+                @Override
+                public int getPercentComplete() {
+                    return 0;
+                }
+
+                @Override
+                public boolean isFinished() {
+                    return true;
+                }
+            });
+        }
+        
+        queryManager.registerSubmission(submission);
+        return submission;
+    }
+    
     
     @Override
     public QuerySubmission submitQuery(final Query query) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingRepoQuerySubmission.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingRepoQuerySubmission.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingRepoQuerySubmission.java
new file mode 100644
index 0000000..f755bc6
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingRepoQuerySubmission.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling;
+
+import java.util.Date;
+import java.util.UUID;
+
+import org.apache.nifi.provenance.query.ProvenanceQueryResult;
+import org.apache.nifi.provenance.query.ProvenanceQuerySubmission;
+
+public class JournalingRepoQuerySubmission implements 
ProvenanceQuerySubmission {
+
+    private final Date submissionTime = new Date();
+    private final String identifier = UUID.randomUUID().toString();
+    private final ProvenanceQueryResult result;
+    
+    private final String query;
+    private final Runnable cancelCallback;
+    
+    private volatile boolean canceled = false;
+    
+    public JournalingRepoQuerySubmission(final String query, final 
ProvenanceQueryResult queryResult) {
+        this(query, queryResult, null);
+    }
+    
+    public JournalingRepoQuerySubmission(final String query, final 
ProvenanceQueryResult queryResult, final Runnable cancelCallback) {
+        this.query = query;
+        this.cancelCallback = cancelCallback;
+        this.result = queryResult;
+    }
+    
+    @Override
+    public String getQuery() {
+        return query;
+    }
+
+    @Override
+    public ProvenanceQueryResult getResult() {
+        return result;
+    }
+
+    @Override
+    public Date getSubmissionTime() {
+        return submissionTime;
+    }
+
+    @Override
+    public String getQueryIdentifier() {
+        return identifier;
+    }
+
+    @Override
+    public void cancel() {
+        this.canceled = true;
+        
+        if ( cancelCallback != null ) {
+            cancelCallback.run();
+        }
+    }
+
+    @Override
+    public boolean isCanceled() {
+        return canceled;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/LazyInitializedProvenanceEvent.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/LazyInitializedProvenanceEvent.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/LazyInitializedProvenanceEvent.java
new file mode 100644
index 0000000..d0562e2
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/LazyInitializedProvenanceEvent.java
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.IndexableField;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventRepository;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.provenance.SearchableFields;
+import org.apache.nifi.provenance.StorageLocation;
+import org.apache.nifi.provenance.StoredProvenanceEvent;
+import org.apache.nifi.provenance.journaling.exception.EventNotFoundException;
+import org.apache.nifi.provenance.journaling.index.IndexedFieldNames;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LazyInitializedProvenanceEvent implements StoredProvenanceEvent {
+    private static final Logger logger = 
LoggerFactory.getLogger(LazyInitializedProvenanceEvent.class);
+    
+    private final ProvenanceEventRepository repo;
+    private final StorageLocation storageLocation;
+    private final Document doc;
+    private ProvenanceEventRecord fullRecord;
+    
+    public LazyInitializedProvenanceEvent(final ProvenanceEventRepository 
repo, final StorageLocation storageLocation, final Document document) {
+        this.repo = repo;
+        this.storageLocation = storageLocation;
+        this.doc = document;
+    }
+
+    @Override
+    public long getEventId() {
+        return 
doc.getField(IndexedFieldNames.EVENT_ID).numericValue().longValue();
+    }
+    
+    @Override
+    public StorageLocation getStorageLocation() {
+        return storageLocation;
+    }
+
+    @Override
+    public long getEventTime() {
+        return 
doc.getField(SearchableFields.EventTime.getSearchableFieldName()).numericValue().longValue();
+    }
+
+    private void ensureFullyLoaded() {
+        if ( fullRecord != null ) {
+            return;
+        }
+        
+        final long id = getEventId();
+        try {
+            fullRecord = repo.getEvent(id);
+        } catch (final IOException ioe) {
+            final String containerName = 
doc.get(IndexedFieldNames.CONTAINER_NAME);
+            final String sectionName = doc.get(IndexedFieldNames.SECTION_NAME);
+            final String journalId = doc.get(IndexedFieldNames.JOURNAL_ID);
+            
+            final String error = "Failed to load event with ID " + id + " from 
container '" + containerName + "', section '" + sectionName + "', journal '" + 
journalId + "' due to " + ioe;
+            logger.error(error);
+            if ( logger.isDebugEnabled() ) {
+                logger.error("", ioe);
+            }
+
+            throw new EventNotFoundException(error);
+        }
+    }
+    
+    @Override
+    public long getFlowFileEntryDate() {
+        ensureFullyLoaded();
+        return fullRecord.getFlowFileEntryDate();
+    }
+
+    @Override
+    public long getLineageStartDate() {
+        final IndexableField field = 
doc.getField(SearchableFields.LineageStartDate.getSearchableFieldName());
+        if ( field != null ) {
+            return field.numericValue().longValue();
+        }
+
+        ensureFullyLoaded();
+        return fullRecord.getLineageStartDate();
+    }
+
+    @Override
+    public Set<String> getLineageIdentifiers() {
+        ensureFullyLoaded();
+        return fullRecord.getLineageIdentifiers();
+    }
+
+    @Override
+    public long getFileSize() {
+        final IndexableField field = 
doc.getField(SearchableFields.FileSize.getSearchableFieldName());
+        if ( field != null ) {
+            return field.numericValue().longValue();
+        }
+
+        ensureFullyLoaded();
+        return fullRecord.getFileSize();
+    }
+
+    @Override
+    public Long getPreviousFileSize() {
+        ensureFullyLoaded();
+        return fullRecord.getPreviousFileSize();
+    }
+
+    @Override
+    public long getEventDuration() {
+        // TODO: Allow Event Duration to be indexed; it could be interesting 
for reporting.
+        ensureFullyLoaded();
+        return fullRecord.getEventDuration();
+    }
+
+    @Override
+    public ProvenanceEventType getEventType() {
+        final String name = 
doc.get(SearchableFields.EventType.getSearchableFieldName());
+        return ProvenanceEventType.valueOf(name.toUpperCase());
+    }
+
+    @Override
+    public Map<String, String> getAttributes() {
+        ensureFullyLoaded();
+        return fullRecord.getAttributes();
+    }
+
+    @Override
+    public String getAttribute(final String attributeName) {
+        final String attr = doc.get(attributeName);
+        if ( attr == null ) {
+            ensureFullyLoaded();
+            return fullRecord.getAttribute(attributeName);
+        } else {
+            return attr;
+        }
+    }
+
+    @Override
+    public Map<String, String> getPreviousAttributes() {
+        ensureFullyLoaded();
+        return fullRecord.getPreviousAttributes();
+    }
+
+    @Override
+    public Map<String, String> getUpdatedAttributes() {
+        ensureFullyLoaded();
+        return fullRecord.getUpdatedAttributes();
+    }
+
+    @Override
+    public String getComponentId() {
+        final String componentId = 
doc.get(SearchableFields.ComponentID.getSearchableFieldName());
+        if ( componentId == null ) {
+            ensureFullyLoaded();
+            return fullRecord.getComponentId();
+        } else {
+            return componentId;
+        }
+    }
+
+    @Override
+    public String getComponentType() {
+        // TODO: Make indexable.
+        ensureFullyLoaded();
+        return fullRecord.getComponentType();
+    }
+
+    @Override
+    public String getTransitUri() {
+        final String transitUri = 
doc.get(SearchableFields.TransitURI.getSearchableFieldName());
+        if ( transitUri == null ) {
+            final ProvenanceEventType eventType = getEventType();
+            switch (eventType) {
+                case RECEIVE:
+                case SEND:
+                    ensureFullyLoaded();
+                    return fullRecord.getTransitUri();
+                default:
+                    return null;
+            }
+        } else {
+            return transitUri;
+        }
+    }
+
+    @Override
+    public String getSourceSystemFlowFileIdentifier() {
+        ensureFullyLoaded();
+        return fullRecord.getSourceSystemFlowFileIdentifier();
+    }
+
+    @Override
+    public String getFlowFileUuid() {
+        return doc.get(SearchableFields.FlowFileUUID.getSearchableFieldName());
+    }
+
+    @Override
+    public List<String> getParentUuids() {
+        final IndexableField[] uuids = 
doc.getFields(SearchableFields.FlowFileUUID.getSearchableFieldName());
+        if ( uuids.length < 2 ) {
+            return Collections.emptyList();
+        }
+        
+        switch (getEventType()) {
+            case JOIN: {
+                final List<String> parentUuids = new ArrayList<>(uuids.length 
- 1);
+                for (int i=1; i < uuids.length; i++) {
+                    parentUuids.add(uuids[i].stringValue());
+                }
+                return parentUuids;
+            }
+            default:
+                return Collections.emptyList();
+        }
+    }
+
+    @Override
+    public List<String> getChildUuids() {
+        final IndexableField[] uuids = 
doc.getFields(SearchableFields.FlowFileUUID.getSearchableFieldName());
+        if ( uuids.length < 2 ) {
+            return Collections.emptyList();
+        }
+        
+        switch (getEventType()) {
+            case REPLAY:
+            case CLONE:
+            case FORK: {
+                final List<String> childUuids = new ArrayList<>(uuids.length - 
1);
+                for (int i=1; i < uuids.length; i++) {
+                    childUuids.add(uuids[i].stringValue());
+                }
+                return childUuids;
+            }
+            default:
+                return Collections.emptyList();
+        }
+    }
+
+    @Override
+    public String getAlternateIdentifierUri() {
+        final String altId = 
doc.get(SearchableFields.AlternateIdentifierURI.getSearchableFieldName());
+        if ( altId == null && getEventType() == ProvenanceEventType.ADDINFO ) {
+            ensureFullyLoaded();
+            return fullRecord.getAlternateIdentifierUri();
+        } else { 
+            return null;
+        }
+    }
+
+    @Override
+    public String getDetails() {
+        final String details = 
doc.get(SearchableFields.Details.getSearchableFieldName());
+        if ( details == null ) {
+            ensureFullyLoaded();
+            return fullRecord.getDetails();
+        }
+        return null;
+    }
+
+    @Override
+    public String getRelationship() {
+        final String relationship = 
doc.get(SearchableFields.Relationship.getSearchableFieldName());
+        if ( relationship == null ) {
+            ensureFullyLoaded();
+            return fullRecord.getRelationship();
+        }
+        return null;
+    }
+
+    @Override
+    public String getSourceQueueIdentifier() {
+        final String queueId = 
doc.get(SearchableFields.SourceQueueIdentifier.getSearchableFieldName());
+        if ( queueId == null ) {
+            ensureFullyLoaded();
+            return fullRecord.getSourceQueueIdentifier();
+        }
+        return null;
+    }
+
+    @Override
+    public String getContentClaimSection() {
+        final String claimSection = 
doc.get(SearchableFields.ContentClaimSection.getSearchableFieldName());
+        if ( claimSection == null ) {
+            ensureFullyLoaded();
+            return fullRecord.getContentClaimSection();
+        }
+        return null;
+    }
+
+    @Override
+    public String getPreviousContentClaimSection() {
+        ensureFullyLoaded();
+        return fullRecord.getPreviousContentClaimSection();
+    }
+
+    @Override
+    public String getContentClaimContainer() {
+        final String claimContainer = 
doc.get(SearchableFields.ContentClaimContainer.getSearchableFieldName());
+        if ( claimContainer == null ) {
+            ensureFullyLoaded();
+            return fullRecord.getContentClaimContainer();
+        }
+        return null;
+    }
+
+    @Override
+    public String getPreviousContentClaimContainer() {
+        ensureFullyLoaded();
+        return fullRecord.getPreviousContentClaimContainer();
+    }
+
+    @Override
+    public String getContentClaimIdentifier() {
+        final String claimIdentifier = 
doc.get(SearchableFields.ContentClaimIdentifier.getSearchableFieldName());
+        if ( claimIdentifier == null ) {
+            ensureFullyLoaded();
+            return fullRecord.getContentClaimIdentifier();
+        }
+        return null;
+    }
+
+    @Override
+    public String getPreviousContentClaimIdentifier() {
+        ensureFullyLoaded();
+        return fullRecord.getPreviousContentClaimIdentifier();
+    }
+
+    @Override
+    public Long getContentClaimOffset() {
+        final String claimOffset = 
doc.get(SearchableFields.ContentClaimOffset.getSearchableFieldName());
+        if ( claimOffset == null ) {
+            ensureFullyLoaded();
+            return fullRecord.getContentClaimOffset();
+        }
+        return null;
+    }
+
+    @Override
+    public Long getPreviousContentClaimOffset() {
+        ensureFullyLoaded();
+        return fullRecord.getPreviousContentClaimOffset();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/ProgressAwareIterator.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/ProgressAwareIterator.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/ProgressAwareIterator.java
new file mode 100644
index 0000000..c473224
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/ProgressAwareIterator.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling;
+
+import java.util.Iterator;
+
/**
 * An {@link Iterator} that additionally reports how far along it is in producing its
 * elements, as an integer percentage.
 */
public interface ProgressAwareIterator<T> extends Iterator<T> {

    /**
     * @return the percentage (0-100) of the underlying work that has been completed so far
     */
    int getPercentComplete();
    
}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/exception/EventNotFoundException.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/exception/EventNotFoundException.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/exception/EventNotFoundException.java
new file mode 100644
index 0000000..398cf8e
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/exception/EventNotFoundException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.exception;
+
/**
 * Thrown when a provenance event that is expected to exist in the repository cannot be
 * found or loaded.
 */
public class EventNotFoundException extends RuntimeException {
    private static final long serialVersionUID = -8490783814308479930L;

    /**
     * @param message describes which event could not be found and why
     */
    public EventNotFoundException(final String message) {
        super(message);
    }
    
    /**
     * @param message describes which event could not be found
     * @param cause the underlying failure (typically an IOException from the repository)
     */
    public EventNotFoundException(final String message, final Throwable cause) {
        super(message, cause);
    }
}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexSearcher.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexSearcher.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexSearcher.java
index 85e02c0..1761057 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexSearcher.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexSearcher.java
@@ -19,9 +19,11 @@ package org.apache.nifi.provenance.journaling.index;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.nifi.provenance.journaling.JournaledStorageLocation;
+import org.apache.nifi.provenance.journaling.LazyInitializedProvenanceEvent;
 import org.apache.nifi.provenance.search.Query;
 
 public interface EventIndexSearcher extends Closeable {
@@ -82,4 +84,22 @@ public interface EventIndexSearcher extends Closeable {
      * @throws IOException
      */
     long getNumberOfEvents() throws IOException;
+    
    /**
     * Evaluates the given query against the index, returning an iterator of lazily
     * initialized provenance events. Each returned event answers field accesses from the
     * index where possible and loads the full record from the repository only on demand.
     * 
     * @param query the provenance query to evaluate
     * @return an iterator over the matching events
     * @throws IOException if unable to search the index
     */
    Iterator<LazyInitializedProvenanceEvent> select(String query) throws IOException;
    
    /**
     * Evaluates the given query against the index, returning an iterator of locations from which the matching
     * records can be retrieved
     * 
     * @param query the provenance query to evaluate
     * @return an iterator over the storage locations of the matching records
     * @throws IOException if unable to search the index
     */
    Iterator<JournaledStorageLocation> selectLocations(String query) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/IndexManager.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/IndexManager.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/IndexManager.java
index 34d1b18..9e1fc39 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/IndexManager.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/IndexManager.java
@@ -67,6 +67,24 @@ public interface IndexManager extends Closeable {
     <T> Set<T> withEachIndex(IndexAction<T> action) throws IOException;
     
    /**
     * Executes the given action against each index and returns a Set of results,
     * where each result is obtained from performing the given action against one index
     * 
     * @param action the action to perform
     * @param autoClose whether or not the EventIndexSearcher should automatically be closed
     * 
     * @return the set of per-index results
     * @throws IOException if unable to perform the action against the indices
     */
    <T> Set<T> withEachIndex(IndexAction<T> action, boolean autoClose) throws IOException;
    
    /**
     * Returns a Set containing an EventIndexSearcher for each index.
     * NOTE(review): callers appear responsible for closing the returned searchers
     * (the repository's query path closes each one after use) -- confirm this contract.
     *
     * @return the searchers, one per index
     * @throws IOException if unable to obtain the searchers
     */
    Set<EventIndexSearcher> getSearchers() throws IOException;
    
    /**
      * Performs the given action against each index, waiting for the action to 
complete
      * against each index before returning
      * 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexManager.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexManager.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexManager.java
index bf2a495..1f53d27 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexManager.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexManager.java
@@ -34,6 +34,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.nifi.provenance.ProvenanceEventRepository;
 import org.apache.nifi.provenance.journaling.JournaledProvenanceEvent;
 import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig;
 import org.apache.nifi.provenance.journaling.toc.TocJournalReader;
@@ -43,6 +44,7 @@ import org.slf4j.LoggerFactory;
 public class LuceneIndexManager implements IndexManager {
     private static final Logger logger = 
LoggerFactory.getLogger(LuceneIndexManager.class);
     
+    private final ProvenanceEventRepository repo;
     private final JournalingRepositoryConfig config;
     private final ExecutorService queryExecutor;
     
@@ -50,7 +52,8 @@ public class LuceneIndexManager implements IndexManager {
     private final Map<String, AtomicLong> writerIndexes = new HashMap<>();
     private final ConcurrentMap<String, IndexSize> indexSizes = new 
ConcurrentHashMap<>();
     
-    public LuceneIndexManager(final JournalingRepositoryConfig config, final 
ScheduledExecutorService workerExecutor, final ExecutorService queryExecutor) 
throws IOException {
+    public LuceneIndexManager(final ProvenanceEventRepository repo, final 
JournalingRepositoryConfig config, final ScheduledExecutorService 
workerExecutor, final ExecutorService queryExecutor) throws IOException {
+        this.repo = repo;
         this.config = config;
         this.queryExecutor = queryExecutor;
         
@@ -66,7 +69,7 @@ public class LuceneIndexManager implements IndexManager {
                 
                 for ( int i=0; i < config.getIndexesPerContainer(); i++ ){
                     final File indexDir = new File(container, "indices/" + i);
-                    writerList.add(new LuceneIndexWriter(indexDir, config));
+                    writerList.add(new LuceneIndexWriter(repo, indexDir, 
config));
                 }
                 
                 workerExecutor.scheduleWithFixedDelay(new Runnable() {
@@ -99,7 +102,7 @@ public class LuceneIndexManager implements IndexManager {
             if (config.isReadOnly()) {
                 for (int i=0; i < config.getIndexesPerContainer(); i++) {
                     final File indexDir = new File(containerName, "indices/" + 
i);
-                    searchers.add(new LuceneIndexSearcher(indexDir));
+                    searchers.add(new LuceneIndexSearcher(repo, indexDir));
                 }
             } else {
                 final List<LuceneIndexWriter> writerList = 
writers.get(containerName);
@@ -196,8 +199,39 @@ public class LuceneIndexManager implements IndexManager {
         }        
     }
     
+    
+    @Override
+    public Set<EventIndexSearcher> getSearchers() throws IOException {
+        final Set<EventIndexSearcher> searchers = new HashSet<>();
+        
+        try {
+            final Set<String> containerNames = config.getContainers().keySet();
+            for (final String containerName : containerNames) {
+                final EventIndexSearcher searcher = 
newIndexSearcher(containerName);
+                searchers.add(searcher);
+            }
+            
+            return searchers;
+        } catch (final Exception e) {
+            for ( final EventIndexSearcher searcher : searchers ) {
+                try {
+                    searcher.close();
+                } catch (final IOException ioe) {
+                    e.addSuppressed(ioe);
+                }
+            }
+            
+            throw e;
+        }
+    }
+    
     @Override
     public <T> Set<T> withEachIndex(final IndexAction<T> action) throws 
IOException {
+        return withEachIndex(action, true);
+    }
+    
+    @Override
+    public <T> Set<T> withEachIndex(final IndexAction<T> action, final boolean 
autoClose) throws IOException {
         final Set<T> results = new HashSet<>();
         final Map<String, Future<T>> futures = new HashMap<>();
         final Set<String> containerNames = config.getContainers().keySet();
@@ -205,8 +239,13 @@ public class LuceneIndexManager implements IndexManager {
             final Callable<T> callable = new Callable<T>() {
                 @Override
                 public T call() throws Exception {
-                    try (final EventIndexSearcher searcher = 
newIndexSearcher(containerName)) {
+                    final EventIndexSearcher searcher = 
newIndexSearcher(containerName);
+                    try {
                         return action.perform(searcher);
+                    } finally {
+                        if ( autoClose ) {
+                            searcher.close();
+                        }
                     }
                 }
             };

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java
index 94bd3f8..cd57991 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java
@@ -20,7 +20,9 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
+import java.util.NoSuchElementException;
 
 import org.apache.lucene.document.Document;
 import org.apache.lucene.index.DirectoryReader;
@@ -38,25 +40,38 @@ import org.apache.lucene.search.TermQuery;
 import org.apache.lucene.search.TopDocs;
 import org.apache.lucene.search.TopFieldDocs;
 import org.apache.lucene.store.FSDirectory;
+import org.apache.nifi.pql.LuceneTranslator;
+import org.apache.nifi.pql.ProvenanceQuery;
+import org.apache.nifi.provenance.ProvenanceEventRepository;
 import org.apache.nifi.provenance.SearchableFields;
 import org.apache.nifi.provenance.journaling.JournaledStorageLocation;
+import org.apache.nifi.provenance.journaling.LazyInitializedProvenanceEvent;
+import org.apache.nifi.provenance.journaling.exception.EventNotFoundException;
 import org.apache.nifi.provenance.search.Query;
+import org.apache.nifi.util.ObjectHolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class LuceneIndexSearcher implements EventIndexSearcher {
+    private static final Logger logger = 
LoggerFactory.getLogger(LuceneIndexSearcher.class);
+    
+    private final ProvenanceEventRepository repo;
     private final DirectoryReader reader;
     private final IndexSearcher searcher;
     private final FSDirectory fsDirectory;
     
     private final String description;
     
-    public LuceneIndexSearcher(final File indexDirectory) throws IOException {
+    public LuceneIndexSearcher(final ProvenanceEventRepository repo, final 
File indexDirectory) throws IOException {
+        this.repo = repo;
         this.fsDirectory = FSDirectory.open(indexDirectory);
         this.reader = DirectoryReader.open(fsDirectory);
         this.searcher = new IndexSearcher(reader);
         this.description = "LuceneIndexSearcher[indexDirectory=" + 
indexDirectory + "]";
     }
     
-    public LuceneIndexSearcher(final DirectoryReader reader, final File 
indexDirectory) {
+    public LuceneIndexSearcher(final ProvenanceEventRepository repo, final 
DirectoryReader reader, final File indexDirectory) {
+        this.repo = repo;
         this.reader = reader;
         this.searcher = new IndexSearcher(reader);
         this.fsDirectory = null;
@@ -80,16 +95,7 @@ public class LuceneIndexSearcher implements 
EventIndexSearcher {
             throw suppressed;
         }
     }
-    
-    private JournaledStorageLocation createLocation(final Document document) {
-        final String containerName = 
document.get(IndexedFieldNames.CONTAINER_NAME);
-        final String sectionName = 
document.get(IndexedFieldNames.SECTION_NAME);
-        final long journalId = 
document.getField(IndexedFieldNames.JOURNAL_ID).numericValue().longValue();
-        final int blockIndex = 
document.getField(IndexedFieldNames.BLOCK_INDEX).numericValue().intValue();
-        final long eventId = 
document.getField(IndexedFieldNames.EVENT_ID).numericValue().longValue();
-        
-        return new JournaledStorageLocation(containerName, sectionName, 
journalId, blockIndex, eventId);
-    }
+
     
     private List<JournaledStorageLocation> getOrderedLocations(final TopDocs 
topDocs) throws IOException {
         final ScoreDoc[] scoreDocs = topDocs.scoreDocs;
@@ -103,7 +109,7 @@ public class LuceneIndexSearcher implements 
EventIndexSearcher {
     private void populateLocations(final TopDocs topDocs, final 
Collection<JournaledStorageLocation> locations) throws IOException {
         for ( final ScoreDoc scoreDoc : topDocs.scoreDocs ) {
             final Document document = reader.document(scoreDoc.doc);
-            locations.add(createLocation(document));
+            locations.add(QueryUtils.createLocation(document));
         }
     }
     
@@ -188,4 +194,100 @@ public class LuceneIndexSearcher implements 
EventIndexSearcher {
     public long getNumberOfEvents() {
         return reader.numDocs();
     }
+    
+    
+    private <T> Iterator<T> select(final String query, final 
DocumentTransformer<T> transformer) throws IOException {
+        final org.apache.lucene.search.Query luceneQuery = 
LuceneTranslator.toLuceneQuery(ProvenanceQuery.compile(query).getWhereClause());
+        final int batchSize = 1000;
+        
+        final ObjectHolder<TopDocs> topDocsHolder = new ObjectHolder<>(null);
+        return new Iterator<T>() {
+            int fetched = 0;
+            int scoreDocIndex = 0;
+            
+            @Override
+            public boolean hasNext() {
+                if ( topDocsHolder.get() == null ) {
+                    try {
+                        topDocsHolder.set(searcher.search(luceneQuery, 
batchSize));
+                    } catch (final IOException ioe) {
+                        throw new EventNotFoundException("Unable to obtain 
next record from " + LuceneIndexSearcher.this, ioe);
+                    }
+                }
+                
+                final boolean hasNext = fetched < 
topDocsHolder.get().totalHits;
+                if ( !hasNext ) {
+                    try {
+                        LuceneIndexSearcher.this.close();
+                    } catch (final IOException ioe) {
+                        logger.warn("Failed to close {} due to {}", this, 
ioe.toString());
+                        if ( logger.isDebugEnabled() ) {
+                            logger.warn("", ioe);
+                        }
+                    }
+                }
+                return hasNext;
+            }
+
+            @Override
+            public T next() {
+                if ( !hasNext() ) {
+                    throw new NoSuchElementException();
+                }
+                
+                TopDocs topDocs = topDocsHolder.get();
+                ScoreDoc[] scoreDocs = topDocs.scoreDocs;
+                if ( scoreDocIndex >= scoreDocs.length ) {
+                    try {
+                        topDocs = 
searcher.searchAfter(scoreDocs[scoreDocs.length - 1], luceneQuery, batchSize);
+                        topDocsHolder.set(topDocs);
+                        scoreDocs = topDocs.scoreDocs;
+                        scoreDocIndex = 0;
+                    } catch (final IOException ioe) {
+                        throw new EventNotFoundException("Unable to obtain 
next record from " + LuceneIndexSearcher.this, ioe);
+                    }
+                }
+                
+                final ScoreDoc scoreDoc = scoreDocs[scoreDocIndex++];
+                final Document document;
+                try {
+                    document = searcher.doc(scoreDoc.doc);
+                } catch (final IOException ioe) {
+                    throw new EventNotFoundException("Unable to obtain next 
record from " + LuceneIndexSearcher.this, ioe);
+                }
+                fetched++;
+                
+                return transformer.transform(document);
+            }
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException();
+            }
+        };
+    }
+
+    @Override
+    public Iterator<LazyInitializedProvenanceEvent> select(final String query) 
throws IOException {
+        return select(query, new 
DocumentTransformer<LazyInitializedProvenanceEvent>() {
+            @Override
+            public LazyInitializedProvenanceEvent transform(final Document 
document) {
+                return new LazyInitializedProvenanceEvent(repo, 
QueryUtils.createLocation(document), document);
+            }
+        });
+    }
+
+    @Override
+    public Iterator<JournaledStorageLocation> selectLocations(final String 
query) throws IOException {
+        return select(query, new 
DocumentTransformer<JournaledStorageLocation>() {
+            @Override
+            public JournaledStorageLocation transform(final Document document) 
{
+                return QueryUtils.createLocation(document);
+            }
+        });
+    }
+    
+    private static interface DocumentTransformer<T> {
+        T transform(Document document);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexWriter.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexWriter.java
index d1b2c0e..1f84891 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexWriter.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexWriter.java
@@ -50,6 +50,7 @@ import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FSDirectory;
 import org.apache.lucene.util.Version;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.provenance.ProvenanceEventRepository;
 import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.provenance.SearchableFields;
 import org.apache.nifi.provenance.journaling.JournaledProvenanceEvent;
@@ -60,22 +61,22 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class LuceneIndexWriter implements EventIndexWriter {
+    private static final Store STORE_FIELDS = Store.YES;
     private static final Logger logger = 
LoggerFactory.getLogger(LuceneIndexWriter.class);
     
-    @SuppressWarnings("unused")
-    private final JournalingRepositoryConfig config;
     private final Set<SearchableField> nonAttributeSearchableFields;
     private final Set<SearchableField> attributeSearchableFields;
     private final File indexDir;
     
+    private final ProvenanceEventRepository repo;
     private final Directory directory;
     private final Analyzer analyzer;
     private final IndexWriter indexWriter;
     private final AtomicLong indexMaxId = new AtomicLong(-1L);
     
-    public LuceneIndexWriter(final File indexDir, final 
JournalingRepositoryConfig config) throws IOException {
+    public LuceneIndexWriter(final ProvenanceEventRepository repo, final File 
indexDir, final JournalingRepositoryConfig config) throws IOException {
+        this.repo = repo;
         this.indexDir = indexDir;
-        this.config = config;
         
         attributeSearchableFields = Collections.unmodifiableSet(new 
HashSet<>(config.getSearchableAttributes()));
         nonAttributeSearchableFields = Collections.unmodifiableSet(new 
HashSet<>(config.getSearchableFields()));
@@ -96,7 +97,7 @@ public class LuceneIndexWriter implements EventIndexWriter {
         logger.trace("Creating index searcher for {}", indexWriter);
         
         final DirectoryReader reader = DirectoryReader.open(indexWriter, 
false);
-        return new LuceneIndexSearcher(reader, indexDir);
+        return new LuceneIndexSearcher(repo, reader, indexDir);
     }
 
     @Override
@@ -143,30 +144,30 @@ public class LuceneIndexWriter implements 
EventIndexWriter {
             final Map<String, String> attributes = event.getAttributes();
 
             final Document doc = new Document();
-            addField(doc, SearchableFields.FlowFileUUID, 
event.getFlowFileUuid(), Store.NO);
-            addField(doc, SearchableFields.Filename, 
attributes.get(CoreAttributes.FILENAME.key()), Store.NO);
-            addField(doc, SearchableFields.ComponentID, 
event.getComponentId(), Store.NO);
-            addField(doc, SearchableFields.AlternateIdentifierURI, 
event.getAlternateIdentifierUri(), Store.NO);
-            addField(doc, SearchableFields.EventType, 
event.getEventType().name(), Store.NO);
-            addField(doc, SearchableFields.Relationship, 
event.getRelationship(), Store.NO);
-            addField(doc, SearchableFields.Details, event.getDetails(), 
Store.NO);
-            addField(doc, SearchableFields.ContentClaimSection, 
event.getContentClaimSection(), Store.NO);
-            addField(doc, SearchableFields.ContentClaimContainer, 
event.getContentClaimContainer(), Store.NO);
-            addField(doc, SearchableFields.ContentClaimIdentifier, 
event.getContentClaimIdentifier(), Store.NO);
-            addField(doc, SearchableFields.SourceQueueIdentifier, 
event.getSourceQueueIdentifier(), Store.NO);
+            addField(doc, SearchableFields.FlowFileUUID, 
event.getFlowFileUuid(), STORE_FIELDS);
+            addField(doc, SearchableFields.Filename, 
attributes.get(CoreAttributes.FILENAME.key()), STORE_FIELDS);
+            addField(doc, SearchableFields.ComponentID, 
event.getComponentId(), STORE_FIELDS);
+            addField(doc, SearchableFields.AlternateIdentifierURI, 
event.getAlternateIdentifierUri(), STORE_FIELDS);
+            addField(doc, SearchableFields.EventType, 
event.getEventType().name(), STORE_FIELDS);
+            addField(doc, SearchableFields.Relationship, 
event.getRelationship(), STORE_FIELDS);
+            addField(doc, SearchableFields.Details, event.getDetails(), 
STORE_FIELDS);
+            addField(doc, SearchableFields.ContentClaimSection, 
event.getContentClaimSection(), STORE_FIELDS);
+            addField(doc, SearchableFields.ContentClaimContainer, 
event.getContentClaimContainer(), STORE_FIELDS);
+            addField(doc, SearchableFields.ContentClaimIdentifier, 
event.getContentClaimIdentifier(), STORE_FIELDS);
+            addField(doc, SearchableFields.SourceQueueIdentifier, 
event.getSourceQueueIdentifier(), STORE_FIELDS);
 
             if 
(nonAttributeSearchableFields.contains(SearchableFields.TransitURI)) {
-                addField(doc, SearchableFields.TransitURI, 
event.getTransitUri(), Store.NO);
+                addField(doc, SearchableFields.TransitURI, 
event.getTransitUri(), STORE_FIELDS);
             }
 
             for (final SearchableField searchableField : 
attributeSearchableFields) {
-                addField(doc, searchableField, 
attributes.get(searchableField.getSearchableFieldName()), Store.NO);
+                addField(doc, searchableField, 
attributes.get(searchableField.getSearchableFieldName()), STORE_FIELDS);
             }
 
             // Index the fields that we always index (unless there's nothing 
else to index at all)
-            doc.add(new 
LongField(SearchableFields.LineageStartDate.getSearchableFieldName(), 
event.getLineageStartDate(), Store.NO));
-            doc.add(new 
LongField(SearchableFields.EventTime.getSearchableFieldName(), 
event.getEventTime(), Store.NO));
-            doc.add(new 
LongField(SearchableFields.FileSize.getSearchableFieldName(), 
event.getFileSize(), Store.NO));
+            doc.add(new 
LongField(SearchableFields.LineageStartDate.getSearchableFieldName(), 
event.getLineageStartDate(), STORE_FIELDS));
+            doc.add(new 
LongField(SearchableFields.EventTime.getSearchableFieldName(), 
event.getEventTime(), STORE_FIELDS));
+            doc.add(new 
LongField(SearchableFields.FileSize.getSearchableFieldName(), 
event.getFileSize(), STORE_FIELDS));
 
             final JournaledStorageLocation location = 
event.getStorageLocation();
             doc.add(new StringField(IndexedFieldNames.CONTAINER_NAME, 
location.getContainerName(), Store.YES));
@@ -176,20 +177,20 @@ public class LuceneIndexWriter implements 
EventIndexWriter {
             doc.add(new LongField(IndexedFieldNames.EVENT_ID, 
location.getEventId(), Store.YES));
 
             for (final String lineageIdentifier : 
event.getLineageIdentifiers()) {
-                addField(doc, SearchableFields.LineageIdentifier, 
lineageIdentifier, Store.NO);
+                addField(doc, SearchableFields.LineageIdentifier, 
lineageIdentifier, STORE_FIELDS);
             }
 
             // If it's event is a FORK, or JOIN, add the FlowFileUUID for all 
child/parent UUIDs.
             if (event.getEventType() == ProvenanceEventType.FORK || 
event.getEventType() == ProvenanceEventType.CLONE || event.getEventType() == 
ProvenanceEventType.REPLAY) {
                 for (final String uuid : event.getChildUuids()) {
                     if (!uuid.equals(event.getFlowFileUuid())) {
-                        addField(doc, SearchableFields.FlowFileUUID, uuid, 
Store.NO);
+                        addField(doc, SearchableFields.FlowFileUUID, uuid, 
STORE_FIELDS);
                     }
                 }
             } else if (event.getEventType() == ProvenanceEventType.JOIN) {
                 for (final String uuid : event.getParentUuids()) {
                     if (!uuid.equals(event.getFlowFileUuid())) {
-                        addField(doc, SearchableFields.FlowFileUUID, uuid, 
Store.NO);
+                        addField(doc, SearchableFields.FlowFileUUID, uuid, 
STORE_FIELDS);
                     }
                 }
             } else if (event.getEventType() == ProvenanceEventType.RECEIVE && 
event.getSourceSystemFlowFileIdentifier() != null) {
@@ -205,7 +206,7 @@ public class LuceneIndexWriter implements EventIndexWriter {
                 }
 
                 if (sourceFlowFileUUID != null) {
-                    addField(doc, SearchableFields.FlowFileUUID, 
sourceFlowFileUUID, Store.NO);
+                    addField(doc, SearchableFields.FlowFileUUID, 
sourceFlowFileUUID, STORE_FIELDS);
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/MultiIndexSearcher.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/MultiIndexSearcher.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/MultiIndexSearcher.java
index 4accf50..b555407 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/MultiIndexSearcher.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/MultiIndexSearcher.java
@@ -21,11 +21,15 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.nifi.provenance.journaling.JournaledStorageLocation;
+import org.apache.nifi.provenance.journaling.LazyInitializedProvenanceEvent;
 import org.apache.nifi.provenance.search.Query;
 
+import com.google.common.collect.Iterators;
+
 public class MultiIndexSearcher implements EventIndexSearcher {
     private final List<EventIndexSearcher> searchers;
     
@@ -160,4 +164,28 @@ public class MultiIndexSearcher implements 
EventIndexSearcher {
     public String toString() {
         return searchers.toString();
     }
+
+    @Override
+    public Iterator<LazyInitializedProvenanceEvent> select(final String query) 
throws IOException {
+        final List<Iterator<LazyInitializedProvenanceEvent>> iterators = new 
ArrayList<>(searchers.size());
+        
+        for ( final EventIndexSearcher searcher : searchers ) {
+            final Iterator<LazyInitializedProvenanceEvent> itr = 
searcher.select(query);
+            iterators.add(itr);
+        }
+
+        return Iterators.concat(iterators.iterator());
+    }
+
+    @Override
+    public Iterator<JournaledStorageLocation> selectLocations(final String 
query) throws IOException {
+        final List<Iterator<JournaledStorageLocation>> iterators = new 
ArrayList<>(searchers.size());
+        
+        for ( final EventIndexSearcher searcher : searchers ) {
+            final Iterator<JournaledStorageLocation> itr = 
searcher.selectLocations(query);
+            iterators.add(itr);
+        }
+
+        return Iterators.concat(iterators.iterator());
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/QueryUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/QueryUtils.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/QueryUtils.java
index d8dd5eb..aa516ab 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/QueryUtils.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/QueryUtils.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.lucene.document.Document;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.BooleanClause;
 import org.apache.lucene.search.BooleanQuery;
@@ -41,6 +42,18 @@ import 
org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig;
 import org.apache.nifi.provenance.search.SearchTerm;
 
 public class QueryUtils {
+    
+    public static JournaledStorageLocation createLocation(final Document 
document) {
+        final String containerName = 
document.get(IndexedFieldNames.CONTAINER_NAME);
+        final String sectionName = 
document.get(IndexedFieldNames.SECTION_NAME);
+        final long journalId = 
document.getField(IndexedFieldNames.JOURNAL_ID).numericValue().longValue();
+        final int blockIndex = 
document.getField(IndexedFieldNames.BLOCK_INDEX).numericValue().intValue();
+        final long eventId = 
document.getField(IndexedFieldNames.EVENT_ID).numericValue().longValue();
+        
+        return new JournaledStorageLocation(containerName, sectionName, 
journalId, blockIndex, eventId);
+    }
+    
+    
     public static org.apache.lucene.search.Query convertQueryToLucene(final 
org.apache.nifi.provenance.search.Query query) {
         if (query.getStartDate() == null && query.getEndDate() == null && 
query.getSearchTerms().isEmpty()) {
             return new MatchAllDocsQuery();

Reply via email to