http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/79c60016/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
index b60d187,01ad941..c1100c8
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
@@@ -51,23 -52,30 +52,35 @@@ import org.apache.nifi.connectable.Posi
  import org.apache.nifi.connectable.Size;
  import org.apache.nifi.controller.exception.ProcessorInstantiationException;
  import org.apache.nifi.controller.label.Label;
+ import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
++import org.apache.nifi.controller.reporting.StandardReportingInitializationContext;
+ import org.apache.nifi.controller.service.ControllerServiceLoader;
+ import org.apache.nifi.controller.service.ControllerServiceNode;
+ import org.apache.nifi.controller.service.ControllerServiceState;
+ import org.apache.nifi.encrypt.StringEncryptor;
  import org.apache.nifi.events.BulletinFactory;
- import org.apache.nifi.util.file.FileUtils;
  import org.apache.nifi.fingerprint.FingerprintException;
  import org.apache.nifi.fingerprint.FingerprintFactory;
  import org.apache.nifi.flowfile.FlowFilePrioritizer;
  import org.apache.nifi.groups.ProcessGroup;
  import org.apache.nifi.groups.RemoteProcessGroup;
  import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
++import org.apache.nifi.logging.ComponentLog;
  import org.apache.nifi.logging.LogLevel;
  import org.apache.nifi.processor.Relationship;
++import org.apache.nifi.processor.SimpleProcessLogger;
  import org.apache.nifi.remote.RemoteGroupPort;
  import org.apache.nifi.remote.RootGroupPort;
++import org.apache.nifi.reporting.InitializationException;
++import org.apache.nifi.reporting.ReportingInitializationContext;
  import org.apache.nifi.reporting.Severity;
  import org.apache.nifi.scheduling.SchedulingStrategy;
+ import org.apache.nifi.util.DomUtils;
  import org.apache.nifi.util.NiFiProperties;
+ import org.apache.nifi.util.file.FileUtils;
  import org.apache.nifi.web.api.dto.ConnectableDTO;
  import org.apache.nifi.web.api.dto.ConnectionDTO;
+ import org.apache.nifi.web.api.dto.ControllerServiceDTO;
  import org.apache.nifi.web.api.dto.FlowSnippetDTO;
  import org.apache.nifi.web.api.dto.FunnelDTO;
  import org.apache.nifi.web.api.dto.LabelDTO;
@@@ -313,7 -346,114 +351,124 @@@ public class StandardFlowSynchronizer i
  
          return baos.toByteArray();
      }
+     
+     
+     private void updateControllerService(final FlowController controller, final Element controllerServiceElement, final StringEncryptor encryptor) {
+       final ControllerServiceDTO dto = FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor);
+       
+       final ControllerServiceState state = ControllerServiceState.valueOf(dto.getState());
+         final boolean dtoEnabled = (state == ControllerServiceState.ENABLED || state == ControllerServiceState.ENABLING);
+         
+         final ControllerServiceNode serviceNode = controller.getControllerServiceNode(dto.getId());
+         final ControllerServiceState serviceState = serviceNode.getState();
+         final boolean serviceEnabled = (serviceState == ControllerServiceState.ENABLED || serviceState == ControllerServiceState.ENABLING);
+         
+       if (dtoEnabled && !serviceEnabled) {
+               controller.enableControllerService(controller.getControllerServiceNode(dto.getId()));
+       } else if (!dtoEnabled && serviceEnabled) {
+               controller.disableControllerService(controller.getControllerServiceNode(dto.getId()));
+       }
+     }
+     
+     private void addReportingTask(final FlowController controller, final Element reportingTaskElement, final StringEncryptor encryptor) throws ReportingTaskInstantiationException {
+       final ReportingTaskDTO dto = FlowFromDOMFactory.getReportingTask(reportingTaskElement, encryptor);
+       
+       final ReportingTaskNode reportingTask = controller.createReportingTask(dto.getType(), dto.getId(), false);
+       reportingTask.setName(dto.getName());
+       reportingTask.setComments(dto.getComment());
+       reportingTask.setScheduldingPeriod(dto.getSchedulingPeriod());
+       reportingTask.setSchedulingStrategy(SchedulingStrategy.valueOf(dto.getSchedulingStrategy()));
+       
+       reportingTask.setAnnotationData(dto.getAnnotationData());
+       
+         for (final Map.Entry<String, String> entry : dto.getProperties().entrySet()) {
+             if (entry.getValue() == null) {
+               reportingTask.removeProperty(entry.getKey());
+             } else {
+               reportingTask.setProperty(entry.getKey(), entry.getValue());
+             }
+         }
+         
++        final ComponentLog componentLog = new SimpleProcessLogger(dto.getId(), reportingTask.getReportingTask());
++        final ReportingInitializationContext config = new StandardReportingInitializationContext(dto.getId(), dto.getName(), 
++                SchedulingStrategy.valueOf(dto.getSchedulingStrategy()), dto.getSchedulingPeriod(), componentLog, controller);
++        
++        try {
++            reportingTask.getReportingTask().initialize(config);
++        } catch (final InitializationException ie) {
++            throw new ReportingTaskInstantiationException("Failed to initialize reporting task of type " + dto.getType(), ie);
++        }
++        
+         if ( autoResumeState ) {
+               if ( ScheduledState.RUNNING.name().equals(dto.getState()) ) {
+                       try {
+                               controller.startReportingTask(reportingTask);
+                       } catch (final Exception e) {
+                               logger.error("Failed to start {} due to {}", reportingTask, e);
+                               if ( logger.isDebugEnabled() ) {
+                                       logger.error("", e);
+                               }
+                               controller.getBulletinRepository().addBulletin(BulletinFactory.createBulletin(
+                                               "Reporting Tasks", Severity.ERROR.name(), "Failed to start " + reportingTask + " due to " + e));
+                       }
+               } else if ( ScheduledState.DISABLED.name().equals(dto.getState()) ) {
+                       try {
+                               controller.disableReportingTask(reportingTask);
+                       } catch (final Exception e) {
+                               logger.error("Failed to mark {} as disabled due to {}", reportingTask, e);
+                               if ( logger.isDebugEnabled() ) {
+                                       logger.error("", e);
+                               }
+                               controller.getBulletinRepository().addBulletin(BulletinFactory.createBulletin(
+                                               "Reporting Tasks", Severity.ERROR.name(), "Failed to mark " + reportingTask + " as disabled due to " + e));
+                       }
+               }
+         }
+     }
  
+     private void updateReportingTask(final FlowController controller, final Element reportingTaskElement, final StringEncryptor encryptor) {
+       final ReportingTaskDTO dto = FlowFromDOMFactory.getReportingTask(reportingTaskElement, encryptor);
+       final ReportingTaskNode taskNode = controller.getReportingTaskNode(dto.getId());
+       
+         if (!taskNode.getScheduledState().name().equals(dto.getState())) {
+             try {
+                 switch (ScheduledState.valueOf(dto.getState())) {
+                     case DISABLED:
+                       if ( taskNode.isRunning() ) {
+                               controller.stopReportingTask(taskNode);
+                       }
+                       controller.disableReportingTask(taskNode);
+                         break;
+                     case RUNNING:
+                       if ( taskNode.getScheduledState() == ScheduledState.DISABLED ) {
+                               controller.enableReportingTask(taskNode);
+                       }
+                       controller.startReportingTask(taskNode);
+                         break;
+                     case STOPPED:
+                         if (taskNode.getScheduledState() == ScheduledState.DISABLED) {
+                               controller.enableReportingTask(taskNode);
+                         } else if (taskNode.getScheduledState() == ScheduledState.RUNNING) {
+                               controller.stopReportingTask(taskNode);
+                         }
+                         break;
+                 }
+             } catch (final IllegalStateException ise) {
+                 logger.error("Failed to change Scheduled State of {} from {} to {} due to {}", taskNode, taskNode.getScheduledState().name(), dto.getState(), ise.toString());
+                 logger.error("", ise);
+ 
+                 // create bulletin for the Processor Node
+                 controller.getBulletinRepository().addBulletin(BulletinFactory.createBulletin("Node Reconnection", Severity.ERROR.name(),
+                         "Failed to change Scheduled State of " + taskNode + " from " + taskNode.getScheduledState().name() + " to " + dto.getState() + " due to " + ise.toString()));
+ 
+                 // create bulletin at Controller level.
+                 controller.getBulletinRepository().addBulletin(BulletinFactory.createBulletin("Node Reconnection", Severity.ERROR.name(),
+                         "Failed to change Scheduled State of " + taskNode + " from " + taskNode.getScheduledState().name() + " to " + dto.getState() + " due to " + ise.toString()));
+             }
+         }
+     }
+     
+     
      private ProcessGroup updateProcessGroup(final FlowController controller, final ProcessGroup parentGroup, final Element processGroupElement, final StringEncryptor encryptor) throws ProcessorInstantiationException {
  
          // get the parent group ID

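The updateControllerService() method above follows a simple reconcile pattern: compute the desired state from the serialized DTO, read the live state from the controller, and issue an enable/disable call only when the two differ. A minimal sketch of that pattern, using hypothetical names (DesiredState and Component are illustrations, not NiFi API):

    // Hypothetical sketch of the reconcile pattern used by updateControllerService().
    enum DesiredState { ENABLED, DISABLED }

    interface Component {
        boolean isEnabled();
        void enable();
        void disable();
    }

    static void reconcile(final Component live, final DesiredState desired) {
        final boolean wantEnabled = (desired == DesiredState.ENABLED);
        if (wantEnabled && !live.isEnabled()) {
            live.enable();          // flow says enabled, controller says disabled
        } else if (!wantEnabled && live.isEnabled()) {
            live.disable();         // flow says disabled, controller says enabled
        }
    }
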
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/79c60016/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
index 7c3734a,272c0ba..6b3cc95
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
@@@ -44,9 -48,9 +48,9 @@@ public abstract class AbstractReporting
      private final ControllerServiceLookup serviceLookup;
  
      private final AtomicReference<SchedulingStrategy> schedulingStrategy = new AtomicReference<>(SchedulingStrategy.TIMER_DRIVEN);
--    private final AtomicReference<String> schedulingPeriod = new AtomicReference<>("5 mins");
-     private final AtomicReference<Availability> availability = new AtomicReference<>(Availability.NODE_ONLY);
- 
++    private final AtomicReference<String> schedulingPeriod = new AtomicReference<>("1 mins");
+     
+     private volatile String comment;
      private volatile ScheduledState scheduledState = ScheduledState.STOPPED;
      
      public AbstractReportingTaskNode(final ReportingTask reportingTask, final String id,

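The field change above swaps the default scheduling period and drops the Availability reference; the AtomicReference wrapper is what lets callers read and replace a setting without locking. A brief sketch of that idiom (SettingHolder is a hypothetical name, not a NiFi class):

    import java.util.concurrent.atomic.AtomicReference;

    // Hypothetical holder showing the AtomicReference-backed setting idiom.
    final class SettingHolder {
        private final AtomicReference<String> schedulingPeriod = new AtomicReference<>("1 mins");

        String getSchedulingPeriod() {
            return schedulingPeriod.get();    // readers always see a fully published value
        }

        void setSchedulingPeriod(final String period) {
            schedulingPeriod.set(period);     // atomic swap; no synchronized block needed
        }
    }
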
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/79c60016/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/79c60016/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index 1627994,087ec68..7347632
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@@ -177,16 -182,11 +182,11 @@@ public final class StandardProcessSched
                          }
                          
                          break;
-                     } catch (final InvocationTargetException ite) {
-                         LOG.error("Failed to invoke the On-Scheduled Lifecycle methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
-                                 new Object[]{reportingTask, ite.getTargetException(), administrativeYieldDuration});
-                         LOG.error("", ite.getTargetException());
- 
-                         try {
-                             Thread.sleep(administrativeYieldMillis);
-                         } catch (final InterruptedException ie) {
-                         }
                      } catch (final Exception e) {
+                         final Throwable cause = (e instanceof InvocationTargetException) ? e.getCause() : e;
 -                        final ComponentLog componentLog = new SimpleProcessLogger(reportingTask.getIdentifier(), reportingTask);
++                        final ComponentLog componentLog = new SimpleProcessLogger(taskNode.getIdentifier(), reportingTask);
+                         componentLog.error("Failed to invoke @OnEnabled method due to {}", cause);
+                         
                          LOG.error("Failed to invoke the On-Scheduled Lifecycle methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
                                  new Object[]{reportingTask, e.toString(), administrativeYieldDuration}, e);
                          try {

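The scheduler change above folds the dedicated InvocationTargetException branch into the generic handler: when a lifecycle method is invoked reflectively, the interesting failure is the wrapped cause, so the handler unwraps it before logging. A condensed sketch of that idiom (invokeLifecycle() is a hypothetical wrapper, not the scheduler's actual method):

    import java.lang.reflect.InvocationTargetException;
    import java.lang.reflect.Method;

    // Hypothetical illustration of the cause-unwrapping idiom adopted above.
    static void invokeLifecycle(final Method method, final Object target) {
        try {
            method.invoke(target);
        } catch (final Exception e) {
            // Reflection wraps the real failure; report the underlying cause instead.
            final Throwable cause = (e instanceof InvocationTargetException) ? e.getCause() : e;
            System.err.println("Lifecycle method failed: " + cause);
        }
    }
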
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/79c60016/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/79c60016/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/79c60016/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java
index fc197e8,0000000..12f2911
mode 100644,000000..100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java
@@@ -1,744 -1,0 +1,748 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.nifi.provenance.journaling;
 +
 +import java.io.File;
 +import java.io.FilenameFilter;
 +import java.io.IOException;
 +import java.nio.file.Path;
 +import java.nio.file.Paths;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.Comparator;
 +import java.util.Date;
 +import java.util.HashMap;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +import java.util.concurrent.Callable;
 +import java.util.concurrent.ExecutionException;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.Executors;
 +import java.util.concurrent.Future;
 +import java.util.concurrent.ScheduledExecutorService;
 +import java.util.concurrent.ThreadFactory;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.concurrent.atomic.AtomicLong;
 +import java.util.regex.Pattern;
 +
 +import org.apache.nifi.events.EventReporter;
 +import org.apache.nifi.pql.ProvenanceQuery;
 +import org.apache.nifi.processor.DataUnit;
 +import org.apache.nifi.provenance.ProvenanceEventBuilder;
 +import org.apache.nifi.provenance.ProvenanceEventRecord;
 +import org.apache.nifi.provenance.ProvenanceEventRepository;
 +import org.apache.nifi.provenance.SearchableFieldParser;
 +import org.apache.nifi.provenance.SearchableFields;
 +import org.apache.nifi.provenance.StandardProvenanceEventRecord;
 +import org.apache.nifi.provenance.StorageLocation;
 +import org.apache.nifi.provenance.StoredProvenanceEvent;
 +import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig;
 +import org.apache.nifi.provenance.journaling.exception.EventNotFoundException;
 +import org.apache.nifi.provenance.journaling.index.EventIndexSearcher;
 +import org.apache.nifi.provenance.journaling.index.IndexAction;
 +import org.apache.nifi.provenance.journaling.index.IndexManager;
 +import org.apache.nifi.provenance.journaling.index.LuceneIndexManager;
 +import org.apache.nifi.provenance.journaling.index.QueryUtils;
 +import org.apache.nifi.provenance.journaling.journals.JournalReader;
 +import org.apache.nifi.provenance.journaling.journals.StandardJournalReader;
 +import org.apache.nifi.provenance.journaling.partition.Partition;
 +import org.apache.nifi.provenance.journaling.partition.PartitionAction;
 +import org.apache.nifi.provenance.journaling.partition.PartitionManager;
 +import org.apache.nifi.provenance.journaling.partition.QueuingPartitionManager;
 +import org.apache.nifi.provenance.journaling.partition.VoidPartitionAction;
 +import org.apache.nifi.provenance.journaling.query.QueryManager;
 +import org.apache.nifi.provenance.journaling.query.StandardQueryManager;
 +import org.apache.nifi.provenance.journaling.toc.StandardTocReader;
 +import org.apache.nifi.provenance.journaling.toc.TocReader;
 +import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
 +import org.apache.nifi.provenance.query.ProvenanceQueryResult;
 +import org.apache.nifi.provenance.query.ProvenanceQuerySubmission;
 +import org.apache.nifi.provenance.query.ProvenanceResultSet;
 +import org.apache.nifi.provenance.search.Query;
 +import org.apache.nifi.provenance.search.QuerySubmission;
 +import org.apache.nifi.provenance.search.SearchableField;
 +import org.apache.nifi.reporting.Severity;
 +import org.apache.nifi.util.FormatUtils;
 +import org.apache.nifi.util.NiFiProperties;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +// TODO: read-only is not checked everywhere!
 +public class JournalingProvenanceRepository implements ProvenanceEventRepository {
 +    public static final String WORKER_THREAD_POOL_SIZE = "nifi.provenance.repository.worker.threads";
 +    public static final String BLOCK_SIZE = "nifi.provenance.repository.writer.block.size";
 +    
 +    private static final Logger logger = LoggerFactory.getLogger(JournalingProvenanceRepository.class);
 +    
 +    private final JournalingRepositoryConfig config;
 +    private final AtomicLong idGenerator = new AtomicLong(0L);
 +    
 +    // the following member variables are effectively final. They are initialized
 +    // in the initialize method rather than the constructor because we want to ensure
 +    // that they are not created every time that the Java Service Loader instantiates the class.
 +    private ScheduledExecutorService workerExecutor;
 +    private ExecutorService queryExecutor;
 +    private ExecutorService compressionExecutor;
 +    private EventReporter eventReporter;
 +    private PartitionManager partitionManager;
 +    private QueryManager queryManager;
 +    private IndexManager indexManager;
 +    
 +    public JournalingProvenanceRepository() throws IOException {
 +        this(createConfig());
 +    }
 +    
 +    public JournalingProvenanceRepository(final JournalingRepositoryConfig config) throws IOException {
 +        this.config = config;
 +    }
 +
 +    private static ThreadFactory createThreadFactory(final String namePrefix) {
 +        final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
 +        final AtomicInteger counter = new AtomicInteger(0);
 +        
 +        return new ThreadFactory() {
 +            @Override
 +            public Thread newThread(final Runnable r) {
 +                final Thread thread = defaultFactory.newThread(r);
 +                thread.setName(namePrefix + "-" + counter.incrementAndGet());
 +                return thread;
 +            }
 +        };
 +    }
 +    
 +    private static JournalingRepositoryConfig createConfig()  {
 +        final NiFiProperties properties = NiFiProperties.getInstance();
 +        final Map<String, Path> storageDirectories = properties.getProvenanceRepositoryPaths();
 +        if (storageDirectories.isEmpty()) {
 +            storageDirectories.put("provenance_repository", Paths.get("provenance_repository"));
 +        }
 +        final String storageTime = properties.getProperty(NiFiProperties.PROVENANCE_MAX_STORAGE_TIME, "24 hours");
 +        final String storageSize = properties.getProperty(NiFiProperties.PROVENANCE_MAX_STORAGE_SIZE, "1 GB");
 +        final String rolloverTime = properties.getProperty(NiFiProperties.PROVENANCE_ROLLOVER_TIME, "5 mins");
 +        final String rolloverSize = properties.getProperty(NiFiProperties.PROVENANCE_ROLLOVER_SIZE, "100 MB");
 +        final String shardSize = properties.getProperty(NiFiProperties.PROVENANCE_INDEX_SHARD_SIZE, "500 MB");
 +        final int queryThreads = properties.getIntegerProperty(NiFiProperties.PROVENANCE_QUERY_THREAD_POOL_SIZE, 2);
 +        final int workerThreads = properties.getIntegerProperty(WORKER_THREAD_POOL_SIZE, 4);
 +        final int journalCount = properties.getIntegerProperty(NiFiProperties.PROVENANCE_JOURNAL_COUNT, 16);
 +
 +        final long storageMillis = FormatUtils.getTimeDuration(storageTime, TimeUnit.MILLISECONDS);
 +        final long maxStorageBytes = DataUnit.parseDataSize(storageSize, DataUnit.B).longValue();
 +        final long rolloverMillis = FormatUtils.getTimeDuration(rolloverTime, TimeUnit.MILLISECONDS);
 +        final long rolloverBytes = DataUnit.parseDataSize(rolloverSize, DataUnit.B).longValue();
 +
 +        final boolean compressOnRollover = Boolean.parseBoolean(properties.getProperty(NiFiProperties.PROVENANCE_COMPRESS_ON_ROLLOVER));
 +        final String indexedFieldString = properties.getProperty(NiFiProperties.PROVENANCE_INDEXED_FIELDS);
 +        final String indexedAttrString = properties.getProperty(NiFiProperties.PROVENANCE_INDEXED_ATTRIBUTES);
 +        final int blockSize = properties.getIntegerProperty(BLOCK_SIZE, 5000);
 +        
 +        final Boolean alwaysSync = Boolean.parseBoolean(properties.getProperty("nifi.provenance.repository.always.sync", "false"));
 +
 +        final List<SearchableField> searchableFields = SearchableFieldParser.extractSearchableFields(indexedFieldString, true);
 +        final List<SearchableField> searchableAttributes = SearchableFieldParser.extractSearchableFields(indexedAttrString, false);
 +
 +        // We always want to index the Event Time.
 +        if (!searchableFields.contains(SearchableFields.EventTime)) {
 +            searchableFields.add(SearchableFields.EventTime);
 +        }
 +
 +        final JournalingRepositoryConfig config = new JournalingRepositoryConfig();
 +        
 +        final Map<String, File> containers = new HashMap<>(storageDirectories.size());
 +        for ( final Map.Entry<String, Path> entry : storageDirectories.entrySet() ) {
 +            containers.put(entry.getKey(), entry.getValue().toFile());
 +        }
 +        config.setContainers(containers);
 +        config.setCompressOnRollover(compressOnRollover);
 +        config.setSearchableFields(searchableFields);
 +        config.setSearchableAttributes(searchableAttributes);
 +        config.setJournalCapacity(rolloverBytes);
 +        config.setJournalRolloverPeriod(rolloverMillis, TimeUnit.MILLISECONDS);
 +        config.setEventExpiration(storageMillis, TimeUnit.MILLISECONDS);
 +        config.setMaxStorageCapacity(maxStorageBytes);
 +        config.setQueryThreadPoolSize(queryThreads);
 +        config.setWorkerThreadPoolSize(workerThreads);
 +        config.setPartitionCount(journalCount);
 +        config.setBlockSize(blockSize);
 +        
 +        if (shardSize != null) {
 +            config.setDesiredIndexSize(DataUnit.parseDataSize(shardSize, DataUnit.B).longValue());
 +        }
 +
 +        config.setAlwaysSync(alwaysSync);
 +
 +        return config;
 +    }
 +    
 +    @Override
 +    public synchronized void initialize(final EventReporter eventReporter) throws IOException {
 +        this.eventReporter = eventReporter;
 +        
 +        // Ensure that the number of partitions specified by the config is at least as large as the 
 +        // number of sections that we have. If not, update the config to be equal to the number of
 +        // sections that we have.
 +        final Pattern numberPattern = Pattern.compile("\\d+");
 +        int numSections = 0;
 +        for ( final File container : config.getContainers().values() ) {
 +            final String[] sections = container.list(new FilenameFilter() {
 +                @Override
 +                public boolean accept(final File dir, final String name) {
 +                    return numberPattern.matcher(name).matches();
 +                }
 +            });
 +            
 +            if ( sections != null ) {
 +                numSections += sections.length;
 +            }
 +        }
 +        
 +        if ( config.getPartitionCount() < numSections ) {
 +            logger.warn("Configured number of partitions for Provenance Repository is {}, but {} partitions already exist. Using {} partitions instead of {}.", 
 +                    config.getPartitionCount(), numSections, numSections, config.getPartitionCount());
 +            config.setPartitionCount(numSections);
 +        }
 +        
 +        // We use 3 different thread pools here because we don't want threads from 1 pool to interfere with
 +        // each other. This is because the worker threads can be long running, and they shouldn't tie up the
 +        // compression threads. Likewise, there may be MANY compression tasks, which could delay the worker
 +        // threads. And the query threads need to run immediately when a user submits a query - they cannot
 +        // wait until we finish compressing data and sync'ing the repository!
 +        final int workerThreadPoolSize = Math.max(2, config.getWorkerThreadPoolSize());
 +        this.workerExecutor = Executors.newScheduledThreadPool(workerThreadPoolSize, createThreadFactory("Provenance Repository Worker Thread"));
 +        
 +        final int queryThreadPoolSize = Math.max(2, config.getQueryThreadPoolSize());
 +        this.queryExecutor = Executors.newScheduledThreadPool(queryThreadPoolSize, createThreadFactory("Provenance Repository Query Thread"));
 +        
 +        final int compressionThreads = Math.max(1, config.getCompressionThreadPoolSize());
 +        this.compressionExecutor = Executors.newFixedThreadPool(compressionThreads, createThreadFactory("Provenance Repository Compression Thread"));
 +        
 +        this.indexManager = new LuceneIndexManager(this, config, workerExecutor, queryExecutor);
 +        this.partitionManager = new QueuingPartitionManager(indexManager, idGenerator, config, workerExecutor, compressionExecutor);
 +        this.queryManager = new StandardQueryManager(indexManager, queryExecutor, config, 10);
 +        
 +        final Long maxEventId = getMaxEventId();
 +        if ( maxEventId != null && maxEventId > 0 ) {
 +            this.idGenerator.set(maxEventId);   // maxEventId returns 1 greater than the last event id written
 +        }
 +        
 +        // the partition manager may have caused journals to be re-indexed. We will sync the
 +        // index manager to make sure that we are completely in sync before allowing any new data
 +        // to be written to the repo.
 +        indexManager.sync();
 +        
 +        final long expirationFrequencyNanos = config.getExpirationFrequency(TimeUnit.NANOSECONDS);
 +        workerExecutor.scheduleWithFixedDelay(new ExpireOldEvents(), expirationFrequencyNanos, expirationFrequencyNanos, TimeUnit.NANOSECONDS);
 +        
 +        workerExecutor.scheduleWithFixedDelay(new Runnable() {
 +            @Override
 +            public void run() {
 +                partitionManager.deleteEventsBasedOnSize();
 +            }
 +        }, expirationFrequencyNanos, expirationFrequencyNanos, TimeUnit.NANOSECONDS);
 +    }
 +
 +    @Override
 +    public ProvenanceEventBuilder eventBuilder() {
 +        return new StandardProvenanceEventRecord.Builder();
 +    }
 +
 +    @Override
 +    public void registerEvent(final ProvenanceEventRecord event) throws IOException {
 +        registerEvents(Collections.singleton(event));
 +    }
 +
 +    @Override
 +    public void registerEvents(final Collection<ProvenanceEventRecord> events) throws IOException {
 +        try {
 +            partitionManager.withPartition(new VoidPartitionAction() {
 +                @Override
 +                public void perform(final Partition partition) throws IOException {
 +                    partition.registerEvents(events, idGenerator.getAndAdd(events.size()));
 +                }
 +            }, true);
 +        } catch (final IOException ioe) {
 +            if ( eventReporter != null ) {
 +                eventReporter.reportEvent(Severity.ERROR, "Provenance Repository", "Failed to persist " + events.size() + " events to Provenance Repository due to " + ioe);
 +            }
 +            throw ioe;
 +        }
 +    }
 +
 +    @Override
 +    public StoredProvenanceEvent getEvent(final long id) throws IOException {
 +        final List<StoredProvenanceEvent> events = getEvents(id, 1);
 +        if ( events.isEmpty() ) {
 +            return null;
 +        }
 +
 +        // We have to check the id of the event returned, because we are requesting up to 1 record
 +        // starting with the given id. However, if that ID doesn't exist, we could get a record
 +        // with a larger id.
 +        final StoredProvenanceEvent event = events.get(0);
 +        if ( event.getEventId() == id ) {
 +            return event;
 +        }
 +        
 +        return null;
 +    }
 +    
 +    @Override
 +    public List<StoredProvenanceEvent> getEvents(final long firstRecordId, final int maxRecords) throws IOException {
 +        // Must generate query to determine the appropriate StorageLocation objects and then call
 +        // getEvent(List<StorageLocation>)
 +        final Set<List<JournaledStorageLocation>> resultSet = indexManager.withEachIndex(
 +            new IndexAction<List<JournaledStorageLocation>>() {
 +                @Override
 +                public List<JournaledStorageLocation> perform(final EventIndexSearcher searcher) throws IOException {
 +                    return searcher.getEvents(firstRecordId, maxRecords);
 +                }
 +        });
 +        
 +        final ArrayList<JournaledStorageLocation> locations = new ArrayList<>(maxRecords);
 +        for ( final List<JournaledStorageLocation> list : resultSet ) {
 +            for ( final JournaledStorageLocation location : list ) {
 +                locations.add(location);
 +            }
 +        }
 +        
 +        Collections.sort(locations, new Comparator<JournaledStorageLocation>() {
 +            @Override
 +            public int compare(final JournaledStorageLocation o1, final JournaledStorageLocation o2) {
 +                return Long.compare(o1.getEventId(), o2.getEventId());
 +            }
 +        });
 +        
 +        locations.trimToSize();
 +        
 +        @SuppressWarnings({ "rawtypes", "unchecked" })
 +        final List<StorageLocation> storageLocations = (List<StorageLocation>) ((List) locations);
 +        return getEvents(storageLocations);
 +    }
 +    
 +    @Override
 +    public StoredProvenanceEvent getEvent(final StorageLocation location) throws IOException {
 +        final List<StoredProvenanceEvent> storedEvents = getEvents(Collections.singletonList(location));
 +        return (storedEvents == null || storedEvents.isEmpty()) ? null : storedEvents.get(0);
 +    }
 +    
 +    
 +    
 +    @Override
 +    public List<StoredProvenanceEvent> getEvents(final List<StorageLocation> locations) throws IOException {
 +        // Group the locations by journal files because we want a single thread, at most, per journal file.
 +        final Map<File, List<JournaledStorageLocation>> orderedLocations = QueryUtils.orderLocations(locations, config);
 +        
 +        // Go through each journal file and create a callable that can lookup the records for that journal file.
 +        final List<Future<List<StoredProvenanceEvent>>> futures = new ArrayList<>();
 +        for ( final Map.Entry<File, List<JournaledStorageLocation>> entry : orderedLocations.entrySet() ) {
 +            final File journalFile = entry.getKey();
 +            final List<JournaledStorageLocation> locationsForFile = entry.getValue();
 +            
 +            final Callable<List<StoredProvenanceEvent>> callable = new Callable<List<StoredProvenanceEvent>>() {
 +                @Override
 +                public List<StoredProvenanceEvent> call() throws Exception {
 +                    final File tocFile = QueryUtils.getTocFile(journalFile);
 +                    if ( !journalFile.exists() || !tocFile.exists() ) {
 +                        return Collections.emptyList();
 +                    }
 +                    
 +                    try(final TocReader tocReader = new StandardTocReader(tocFile);
 +                        final JournalReader reader = new StandardJournalReader(journalFile)) 
 +                    {
 +                        final List<StoredProvenanceEvent> storedEvents = new ArrayList<>(locationsForFile.size());
 +                        
 +                        for ( final JournaledStorageLocation location : locationsForFile ) {
 +                            final long blockOffset = tocReader.getBlockOffset(location.getBlockIndex());
 +                            final ProvenanceEventRecord event = reader.getEvent(blockOffset, location.getEventId());
 +                            
 +                            storedEvents.add(new JournaledProvenanceEvent(event, location));
 +                        }
 +                        
 +                        return storedEvents;
 +                    }
 +                }
 +            };
 +            
 +            final Future<List<StoredProvenanceEvent>> future = queryExecutor.submit(callable);
 +            futures.add(future);
 +        }
 +        
 +        // Get all of the events from the futures, waiting for them to finish.
 +        final Map<StorageLocation, StoredProvenanceEvent> locationToEventMap = new HashMap<>(locations.size());
 +        for ( final Future<List<StoredProvenanceEvent>> future : futures ) {
 +            try {
 +                final List<StoredProvenanceEvent> events = future.get();
 +                
 +                // Map the location to the event, so that we can then re-order the events in the same order
 +                // that the locations were passed to us.
 +                for ( final StoredProvenanceEvent event : events ) {
 +                    locationToEventMap.put(event.getStorageLocation(), event);
 +                }
 +            } catch (final ExecutionException ee) {
 +                final Throwable cause = ee.getCause();
 +                if ( cause instanceof IOException ) {
 +                    throw (IOException) cause;
 +                } else {
 +                    throw new RuntimeException(cause);
 +                }
 +            } catch (final InterruptedException ie) {
 +                throw new RuntimeException(ie);
 +            }
 +        }
 +        
 +        // Sort Events by the order of the provided locations.
 +        final List<StoredProvenanceEvent> sortedEvents = new ArrayList<>(locations.size());
 +        for ( final StorageLocation location : locations ) {
 +            final StoredProvenanceEvent event = locationToEventMap.get(location);
 +            if ( event != null ) {
 +                sortedEvents.add(event);
 +            }
 +        }
 +        
 +        return sortedEvents;
 +    }
 +    
 +
 +    @Override
 +    public Long getMaxEventId() throws IOException {
 +        final Set<Long> maxIds = partitionManager.withEachPartitionSerially(new PartitionAction<Long>() {
 +            @Override
 +            public Long perform(final Partition partition) throws IOException {
 +                return partition.getMaxEventId();
 +            }
 +        }, false);
 +        
 +        Long maxId = null;
 +        for ( final Long id : maxIds ) {
 +            if ( id == null ) {
 +                continue;
 +            }
 +            
 +            if ( maxId == null || id > maxId ) {
 +                maxId = id;
 +            }
 +        }
 +        
 +        return maxId;
 +    }
 +
-     ProgressAwareIterator<? extends StoredProvenanceEvent> selectMatchingEvents(final String query, final AtomicLong lastTimeProgressMade) throws IOException {
++    ProgressAwareIterator<? extends StoredProvenanceEvent> selectMatchingEvents(final String query, final Set<String> referencedFields, final AtomicLong lastTimeProgressMade) throws IOException {
 +        final Set<EventIndexSearcher> searchers = indexManager.getSearchers();
 +        final Iterator<EventIndexSearcher> searchItr = searchers.iterator();
 +        
 +        return new ProgressAwareIterator<StoredProvenanceEvent>() {
 +            private Iterator<LazyInitializedProvenanceEvent> eventItr;
 +            private int searchersComplete = 0;
 +            private EventIndexSearcher currentSearcher;
 +            
 +            @Override
 +            public int getPercentComplete() {
 +                return searchers.isEmpty() ? 100 : searchersComplete * 100 / searchers.size();
 +            }
 +            
 +            @Override
 +            public boolean hasNext() {
 +                // while the event iterator has no information...
 +                while ( eventItr == null || !eventItr.hasNext() ) {
 +                    // if there's not another searcher then we're out of events.
 +                    if ( !searchItr.hasNext() ) {
 +                        return false;
 +                    }
 +                    
 +                    // we're finished with this searcher. Close it.
 +                    if ( currentSearcher != null ) {
 +                        try {
 +                            currentSearcher.close();
 +                        } catch (final IOException ioe) {
 +                            logger.warn("Failed to close {} due to {}", currentSearcher, ioe.toString());
 +                            if ( logger.isDebugEnabled() ) {
 +                                logger.warn("", ioe);
 +                            }
 +                        }
 +                    }
 +                    
 +                    // We have a searcher. get events from it. If there are no matches,
 +                    // then our while loop will keep going.
 +                    currentSearcher = searchItr.next();
 +                    searchersComplete++;
 +                    
 +                    try {
-                         eventItr = currentSearcher.select(query);
++                        eventItr = currentSearcher.select(query, referencedFields);
 +                    } catch (final IOException ioe) {
 +                        throw new EventNotFoundException("Could not find next event", ioe);
 +                    }
 +                }
 +                
 +                // the event iterator has no events, and the search iterator has no more
 +                // searchers. There are no more events.
 +                return eventItr != null && eventItr.hasNext();
 +            }
 +
 +            @Override
 +            public StoredProvenanceEvent next() {
 +                lastTimeProgressMade.set(System.nanoTime());
 +                return eventItr.next();
 +            }
 +
 +            @Override
 +            public void remove() {
 +                throw new UnsupportedOperationException();
 +            }
 +        };
 +    }
 +    
 +    public ProvenanceResultSet query(final String query) throws IOException {
 +        final ProvenanceQuerySubmission submission = submitQuery(query);
 +        return submission.getResult().getResultSet();
 +    }
 +    
 +    
 +    public ProvenanceQuerySubmission retrieveProvenanceQuerySubmission(final String queryIdentifier) {
 +        return queryManager.retrieveProvenanceQuerySubmission(queryIdentifier);
 +    }
 +    
 +    @Override
 +    public ProvenanceQuerySubmission submitQuery(final String query) {
 +        ProvenanceQuerySubmission submission;
 +        final AtomicLong lastTimeProgressMade = new AtomicLong(System.nanoTime());
 +        final long tenMinsInNanos = TimeUnit.MINUTES.toNanos(10);
 +        
 +        try {
-             final ProgressAwareIterator<? extends StoredProvenanceEvent> eventItr = selectMatchingEvents(query, lastTimeProgressMade);
-             final ProvenanceResultSet rs = ProvenanceQuery.compile(query, getSearchableFields(), getSearchableAttributes()).evaluate(eventItr);
++            final ProvenanceQuery provenanceQuery = ProvenanceQuery.compile(query, getSearchableFields(), getSearchableAttributes());
++            
++            final Set<String> referencedFields = provenanceQuery.getReferencedFields();
++//            final Set<String> referencedFields = null;
++            final ProgressAwareIterator<? extends StoredProvenanceEvent> eventItr = selectMatchingEvents(query, referencedFields, lastTimeProgressMade);
++            final ProvenanceResultSet rs = provenanceQuery.evaluate(eventItr);
 +            
 +            submission = new JournalingRepoQuerySubmission(query, new ProvenanceQueryResult() {
 +                @Override
 +                public ProvenanceResultSet getResultSet() {
 +                    return rs;
 +                }
 +
 +                @Override
 +                public Date getExpiration() {
 +                    return new Date(tenMinsInNanos + lastTimeProgressMade.get());
 +                }
 +
 +                @Override
 +                public String getError() {
 +                    return null;
 +                }
 +
 +                @Override
 +                public int getPercentComplete() {
 +                    return eventItr.getPercentComplete();
 +                }
 +
 +                @Override
 +                public boolean isFinished() {
 +                    return eventItr.getPercentComplete() >= 100;
 +                }
 +            });
 +        } catch (final IOException ioe) {
 +            logger.error("Failed to perform query {} due to {}", query, ioe.toString());
 +            if ( logger.isDebugEnabled() ) {
 +                logger.error("", ioe);
 +            }
 +            
 +            submission = new JournalingRepoQuerySubmission(query, new ProvenanceQueryResult() {
 +                @Override
 +                public ProvenanceResultSet getResultSet() {
 +                    return null;
 +                }
 +
 +                @Override
 +                public Date getExpiration() {
 +                    return new Date(tenMinsInNanos + lastTimeProgressMade.get());
 +                }
 +
 +                @Override
 +                public String getError() {
 +                    return "Failed to perform query due to " + ioe;
 +                }
 +
 +                @Override
 +                public int getPercentComplete() {
 +                    return 0;
 +                }
 +
 +                @Override
 +                public boolean isFinished() {
 +                    return true;
 +                }
 +            });
 +        }
 +        
 +        queryManager.registerSubmission(submission);
 +        return submission;
 +    }
 +    
 +    
 +    @Override
 +    public QuerySubmission submitQuery(final Query query) {
 +        return queryManager.submitQuery(query);
 +    }
 +
 +    @Override
 +    public QuerySubmission retrieveQuerySubmission(final String queryIdentifier) {
 +        return queryManager.retrieveQuerySubmission(queryIdentifier);
 +    }
 +
 +    @Override
 +    public ComputeLineageSubmission submitLineageComputation(final String flowFileUuid) {
 +        return queryManager.submitLineageComputation(flowFileUuid);
 +    }
 +
 +    @Override
 +    public ComputeLineageSubmission retrieveLineageSubmission(final String lineageIdentifier) {
 +        return queryManager.retrieveLineageSubmission(lineageIdentifier);
 +    }
 +
 +    @Override
 +    public ComputeLineageSubmission submitExpandParents(final long eventId) {
 +        return queryManager.submitExpandParents(this, eventId);
 +    }
 +
 +    @Override
 +    public ComputeLineageSubmission submitExpandChildren(final long eventId) {
 +        return queryManager.submitExpandChildren(this, eventId);
 +    }
 +
 +    @Override
 +    public void close() throws IOException {
 +        if ( partitionManager != null ) {
 +            partitionManager.shutdown();
 +        }
 +        
 +        if ( indexManager != null ) {
 +            try {
 +                indexManager.close();
 +            } catch (final IOException ioe) {
 +                logger.warn("Failed to shutdown Index Manager due to {}", ioe.toString());
 +                if ( logger.isDebugEnabled() ) {
 +                    logger.warn("", ioe);
 +                }
 +            }
 +        }
 +        
 +        if ( queryManager != null ) {
 +            try {
 +                queryManager.close();
 +            } catch (final IOException ioe) {
 +                logger.warn("Failed to shutdown Query Manager due to {}", ioe.toString());
 +                if ( logger.isDebugEnabled() ) {
 +                    logger.warn("", ioe);
 +                }
 +            }
 +        }
 +        
 +        compressionExecutor.shutdown();
 +        workerExecutor.shutdown();
 +        queryExecutor.shutdown();
 +    }
 +
 +    @Override
 +    public List<SearchableField> getSearchableFields() {
 +        final List<SearchableField> searchableFields = new ArrayList<>(config.getSearchableFields());
 +        // we exclude the Event Time because it is always searchable and is a bit special in its handling
 +        // because it dictates in some cases which index files we look at
 +        searchableFields.remove(SearchableFields.EventTime);
 +        return searchableFields;
 +    }
 +
 +    @Override
 +    public List<SearchableField> getSearchableAttributes() {
 +        return config.getSearchableAttributes();
 +    }
 +
 +    @Override
 +    public Long getEarliestEventTime() throws IOException {
 +        // Get the earliest event timestamp for each partition
 +        final Set<Long> earliestTimes = partitionManager.withEachPartitionSerially(new PartitionAction<Long>() {
 +            @Override
 +            public Long perform(final Partition partition) throws IOException {
 +                return partition.getEarliestEventTime();
 +            }
 +        }, false);
 +        
 +        // Find the latest timestamp for each of the "earliest" timestamps.
 +        // This is a bit odd, but we're doing it for a good reason:
 +        //      The UI is going to show the earliest time available. Because we have a partitioned write-ahead
 +        //      log, if we just return the timestamp of the earliest event available, we could end up returning
 +        //      a time for an event that exists but the next event in its lineage does not exist because it was
 +        //      already aged off of a different journal. To avoid this, we return the "latest of the earliest"
 +        //      timestamps. This way, we know that no event with a larger ID has been aged off from any of the
 +        //      partitions.
 +        Long latest = null;
 +        for ( final Long earliestTime : earliestTimes ) {
 +            if ( earliestTime == null ) {
 +                continue;
 +            }
 +            
 +            if ( latest == null || earliestTime > latest ) {
 +                latest = earliestTime;
 +            }
 +        }
 +        
 +        return latest;
 +    }
 +
 +
 +    
 +    private class ExpireOldEvents implements Runnable {
 +        @Override
 +        public void run() {
 +            final long now = System.currentTimeMillis();
 +            final long expirationThreshold = now - config.getEventExpiration(TimeUnit.MILLISECONDS);
 +            
 +            try {
 +                indexManager.deleteOldEvents(expirationThreshold);
 +            } catch (final IOException ioe) {
 +                logger.error("Failed to delete expired events from index due to {}", ioe.toString());
 +                if ( logger.isDebugEnabled() ) {
 +                    logger.error("", ioe);
 +                }
 +            }
 +            
 +            try {
 +                partitionManager.withEachPartitionSerially(new VoidPartitionAction() {
 +                    @Override
 +                    public void perform(final Partition partition) throws IOException {
 +                        try {
 +                            partition.deleteOldEvents(expirationThreshold);
 +                        } catch (final IOException ioe) {
 +                            logger.error("Failed to delete expired events from Partition {} due to {}", partition, ioe.toString());
 +                            if ( logger.isDebugEnabled() ) {
 +                                logger.error("", ioe);
 +                            }
 +                        }
 +                    }
 +                }, false);
 +            } catch (IOException ioe) {
 +                logger.error("Failed to delete expired events from journals due to {}", ioe.toString());
 +                if ( logger.isDebugEnabled() ) {
 +                    logger.error("", ioe);
 +                }
 +            }
 +        }
 +    }
 +}

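The getEvents(List) implementation above fans the lookups out with one Callable per journal file (so at most one thread touches a given file) and then restores the caller's ordering by keying the results on their storage locations. A condensed, self-contained sketch of that fan-out/reorder shape, with String standing in for the NiFi location types (fetchInOrder() and lookupAll are hypothetical):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Future;
    import java.util.function.Function;

    static <E> List<E> fetchInOrder(final List<String> locations,
                                    final Map<String, List<String>> locationsByFile,
                                    final ExecutorService pool,
                                    final Function<List<String>, Map<String, E>> lookupAll)
            throws Exception {
        // One task per file, so each journal is read by at most one thread.
        final List<Future<Map<String, E>>> futures = new ArrayList<>();
        for (final List<String> group : locationsByFile.values()) {
            futures.add(pool.submit(() -> lookupAll.apply(group)));
        }

        // Collect results keyed by location...
        final Map<String, E> byLocation = new HashMap<>();
        for (final Future<Map<String, E>> future : futures) {
            byLocation.putAll(future.get());
        }

        // ...then emit them in the caller's original order, skipping misses.
        final List<E> ordered = new ArrayList<>(locations.size());
        for (final String location : locations) {
            final E event = byLocation.get(location);
            if (event != null) {
                ordered.add(event);
            }
        }
        return ordered;
    }
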
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/79c60016/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/LazyInitializedProvenanceEvent.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/LazyInitializedProvenanceEvent.java
index d0562e2,0000000..1abd5c9
mode 100644,000000..100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/LazyInitializedProvenanceEvent.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/LazyInitializedProvenanceEvent.java
@@@ -1,367 -1,0 +1,373 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.nifi.provenance.journaling;
 +
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +
 +import org.apache.lucene.document.Document;
 +import org.apache.lucene.index.IndexableField;
 +import org.apache.nifi.provenance.ProvenanceEventRecord;
 +import org.apache.nifi.provenance.ProvenanceEventRepository;
 +import org.apache.nifi.provenance.ProvenanceEventType;
 +import org.apache.nifi.provenance.SearchableFields;
 +import org.apache.nifi.provenance.StorageLocation;
 +import org.apache.nifi.provenance.StoredProvenanceEvent;
 +import org.apache.nifi.provenance.journaling.exception.EventNotFoundException;
 +import org.apache.nifi.provenance.journaling.index.IndexedFieldNames;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +public class LazyInitializedProvenanceEvent implements StoredProvenanceEvent {
 +    private static final Logger logger = LoggerFactory.getLogger(LazyInitializedProvenanceEvent.class);
 +    
 +    private final ProvenanceEventRepository repo;
 +    private final StorageLocation storageLocation;
 +    private final Document doc;
 +    private ProvenanceEventRecord fullRecord;
 +    
 +    public LazyInitializedProvenanceEvent(final ProvenanceEventRepository repo, final StorageLocation storageLocation, final Document document) {
 +        this.repo = repo;
 +        this.storageLocation = storageLocation;
 +        this.doc = document;
 +    }
 +
 +    @Override
 +    public long getEventId() {
 +        return doc.getField(IndexedFieldNames.EVENT_ID).numericValue().longValue();
 +    }
 +    
 +    @Override
 +    public StorageLocation getStorageLocation() {
 +        return storageLocation;
 +    }
 +
 +    @Override
 +    public long getEventTime() {
 +        return doc.getField(SearchableFields.EventTime.getSearchableFieldName()).numericValue().longValue();
 +    }
 +
 +    private void ensureFullyLoaded() {
 +        if ( fullRecord != null ) {
 +            return;
 +        }
 +        
 +        final long id = getEventId();
 +        try {
 +            fullRecord = repo.getEvent(id);
 +            if ( fullRecord == null ) {
 +                throw new EventNotFoundException("Could not find event with ID " + id);
 +            }
 +        } catch (final IOException ioe) {
 +            final String containerName = 
doc.get(IndexedFieldNames.CONTAINER_NAME);
 +            final String sectionName = 
doc.get(IndexedFieldNames.SECTION_NAME);
 +            final String journalId = doc.get(IndexedFieldNames.JOURNAL_ID);
 +            
 +            final String error = "Failed to load event with ID " + id + " 
from container '" + containerName + "', section '" + sectionName + "', journal 
'" + journalId + "' due to " + ioe;
 +            logger.error(error);
 +            if ( logger.isDebugEnabled() ) {
 +                logger.error("", ioe);
 +            }
 +
 +            throw new EventNotFoundException(error);
 +        }
 +    }
 +    
 +    @Override
 +    public long getFlowFileEntryDate() {
 +        ensureFullyLoaded();
 +        return fullRecord.getFlowFileEntryDate();
 +    }
 +
 +    @Override
 +    public long getLineageStartDate() {
 +        final IndexableField field = 
doc.getField(SearchableFields.LineageStartDate.getSearchableFieldName());
 +        if ( field != null ) {
 +            return field.numericValue().longValue();
 +        }
 +
 +        ensureFullyLoaded();
 +        return fullRecord.getLineageStartDate();
 +    }
 +
 +    @Override
 +    public Set<String> getLineageIdentifiers() {
 +        ensureFullyLoaded();
 +        return fullRecord.getLineageIdentifiers();
 +    }
 +
 +    @Override
 +    public long getFileSize() {
 +        final IndexableField field = 
doc.getField(SearchableFields.FileSize.getSearchableFieldName());
 +        if ( field != null ) {
 +            return field.numericValue().longValue();
 +        }
 +
 +        ensureFullyLoaded();
 +        return fullRecord.getFileSize();
 +    }
 +
 +    @Override
 +    public Long getPreviousFileSize() {
 +        ensureFullyLoaded();
 +        return fullRecord.getPreviousFileSize();
 +    }
 +
 +    @Override
 +    public long getEventDuration() {
 +        // TODO: Allow Event Duration to be indexed; it could be interesting 
for reporting.
 +        ensureFullyLoaded();
 +        return fullRecord.getEventDuration();
 +    }
 +
 +    @Override
 +    public ProvenanceEventType getEventType() {
 +        final String name = 
doc.get(SearchableFields.EventType.getSearchableFieldName());
 +        return ProvenanceEventType.valueOf(name.toUpperCase());
 +    }
 +
 +    @Override
 +    public Map<String, String> getAttributes() {
 +        ensureFullyLoaded();
 +        return fullRecord.getAttributes();
 +    }
 +
 +    @Override
 +    public String getAttribute(final String attributeName) {
 +        final String attr = doc.get(attributeName);
 +        if ( attr == null ) {
 +            ensureFullyLoaded();
 +            return fullRecord.getAttribute(attributeName);
 +        } else {
 +            return attr;
 +        }
 +    }
 +
 +    @Override
 +    public Map<String, String> getPreviousAttributes() {
 +        ensureFullyLoaded();
 +        return fullRecord.getPreviousAttributes();
 +    }
 +
 +    @Override
 +    public Map<String, String> getUpdatedAttributes() {
 +        ensureFullyLoaded();
 +        return fullRecord.getUpdatedAttributes();
 +    }
 +
 +    @Override
 +    public String getComponentId() {
 +        final String componentId = 
doc.get(SearchableFields.ComponentID.getSearchableFieldName());
 +        if ( componentId == null ) {
 +            ensureFullyLoaded();
 +            return fullRecord.getComponentId();
 +        } else {
 +            return componentId;
 +        }
 +    }
 +
 +    @Override
 +    public String getComponentType() {
 +        // TODO: Make indexable.
 +        ensureFullyLoaded();
 +        return fullRecord.getComponentType();
 +    }
 +
 +    @Override
 +    public String getTransitUri() {
 +        final String transitUri = 
doc.get(SearchableFields.TransitURI.getSearchableFieldName());
 +        if ( transitUri == null ) {
 +            final ProvenanceEventType eventType = getEventType();
 +            switch (eventType) {
 +                case RECEIVE:
 +                case SEND:
 +                    ensureFullyLoaded();
 +                    return fullRecord.getTransitUri();
 +                default:
 +                    return null;
 +            }
 +        } else {
 +            return transitUri;
 +        }
 +    }
 +
 +    @Override
 +    public String getSourceSystemFlowFileIdentifier() {
 +        ensureFullyLoaded();
 +        return fullRecord.getSourceSystemFlowFileIdentifier();
 +    }
 +
 +    @Override
 +    public String getFlowFileUuid() {
-         return 
doc.get(SearchableFields.FlowFileUUID.getSearchableFieldName());
++        String uuid = 
doc.get(SearchableFields.FlowFileUUID.getSearchableFieldName());
++        if ( uuid != null ) {
++            return uuid;
++        }
++
++        ensureFullyLoaded();
++        return fullRecord.getFlowFileUuid();
 +    }
 +
 +    @Override
 +    public List<String> getParentUuids() {
 +        final IndexableField[] uuids = 
doc.getFields(SearchableFields.FlowFileUUID.getSearchableFieldName());
-         if ( uuids.length < 2 ) {
++        if ( uuids == null || uuids.length < 2 ) {
 +            return Collections.emptyList();
 +        }
 +        
 +        switch (getEventType()) {
 +            case JOIN: {
 +                final List<String> parentUuids = new ArrayList<>(uuids.length 
- 1);
 +                for (int i=1; i < uuids.length; i++) {
 +                    parentUuids.add(uuids[i].stringValue());
 +                }
 +                return parentUuids;
 +            }
 +            default:
 +                return Collections.emptyList();
 +        }
 +    }
 +
 +    @Override
 +    public List<String> getChildUuids() {
 +        final IndexableField[] uuids = 
doc.getFields(SearchableFields.FlowFileUUID.getSearchableFieldName());
-         if ( uuids.length < 2 ) {
++        if ( uuids == null || uuids.length < 2 ) {
 +            return Collections.emptyList();
 +        }
 +        
 +        switch (getEventType()) {
 +            case REPLAY:
 +            case CLONE:
 +            case FORK: {
 +                final List<String> childUuids = new ArrayList<>(uuids.length 
- 1);
 +                for (int i=1; i < uuids.length; i++) {
 +                    childUuids.add(uuids[i].stringValue());
 +                }
 +                return childUuids;
 +            }
 +            default:
 +                return Collections.emptyList();
 +        }
 +    }
 +
 +    @Override
 +    public String getAlternateIdentifierUri() {
 +        final String altId = doc.get(SearchableFields.AlternateIdentifierURI.getSearchableFieldName());
 +        if ( altId != null ) {
 +            return altId;
 +        }
 +        if ( getEventType() == ProvenanceEventType.ADDINFO ) {
 +            ensureFullyLoaded();
 +            return fullRecord.getAlternateIdentifierUri();
 +        }
 +        return null;
 +    }
 +
 +    @Override
 +    public String getDetails() {
 +        final String details = 
doc.get(SearchableFields.Details.getSearchableFieldName());
 +        if ( details == null ) {
 +            ensureFullyLoaded();
 +            return fullRecord.getDetails();
 +        }
 +        return details;
 +    }
 +
 +    @Override
 +    public String getRelationship() {
 +        final String relationship = 
doc.get(SearchableFields.Relationship.getSearchableFieldName());
 +        if ( relationship == null ) {
 +            ensureFullyLoaded();
 +            return fullRecord.getRelationship();
 +        }
 +        return relationship;
 +    }
 +
 +    @Override
 +    public String getSourceQueueIdentifier() {
 +        final String queueId = 
doc.get(SearchableFields.SourceQueueIdentifier.getSearchableFieldName());
 +        if ( queueId == null ) {
 +            ensureFullyLoaded();
 +            return fullRecord.getSourceQueueIdentifier();
 +        }
 +        return queueId;
 +    }
 +
 +    @Override
 +    public String getContentClaimSection() {
 +        final String claimSection = 
doc.get(SearchableFields.ContentClaimSection.getSearchableFieldName());
 +        if ( claimSection == null ) {
 +            ensureFullyLoaded();
 +            return fullRecord.getContentClaimSection();
 +        }
 +        return claimSection;
 +    }
 +
 +    @Override
 +    public String getPreviousContentClaimSection() {
 +        ensureFullyLoaded();
 +        return fullRecord.getPreviousContentClaimSection();
 +    }
 +
 +    @Override
 +    public String getContentClaimContainer() {
 +        final String claimContainer = 
doc.get(SearchableFields.ContentClaimContainer.getSearchableFieldName());
 +        if ( claimContainer == null ) {
 +            ensureFullyLoaded();
 +            return fullRecord.getContentClaimContainer();
 +        }
 +        return claimContainer;
 +    }
 +
 +    @Override
 +    public String getPreviousContentClaimContainer() {
 +        ensureFullyLoaded();
 +        return fullRecord.getPreviousContentClaimContainer();
 +    }
 +
 +    @Override
 +    public String getContentClaimIdentifier() {
 +        final String claimIdentifier = 
doc.get(SearchableFields.ContentClaimIdentifier.getSearchableFieldName());
 +        if ( claimIdentifier == null ) {
 +            ensureFullyLoaded();
 +            return fullRecord.getContentClaimIdentifier();
 +        }
 +        return claimIdentifier;
 +    }
 +
 +    @Override
 +    public String getPreviousContentClaimIdentifier() {
 +        ensureFullyLoaded();
 +        return fullRecord.getPreviousContentClaimIdentifier();
 +    }
 +
 +    @Override
 +    public Long getContentClaimOffset() {
 +        final String claimOffset = 
doc.get(SearchableFields.ContentClaimOffset.getSearchableFieldName());
 +        if ( claimOffset == null ) {
 +            ensureFullyLoaded();
 +            return fullRecord.getContentClaimOffset();
 +        }
 +        return Long.valueOf(claimOffset);
 +    }
 +
 +    @Override
 +    public Long getPreviousContentClaimOffset() {
 +        ensureFullyLoaded();
 +        return fullRecord.getPreviousContentClaimOffset();
 +    }
 +
 +}
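
One thing worth noting about the accessors above: getEventId() and getEventTime() call numericValue() without a null check, which is safe only as long as those fields are always indexed. If that invariant were ever relaxed, a defensive helper along these lines could be used instead. This is a sketch only; the DocFields class is illustrative and not part of this commit.

    import org.apache.lucene.document.Document;
    import org.apache.lucene.index.IndexableField;

    final class DocFields {
        private DocFields() {
        }

        // Returns the numeric value of the named field, or defaultValue if the
        // field is absent or holds no numeric value.
        static long getLong(final Document doc, final String fieldName, final long defaultValue) {
            final IndexableField field = doc.getField(fieldName);
            if (field == null || field.numericValue() == null) {
                return defaultValue;
            }
            return field.numericValue().longValue();
        }
    }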

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/79c60016/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexSearcher.java
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexSearcher.java
index 1761057,0000000..fbc5746
mode 100644,000000..100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexSearcher.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexSearcher.java
@@@ -1,105 -1,0 +1,107 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.nifi.provenance.journaling.index;
 +
 +import java.io.Closeable;
 +import java.io.IOException;
 +import java.util.Collection;
 +import java.util.Iterator;
 +import java.util.List;
++import java.util.Set;
 +
 +import org.apache.nifi.provenance.journaling.JournaledStorageLocation;
 +import org.apache.nifi.provenance.journaling.LazyInitializedProvenanceEvent;
 +import org.apache.nifi.provenance.search.Query;
 +
 +public interface EventIndexSearcher extends Closeable {
 +    /**
 +     * Searches the repository for any events that match the provided query and returns the locations
 +     * where those events are stored.
 +     * @param query the query to evaluate
 +     * @return the locations of the matching events, along with the total hit count
 +     * @throws IOException if unable to search the index
 +     */
 +    SearchResult search(Query query) throws IOException;
 +    
 +    /**
 +     * Returns the locations of all events for any FlowFile whose UUID is in the collection of
 +     * UUIDs provided, if the event time occurs between earliestTime and latestTime. The return
 +     * value is ordered so that the records can be read from the journals as efficiently as possible.
 +     * 
 +     * @param flowFileUuids the FlowFile UUIDs of interest
 +     * @param earliestTime the earliest event time to include
 +     * @param latestTime the latest event time to include
 +     * 
 +     * @return the ordered locations of the matching events
 +     * @throws IOException if unable to search the index
 +     */
 +    List<JournaledStorageLocation> getEventsForFlowFiles(Collection<String> 
flowFileUuids, long earliestTime, long latestTime) throws IOException;
 +    
 +    /**
 +     * Returns the locations of events whose Event IDs are at least equal to minEventId, returning
 +     * up to the given number of results.
 +     * 
 +     * @param minEventId the minimum Event ID to include
 +     * @param maxResults the maximum number of locations to return
 +     * @return the locations of the matching events, ordered by Event ID
 +     * @throws IOException if unable to search the index
 +     */
 +    List<JournaledStorageLocation> getEvents(long minEventId, int maxResults) 
throws IOException;
 +    
 +    /**
 +     * Returns the largest event id that is known by the index being searched.
 +     * @param container the container to restrict the search to, or null for all containers
 +     * @param section the section to restrict the search to, or null for all sections
 +     * @return the largest event id known to the index, or null if no events are indexed
 +     * @throws IOException if unable to search the index
 +     */
 +    Long getMaxEventId(String container, String section) throws IOException;
 +    
 +    /**
 +     * Returns the locations of the latest events for the index being searched.
 +     * @param numEvents the maximum number of events to return
 +     * @return the locations of the most recently indexed events
 +     * @throws IOException if unable to search the index
 +     */
 +    List<JournaledStorageLocation> getLatestEvents(int numEvents) throws 
IOException;
 +    
 +    /**
 +     * Returns the total number of events that exist for the index being searched.
 +     * @return the number of events in the index
 +     * @throws IOException if unable to read the index
 +     */
 +    long getNumberOfEvents() throws IOException;
 +    
 +    /**
 +     * Evaluates the given query against the index, returning an iterator of lazily initialized
 +     * provenance events.
 +     * 
 +     * @param query the query to evaluate
++     * @param referencedFields the set of fields that are referenced in the query
 +     * @return an iterator over the events that match the query
 +     * @throws IOException if unable to search the index
 +     */
-     Iterator<LazyInitializedProvenanceEvent> select(String query) throws 
IOException;
++    Iterator<LazyInitializedProvenanceEvent> select(String query, Set<String> 
referencedFields) throws IOException;
 +    
 +    /**
 +     * Evaluates the given query against the index, returning an iterator of locations from which
 +     * the matching records can be retrieved.
 +     * 
 +     * @param query the query to evaluate
 +     * @return an iterator over the locations of the matching records
 +     * @throws IOException if unable to search the index
 +     */
 +    Iterator<JournaledStorageLocation> selectLocations(String query) throws 
IOException;
 +}
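
A brief usage sketch may help tie the pieces above together: the searcher hands back lazily initialized events, and only accessors for un-indexed data force a read from the journals. Passing null for referencedFields loads every stored field. The SelectExample class and its printMatches helper below are illustrative, not part of this commit.

    import java.io.IOException;
    import java.util.Iterator;
    import java.util.Map;

    import org.apache.nifi.provenance.journaling.LazyInitializedProvenanceEvent;
    import org.apache.nifi.provenance.journaling.index.EventIndexSearcher;

    public class SelectExample {
        // Iterates over the events matching a provenance query. Cheap fields are
        // answered from the Lucene index; getAttributes() triggers the full load.
        public static void printMatches(final EventIndexSearcher searcher, final String query) throws IOException {
            final Iterator<LazyInitializedProvenanceEvent> events = searcher.select(query, null);
            while (events.hasNext()) {
                final LazyInitializedProvenanceEvent event = events.next();
                System.out.println(event.getEventId() + " " + event.getEventType()); // index only
                final Map<String, String> attributes = event.getAttributes(); // full journal read
                System.out.println(attributes);
            }
        }
    }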

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/79c60016/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java
index 3fb641e,0000000..9bc123f
mode 100644,000000..100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java
@@@ -1,293 -1,0 +1,333 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.nifi.provenance.journaling.index;
 +
 +import java.io.File;
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Collection;
++import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.NoSuchElementException;
++import java.util.Set;
 +
 +import org.apache.lucene.document.Document;
 +import org.apache.lucene.index.DirectoryReader;
 +import org.apache.lucene.index.Term;
 +import org.apache.lucene.search.BooleanClause.Occur;
 +import org.apache.lucene.search.BooleanQuery;
 +import org.apache.lucene.search.IndexSearcher;
 +import org.apache.lucene.search.MatchAllDocsQuery;
 +import org.apache.lucene.search.NumericRangeQuery;
 +import org.apache.lucene.search.ScoreDoc;
 +import org.apache.lucene.search.Sort;
 +import org.apache.lucene.search.SortField;
 +import org.apache.lucene.search.SortField.Type;
 +import org.apache.lucene.search.TermQuery;
 +import org.apache.lucene.search.TopDocs;
 +import org.apache.lucene.search.TopFieldDocs;
 +import org.apache.lucene.store.FSDirectory;
 +import org.apache.nifi.pql.LuceneTranslator;
 +import org.apache.nifi.pql.ProvenanceQuery;
 +import org.apache.nifi.provenance.ProvenanceEventRepository;
 +import org.apache.nifi.provenance.SearchableFields;
 +import org.apache.nifi.provenance.journaling.JournaledStorageLocation;
 +import org.apache.nifi.provenance.journaling.LazyInitializedProvenanceEvent;
 +import org.apache.nifi.provenance.journaling.exception.EventNotFoundException;
 +import org.apache.nifi.provenance.search.Query;
 +import org.apache.nifi.util.ObjectHolder;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +public class LuceneIndexSearcher implements EventIndexSearcher {
 +    private static final Logger logger = 
LoggerFactory.getLogger(LuceneIndexSearcher.class);
 +    
 +    private final ProvenanceEventRepository repo;
 +    private final DirectoryReader reader;
 +    private final IndexSearcher searcher;
 +    private final FSDirectory fsDirectory;
 +    
 +    private final String description;
 +    
++    private static final Set<String> REQUIRED_FIELDS;
++    
++    static {
++        final Set<String> fields = new HashSet<>();
++        fields.add(IndexedFieldNames.BLOCK_INDEX);
++        fields.add(IndexedFieldNames.CONTAINER_NAME);
++        fields.add(IndexedFieldNames.EVENT_ID);
++        fields.add(IndexedFieldNames.JOURNAL_ID);
++        fields.add(IndexedFieldNames.SECTION_NAME);
++        fields.add(SearchableFields.EventTime.getSearchableFieldName());
++        fields.add(SearchableFields.EventType.getSearchableFieldName());
++        REQUIRED_FIELDS = fields;
++    }
++    
 +    public LuceneIndexSearcher(final ProvenanceEventRepository repo, final 
File indexDirectory) throws IOException {
 +        this.repo = repo;
 +        this.fsDirectory = FSDirectory.open(indexDirectory);
 +        this.reader = DirectoryReader.open(fsDirectory);
 +        this.searcher = new IndexSearcher(reader);
 +        this.description = "LuceneIndexSearcher[indexDirectory=" + 
indexDirectory + "]";
 +    }
 +    
 +    public LuceneIndexSearcher(final ProvenanceEventRepository repo, final 
DirectoryReader reader, final File indexDirectory) {
 +        this.repo = repo;
 +        this.reader = reader;
 +        this.searcher = new IndexSearcher(reader);
 +        this.fsDirectory = null;
 +        this.description = "LuceneIndexSearcher[indexDirectory=" + 
indexDirectory + "]";
 +    }
 +    
 +    @Override
 +    public void close() throws IOException {
 +        IOException suppressed = null;
 +        try {
 +            reader.close();
 +        } catch (final IOException ioe) {
 +            suppressed = ioe;
 +        }
 +        
 +        if ( fsDirectory != null ) {
 +            fsDirectory.close();
 +        }
 +        
 +        if ( suppressed != null ) {
 +            throw suppressed;
 +        }
 +    }
 +
 +    
 +    private List<JournaledStorageLocation> getOrderedLocations(final TopDocs 
topDocs) throws IOException {
 +        final ScoreDoc[] scoreDocs = topDocs.scoreDocs;
 +        final List<JournaledStorageLocation> locations = new 
ArrayList<>(scoreDocs.length);
 +        populateLocations(topDocs, locations);
 +        
 +        return locations;
 +    }
 +    
 +    
 +    private void populateLocations(final TopDocs topDocs, final 
Collection<JournaledStorageLocation> locations) throws IOException {
 +        for ( final ScoreDoc scoreDoc : topDocs.scoreDocs ) {
 +            final Document document = reader.document(scoreDoc.doc);
 +            locations.add(QueryUtils.createLocation(document));
 +        }
 +    }
 +    
 +    
 +    @Override
 +    public SearchResult search(final Query provenanceQuery) throws 
IOException {
 +        final org.apache.lucene.search.Query luceneQuery = 
QueryUtils.convertQueryToLucene(provenanceQuery);
 +        final TopDocs topDocs = searcher.search(luceneQuery, 
provenanceQuery.getMaxResults());
 +        final List<JournaledStorageLocation> locations = 
getOrderedLocations(topDocs);
 +        
 +        return new SearchResult(locations, topDocs.totalHits);
 +    }
 +
 +    @Override
 +    public List<JournaledStorageLocation> getEvents(final long minEventId, 
final int maxResults) throws IOException {
 +        final BooleanQuery query = new BooleanQuery();
 +        query.add(NumericRangeQuery.newLongRange(IndexedFieldNames.EVENT_ID, 
minEventId, null, true, true), Occur.MUST);
 +        
 +        final TopDocs topDocs = searcher.search(query, maxResults, new 
Sort(new SortField(IndexedFieldNames.EVENT_ID, Type.LONG)));
 +        return getOrderedLocations(topDocs);
 +    }
 +
 +    @Override
 +    public Long getMaxEventId(final String container, final String section) 
throws IOException {
 +        final BooleanQuery query = new BooleanQuery();
 +        
 +        if ( container != null ) {
 +            query.add(new TermQuery(new 
Term(IndexedFieldNames.CONTAINER_NAME, container)), Occur.MUST);
 +        }
 +        
 +        if ( section != null ) {
 +            query.add(new TermQuery(new Term(IndexedFieldNames.SECTION_NAME, 
section)), Occur.MUST);
 +        }
 +        
 +        final TopDocs topDocs = searcher.search(query, 1, new Sort(new 
SortField(IndexedFieldNames.EVENT_ID, Type.LONG, true)));
 +        final List<JournaledStorageLocation> locations = 
getOrderedLocations(topDocs);
 +        if ( locations.isEmpty() ) {
 +            return null;
 +        }
 +        
 +        return locations.get(0).getEventId();
 +    }
 +
 +    @Override
 +    public List<JournaledStorageLocation> getEventsForFlowFiles(final 
Collection<String> flowFileUuids, final long earliestTime, final long 
latestTime) throws IOException {
 +        // Create a query for all Events related to the FlowFiles of interest. We do this by adding
 +        // each ID as a "SHOULD" clause and then setting the minimum required to 1. If no UUIDs are
 +        // provided, the query is restricted by event time alone.
 +        final BooleanQuery flowFileIdQuery = new BooleanQuery();
 +        if (flowFileUuids != null && !flowFileUuids.isEmpty()) {
 +            for (final String flowFileUuid : flowFileUuids) {
 +                flowFileIdQuery.add(new TermQuery(new Term(SearchableFields.FlowFileUUID.getSearchableFieldName(), flowFileUuid)), Occur.SHOULD);
 +            }
 +            flowFileIdQuery.setMinimumNumberShouldMatch(1);
 +        }
 +        
 +        flowFileIdQuery.add(NumericRangeQuery.newLongRange(SearchableFields.EventTime.getSearchableFieldName(),
 +                earliestTime, latestTime, true, true), Occur.MUST);
 +        
 +        final TopDocs topDocs = searcher.search(flowFileIdQuery, 1000);
 +        return getOrderedLocations(topDocs);
 +    }
 +    
 +    
 +    @Override
 +    public List<JournaledStorageLocation> getLatestEvents(final int 
numEvents) throws IOException {
 +        final MatchAllDocsQuery query = new MatchAllDocsQuery();
 +        
 +        final TopFieldDocs topDocs = searcher.search(query, numEvents, new 
Sort(new SortField(IndexedFieldNames.EVENT_ID, Type.LONG, true)));
 +        final List<JournaledStorageLocation> locations = 
getOrderedLocations(topDocs);
 +        return locations;
 +    }
 +    
 +    @Override
 +    public String toString() {
 +        return description;
 +    }
 +
 +    @Override
 +    public long getNumberOfEvents() {
 +        return reader.numDocs();
 +    }
 +    
 +    
-     private <T> Iterator<T> select(final String query, final 
DocumentTransformer<T> transformer) throws IOException {
++    private <T> Iterator<T> select(final String query, final Set<String> 
referencedFields, final DocumentTransformer<T> transformer) throws IOException {
 +        final org.apache.lucene.search.Query luceneQuery = 
LuceneTranslator.toLuceneQuery(ProvenanceQuery.compile(query, 
repo.getSearchableFields(), repo.getSearchableAttributes()).getWhereClause());
-         final int batchSize = 1000;
++        final int batchSize = 1000000;
++        
++        final Set<String> fieldsToLoad;
++        if ( referencedFields == null ) {
++            fieldsToLoad = null;
++        } else {
++            fieldsToLoad = new HashSet<>(REQUIRED_FIELDS);
++            fieldsToLoad.addAll(referencedFields);
++        }
 +        
 +        final ObjectHolder<TopDocs> topDocsHolder = new ObjectHolder<>(null);
 +        return new Iterator<T>() {
 +            int fetched = 0;
 +            int scoreDocIndex = 0;
 +            
 +            @Override
 +            public boolean hasNext() {
 +                if ( topDocsHolder.get() == null ) {
 +                    try {
 +                        topDocsHolder.set(searcher.search(luceneQuery, 
batchSize));
 +                    } catch (final IOException ioe) {
 +                        throw new EventNotFoundException("Unable to obtain 
next record from " + LuceneIndexSearcher.this, ioe);
 +                    }
 +                }
 +                
 +                final boolean hasNext = fetched < 
topDocsHolder.get().totalHits;
 +                if ( !hasNext ) {
 +                    try {
 +                        LuceneIndexSearcher.this.close();
 +                    } catch (final IOException ioe) {
 +                        logger.warn("Failed to close {} due to {}", this, 
ioe.toString());
 +                        if ( logger.isDebugEnabled() ) {
 +                            logger.warn("", ioe);
 +                        }
 +                    }
 +                }
 +                return hasNext;
 +            }
 +
 +            @Override
 +            public T next() {
 +                if ( !hasNext() ) {
 +                    throw new NoSuchElementException();
 +                }
 +                
 +                TopDocs topDocs = topDocsHolder.get();
 +                ScoreDoc[] scoreDocs = topDocs.scoreDocs;
 +                if ( scoreDocIndex >= scoreDocs.length ) {
 +                    try {
-                         topDocs = 
searcher.searchAfter(scoreDocs[scoreDocs.length - 1], luceneQuery, batchSize);
++                        topDocs = getTopDocs(scoreDocs[scoreDocs.length - 1], 
luceneQuery, batchSize);
 +                        topDocsHolder.set(topDocs);
 +                        scoreDocs = topDocs.scoreDocs;
 +                        scoreDocIndex = 0;
 +                    } catch (final IOException ioe) {
 +                        throw new EventNotFoundException("Unable to obtain 
next record from " + LuceneIndexSearcher.this, ioe);
 +                    }
 +                }
 +                
 +                final ScoreDoc scoreDoc = scoreDocs[scoreDocIndex++];
 +                final Document document;
 +                try {
-                     document = searcher.doc(scoreDoc.doc);
++                    document = getDocument(scoreDoc.doc, fieldsToLoad);
 +                } catch (final IOException ioe) {
 +                    throw new EventNotFoundException("Unable to obtain next 
record from " + LuceneIndexSearcher.this, ioe);
 +                }
 +                fetched++;
 +                
 +                return transformer.transform(document);
 +            }
 +
++            // this method exists solely for the use of a profiler so that I 
can see which methods are taking the longest when
++            // profiling only org.apache.nifi.*
++            private TopDocs getTopDocs(final ScoreDoc start, final 
org.apache.lucene.search.Query luceneQuery, final int batchSize) throws 
IOException {
++                return searcher.searchAfter(start, luceneQuery, batchSize);
++            }
++            
++            // this method exists solely for the use of a profiler so that I 
can see which methods are taking the longest when
++            // profiling only org.apache.nifi.*
++            private Document getDocument(final int docId, final Set<String> 
referencedFields) throws IOException {
++                if ( referencedFields == null || referencedFields.isEmpty() ) 
{
++                    return searcher.doc(docId);
++                } else {
++                    return searcher.doc(docId, referencedFields);
++                }
++            }
++            
 +            @Override
 +            public void remove() {
 +                throw new UnsupportedOperationException();
 +            }
 +        };
 +    }
 +
 +    @Override
-     public Iterator<LazyInitializedProvenanceEvent> select(final String 
query) throws IOException {
-         return select(query, new 
DocumentTransformer<LazyInitializedProvenanceEvent>() {
++    public Iterator<LazyInitializedProvenanceEvent> select(final String 
query, final Set<String> referencedFields) throws IOException {
++        return select(query, referencedFields, new 
DocumentTransformer<LazyInitializedProvenanceEvent>() {
 +            @Override
 +            public LazyInitializedProvenanceEvent transform(final Document 
document) {
 +                return new LazyInitializedProvenanceEvent(repo, 
QueryUtils.createLocation(document), document);
 +            }
 +        });
 +    }
 +
 +    @Override
 +    public Iterator<JournaledStorageLocation> selectLocations(final String 
query) throws IOException {
-         return select(query, new 
DocumentTransformer<JournaledStorageLocation>() {
++        return select(query, REQUIRED_FIELDS, new 
DocumentTransformer<JournaledStorageLocation>() {
 +            @Override
 +            public JournaledStorageLocation transform(final Document 
document) {
 +                return QueryUtils.createLocation(document);
 +            }
 +        });
 +    }
 +    
 +    private static interface DocumentTransformer<T> {
 +        T transform(Document document);
 +    }
 +}
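
For readers unfamiliar with Lucene's deep paging, the anonymous Iterator above combines two techniques: searchAfter() to pull results one batch at a time without revisiting earlier hits, and the doc(int, Set) overload to load only the stored fields the query actually references. A condensed, eager version of the same loop follows as a sketch; the DeepPagingExample class and its parameter names are illustrative, not part of this commit.

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Set;

    import org.apache.lucene.document.Document;
    import org.apache.lucene.search.IndexSearcher;
    import org.apache.lucene.search.Query;
    import org.apache.lucene.search.ScoreDoc;
    import org.apache.lucene.search.TopDocs;

    final class DeepPagingExample {
        private DeepPagingExample() {
        }

        // Collects every matching document, one batch at a time, loading only the
        // given stored fields for each hit (or all fields if fieldsToLoad is null).
        static List<Document> collectAll(final IndexSearcher searcher, final Query query,
                final Set<String> fieldsToLoad, final int batchSize) throws IOException {
            final List<Document> documents = new ArrayList<>();
            TopDocs topDocs = searcher.search(query, batchSize);
            while (topDocs.scoreDocs.length > 0) {
                for (final ScoreDoc scoreDoc : topDocs.scoreDocs) {
                    final Document doc = (fieldsToLoad == null)
                            ? searcher.doc(scoreDoc.doc)
                            : searcher.doc(scoreDoc.doc, fieldsToLoad);
                    documents.add(doc);
                }
                final ScoreDoc last = topDocs.scoreDocs[topDocs.scoreDocs.length - 1];
                topDocs = searcher.searchAfter(last, query, batchSize);
            }
            return documents;
        }
    }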
