http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java
new file mode 100644
index 0000000..2130e73
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.provenance.ProvenanceEventBuilder;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventRepository;
+import org.apache.nifi.provenance.SearchableFieldParser;
+import org.apache.nifi.provenance.SearchableFields;
+import org.apache.nifi.provenance.StandardProvenanceEventRecord;
+import org.apache.nifi.provenance.StorageLocation;
+import org.apache.nifi.provenance.StoredProvenanceEvent;
+import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig;
+import org.apache.nifi.provenance.journaling.index.QueryUtils;
+import org.apache.nifi.provenance.journaling.journals.JournalReader;
+import org.apache.nifi.provenance.journaling.journals.StandardJournalReader;
+import org.apache.nifi.provenance.journaling.partition.Partition;
+import org.apache.nifi.provenance.journaling.partition.PartitionAction;
+import org.apache.nifi.provenance.journaling.partition.PartitionManager;
+import org.apache.nifi.provenance.journaling.partition.QueuingPartitionManager;
+import org.apache.nifi.provenance.journaling.partition.VoidPartitionAction;
+import org.apache.nifi.provenance.journaling.toc.StandardTocReader;
+import org.apache.nifi.provenance.journaling.toc.TocReader;
+import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
+import org.apache.nifi.provenance.search.Query;
+import org.apache.nifi.provenance.search.QuerySubmission;
+import org.apache.nifi.provenance.search.SearchableField;
+import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.NiFiProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JournalingProvenanceRepository implements 
ProvenanceEventRepository {
+    private static final Logger logger = 
LoggerFactory.getLogger(JournalingProvenanceRepository.class);
+    
+    private final JournalingRepositoryConfig config;
+    private final PartitionManager partitionManager;
+    private final AtomicLong idGenerator = new AtomicLong(0L);
+    
+    private EventReporter eventReporter;    // effectively final
+    private final ExecutorService executor;
+    
+    
+    /**
+     * Creates a repository whose configuration is built by {@link #createConfig()},
+     * i.e. from the values exposed by {@link NiFiProperties}.
+     *
+     * @throws IOException if the delegated constructor fails
+     */
+    public JournalingProvenanceRepository() throws IOException {
+        this(createConfig());
+    }
+    
+    /**
+     * Creates a repository that uses the given configuration. A fixed-size thread pool
+     * of {@code config.getThreadPoolSize()} threads is created and shared with the
+     * partition manager.
+     *
+     * @param config the configuration to use
+     * @throws IOException if the partition manager cannot be created
+     */
+    public JournalingProvenanceRepository(final JournalingRepositoryConfig config) throws IOException {
+        this.config = config;
+        this.executor = Executors.newFixedThreadPool(config.getThreadPoolSize());
+
+        // If the partition manager cannot be created, nobody will ever call close() on
+        // this instance, so the thread pool must be shut down here to avoid leaking it.
+        boolean initialized = false;
+        try {
+            this.partitionManager = new QueuingPartitionManager(config, executor);
+            initialized = true;
+        } finally {
+            if (!initialized) {
+                executor.shutdownNow();
+            }
+        }
+    }
+    
+    
+    /**
+     * Builds a {@link JournalingRepositoryConfig} from the values exposed by
+     * {@link NiFiProperties}. If no provenance repository path is configured, a single
+     * container named "provenance_repository" is used. The Event Time field is always
+     * added to the indexed fields.
+     *
+     * @return the populated configuration
+     */
+    private static JournalingRepositoryConfig createConfig() {
+        final NiFiProperties nifiProps = NiFiProperties.getInstance();
+
+        // Fall back to a single default container when none is configured.
+        final Map<String, Path> repoPaths = nifiProps.getProvenanceRepositoryPaths();
+        if (repoPaths.isEmpty()) {
+            repoPaths.put("provenance_repository", Paths.get("provenance_repository"));
+        }
+
+        // Raw property values, each with its default.
+        final String maxStorageTime = nifiProps.getProperty(NiFiProperties.PROVENANCE_MAX_STORAGE_TIME, "24 hours");
+        final String maxStorageSize = nifiProps.getProperty(NiFiProperties.PROVENANCE_MAX_STORAGE_SIZE, "1 GB");
+        final String rolloverPeriod = nifiProps.getProperty(NiFiProperties.PROVENANCE_ROLLOVER_TIME, "5 mins");
+        final String rolloverCapacity = nifiProps.getProperty(NiFiProperties.PROVENANCE_ROLLOVER_SIZE, "100 MB");
+        final String indexShardSize = nifiProps.getProperty(NiFiProperties.PROVENANCE_INDEX_SHARD_SIZE, "500 MB");
+        final int threadCount = nifiProps.getIntegerProperty(NiFiProperties.PROVENANCE_QUERY_THREAD_POOL_SIZE, 2);
+        final int partitions = nifiProps.getIntegerProperty(NiFiProperties.PROVENANCE_JOURNAL_COUNT, 16);
+
+        // Convert durations to milliseconds and data sizes to bytes.
+        final long expirationMillis = FormatUtils.getTimeDuration(maxStorageTime, TimeUnit.MILLISECONDS);
+        final long storageBytes = DataUnit.parseDataSize(maxStorageSize, DataUnit.B).longValue();
+        final long rolloverMillis = FormatUtils.getTimeDuration(rolloverPeriod, TimeUnit.MILLISECONDS);
+        final long journalBytes = DataUnit.parseDataSize(rolloverCapacity, DataUnit.B).longValue();
+
+        final String compressValue = nifiProps.getProperty(NiFiProperties.PROVENANCE_COMPRESS_ON_ROLLOVER);
+        final boolean compress = Boolean.parseBoolean(compressValue);
+        final String fieldNames = nifiProps.getProperty(NiFiProperties.PROVENANCE_INDEXED_FIELDS);
+        final String attributeNames = nifiProps.getProperty(NiFiProperties.PROVENANCE_INDEXED_ATTRIBUTES);
+
+        final String syncValue = nifiProps.getProperty("nifi.provenance.repository.always.sync", "false");
+        final boolean syncAlways = Boolean.parseBoolean(syncValue);
+
+        final List<SearchableField> fields = SearchableFieldParser.extractSearchableFields(fieldNames, true);
+        final List<SearchableField> attributes = SearchableFieldParser.extractSearchableFields(attributeNames, false);
+
+        // The Event Time is indexed regardless of what was configured.
+        if (!fields.contains(SearchableFields.EventTime)) {
+            fields.add(SearchableFields.EventTime);
+        }
+
+        // The configuration works with Files rather than Paths.
+        final Map<String, File> containerDirs = new HashMap<>(repoPaths.size());
+        for (final Map.Entry<String, Path> pathEntry : repoPaths.entrySet()) {
+            final String containerName = pathEntry.getKey();
+            final Path containerPath = pathEntry.getValue();
+            containerDirs.put(containerName, containerPath.toFile());
+        }
+
+        final JournalingRepositoryConfig repoConfig = new JournalingRepositoryConfig();
+        repoConfig.setContainers(containerDirs);
+        repoConfig.setCompressOnRollover(compress);
+        repoConfig.setSearchableFields(fields);
+        repoConfig.setSearchableAttributes(attributes);
+        repoConfig.setJournalCapacity(journalBytes);
+        repoConfig.setJournalRolloverPeriod(rolloverMillis, TimeUnit.MILLISECONDS);
+        repoConfig.setEventExpiration(expirationMillis, TimeUnit.MILLISECONDS);
+        repoConfig.setMaxStorageCapacity(storageBytes);
+        repoConfig.setThreadPoolSize(threadCount);
+        repoConfig.setPartitionCount(partitions);
+
+        if (indexShardSize != null) {
+            final long shardBytes = DataUnit.parseDataSize(indexShardSize, DataUnit.B).longValue();
+            repoConfig.setDesiredIndexSize(shardBytes);
+        }
+
+        repoConfig.setAlwaysSync(syncAlways);
+        return repoConfig;
+    }
+    
+    /**
+     * Stores the given reporter for later use. No other initialization is performed here.
+     *
+     * @param eventReporter the reporter to retain
+     * @throws IOException declared by the interface; not thrown by this implementation
+     */
+    @Override
+    public synchronized void initialize(final EventReporter eventReporter) 
throws IOException {
+        this.eventReporter = eventReporter;
+    }
+
+    /**
+     * Returns a new builder for provenance event records.
+     *
+     * @return a fresh {@link StandardProvenanceEventRecord.Builder}
+     */
+    @Override
+    public ProvenanceEventBuilder eventBuilder() {
+        return new StandardProvenanceEventRecord.Builder();
+    }
+
+    /**
+     * Registers a single event by delegating to {@link #registerEvents(Collection)}
+     * with a singleton collection.
+     *
+     * @param event the event to register
+     * @throws IOException if {@link #registerEvents(Collection)} fails
+     */
+    @Override
+    public void registerEvent(final ProvenanceEventRecord event) throws 
IOException {
+        registerEvents(Collections.singleton(event));
+    }
+
+    /**
+     * Hands the given events to a partition chosen by the partition manager. A contiguous
+     * block of {@code events.size()} ids is reserved from the id generator, and the first
+     * id of that block is passed to the partition along with the events.
+     *
+     * @param events the events to register
+     * @throws IOException if the partition fails to register the events
+     */
+    @Override
+    public void registerEvents(final Collection<ProvenanceEventRecord> events) throws IOException {
+        final VoidPartitionAction registration = new VoidPartitionAction() {
+            @Override
+            public void perform(final Partition partition) throws IOException {
+                // Ids are reserved only once a partition is actually available.
+                final long firstEventId = idGenerator.getAndAdd(events.size());
+                partition.registerEvents(events, firstEventId);
+            }
+        };
+
+        // NOTE(review): the boolean presumably marks this as a write action -- confirm
+        // against PartitionManager#withPartition.
+        partitionManager.withPartition(registration, true);
+    }
+
+    /**
+     * Looks up the event with exactly the given id.
+     *
+     * @param id the id of the event to retrieve
+     * @return the event with that id, or {@code null} if there is no such event
+     * @throws IOException if the events cannot be read
+     */
+    @Override
+    public StoredProvenanceEvent getEvent(final long id) throws IOException {
+        // Ask for at most one record starting at the requested id. When that id does not
+        // exist, the record handed back may carry a larger id, so the id must be verified.
+        final List<StoredProvenanceEvent> candidates = getEvents(id, 1);
+        if (!candidates.isEmpty()) {
+            final StoredProvenanceEvent first = candidates.get(0);
+            if (first.getEventId() == id) {
+                return first;
+            }
+        }
+
+        return null;
+    }
+    
+    /**
+     * Retrieves up to {@code maxRecords} events, starting with the given id, ordered by
+     * ascending event id.
+     *
+     * <p>
+     * Every partition is asked for up to {@code maxRecords} locations. The results are
+     * merged and sorted by event id, and only the first {@code maxRecords} of them are
+     * then read from the journals.
+     * </p>
+     *
+     * @param firstRecordId the id at which to start
+     * @param maxRecords the maximum number of events to return; must not be negative
+     * @return the matching events, at most {@code maxRecords} of them
+     * @throws IOException if the partitions or the journals cannot be read
+     * @throws IllegalArgumentException if {@code maxRecords} is negative
+     */
+    @Override
+    public List<StoredProvenanceEvent> getEvents(final long firstRecordId, final int maxRecords) throws IOException {
+        if (maxRecords < 0) {
+            throw new IllegalArgumentException("Cannot retrieve a negative number of records: " + maxRecords);
+        }
+
+        // Must generate query to determine the appropriate StorageLocation objects and
+        // then call getEvents(List<StorageLocation>)
+        final Set<List<JournaledStorageLocation>> resultSet = partitionManager.withEachPartition(
+            new PartitionAction<List<JournaledStorageLocation>>() {
+                @Override
+                public List<JournaledStorageLocation> perform(final Partition partition) throws IOException {
+                    return partition.getEvents(firstRecordId, maxRecords);
+                }
+        });
+
+        // Not pre-sized with maxRecords: the limit is caller-controlled and may be very
+        // large, while the number of locations actually found may be small.
+        final List<JournaledStorageLocation> locations = new ArrayList<>();
+        for (final List<JournaledStorageLocation> list : resultSet) {
+            locations.addAll(list);
+        }
+
+        Collections.sort(locations, new Comparator<JournaledStorageLocation>() {
+            @Override
+            public int compare(final JournaledStorageLocation o1, final JournaledStorageLocation o2) {
+                return Long.compare(o1.getEventId(), o2.getEventId());
+            }
+        });
+
+        // Each partition may contribute up to maxRecords locations, so the merged list
+        // can exceed the limit. Keep only the maxRecords smallest ids.
+        final List<JournaledStorageLocation> requested =
+            locations.size() > maxRecords ? locations.subList(0, maxRecords) : locations;
+
+        @SuppressWarnings({ "rawtypes", "unchecked" })
+        final List<StorageLocation> storageLocations = (List<StorageLocation>) ((List) requested);
+        return getEvents(storageLocations);
+    }
+    
+    /**
+     * Retrieves the event stored at the given location.
+     *
+     * @param location the location of the event
+     * @return the event at that location, or {@code null} if none could be retrieved
+     * @throws IOException if the event cannot be read
+     */
+    @Override
+    public StoredProvenanceEvent getEvent(final StorageLocation location) throws IOException {
+        final List<StorageLocation> singleLocation = Collections.singletonList(location);
+        final List<StoredProvenanceEvent> matches = getEvents(singleLocation);
+
+        if (matches == null || matches.isEmpty()) {
+            return null;
+        }
+        return matches.get(0);
+    }
+    
+    
+    
+    @Override
+    public List<StoredProvenanceEvent> getEvents(final List<StorageLocation> 
locations) throws IOException {
+        // Group the locations by journal files because we want a single 
thread, at most, per journal file.
+        final Map<File, List<JournaledStorageLocation>> orderedLocations = 
QueryUtils.orderLocations(locations, config);
+        
+        // Go through each journal file and create a callable that can lookup 
the records for that journal file.
+        final List<Future<List<StoredProvenanceEvent>>> futures = new 
ArrayList<>();
+        for ( final Map.Entry<File, List<JournaledStorageLocation>> entry : 
orderedLocations.entrySet() ) {
+            final File journalFile = entry.getKey();
+            final List<JournaledStorageLocation> locationsForFile = 
entry.getValue();
+            
+            final Callable<List<StoredProvenanceEvent>> callable = new 
Callable<List<StoredProvenanceEvent>>() {
+                @Override
+                public List<StoredProvenanceEvent> call() throws Exception {
+                    try(final TocReader tocReader = new StandardTocReader(new 
File(journalFile.getParentFile(), journalFile.getName() + ".toc"));
+                        final JournalReader reader = new 
StandardJournalReader(journalFile)) 
+                    {
+                        final List<StoredProvenanceEvent> storedEvents = new 
ArrayList<>(locationsForFile.size());
+                        
+                        for ( final JournaledStorageLocation location : 
locationsForFile ) {
+                            final long blockOffset = 
tocReader.getBlockOffset(location.getBlockIndex());
+                            final ProvenanceEventRecord event = 
reader.getEvent(blockOffset, location.getEventId());
+                            
+                            storedEvents.add(new 
JournaledProvenanceEvent(event, location));
+                        }
+                        
+                        return storedEvents;
+                    }
+                }
+            };
+            
+            final Future<List<StoredProvenanceEvent>> future = 
executor.submit(callable);
+            futures.add(future);
+        }
+        
+        // Get all of the events from the futures, waiting for them to finish.
+        final Map<StorageLocation, StoredProvenanceEvent> locationToEventMap = 
new HashMap<>(locations.size());
+        for ( final Future<List<StoredProvenanceEvent>> future : futures ) {
+            try {
+                final List<StoredProvenanceEvent> events = future.get();
+                
+                // Map the location to the event, so that we can then re-order 
the events in the same order
+                // that the locations were passed to us.
+                for ( final StoredProvenanceEvent event : events ) {
+                    locationToEventMap.put(event.getStorageLocation(), event);
+                }
+            } catch (final ExecutionException ee) {
+                final Throwable cause = ee.getCause();
+                if ( cause instanceof IOException ) {
+                    throw (IOException) cause;
+                } else {
+                    throw new RuntimeException(cause);
+                }
+            } catch (final InterruptedException ie) {
+                throw new RuntimeException(ie);
+            }
+        }
+        
+        // Sort Events by the order of the provided locations.
+        final List<StoredProvenanceEvent> sortedEvents = new 
ArrayList<>(locations.size());
+        for ( final StorageLocation location : locations ) {
+            final StoredProvenanceEvent event = 
locationToEventMap.get(location);
+            if ( event != null ) {
+                sortedEvents.add(event);
+            }
+        }
+        
+        return sortedEvents;
+    }
+    
+
+    /**
+     * Determines the largest event id reported by any partition.
+     *
+     * @return the greatest of the ids reported by the partitions, or {@code null} if no
+     *         partition reports an id
+     * @throws IOException if thrown while the partitions are being queried
+     */
+    @Override
+    public Long getMaxEventId() throws IOException {
+        final PartitionAction<Long> maxIdLookup = new PartitionAction<Long>() {
+            @Override
+            public Long perform(final Partition partition) throws IOException {
+                return partition.getMaxEventId();
+            }
+        };
+        final Set<Long> perPartitionMax = partitionManager.withEachPartition(maxIdLookup);
+
+        // Partitions that report null contribute nothing.
+        Long largest = null;
+        for (final Long candidate : perPartitionMax) {
+            if (candidate != null && (largest == null || candidate > largest)) {
+                largest = candidate;
+            }
+        }
+
+        return largest;
+    }
+
+    
+    /**
+     * Not yet implemented.
+     *
+     * @param query the query to submit (currently ignored)
+     * @return always {@code null}
+     */
+    @Override
+    public QuerySubmission submitQuery(final Query query) {
+        // TODO: implement; this stub currently ignores the query and returns null
+        return null;
+    }
+
+    /**
+     * Not yet implemented.
+     *
+     * @param queryIdentifier the identifier of the query submission (currently ignored)
+     * @return always {@code null}
+     */
+    @Override
+    public QuerySubmission retrieveQuerySubmission(final String 
queryIdentifier) {
+        // TODO: implement; this stub currently ignores the identifier and returns null
+        return null;
+    }
+
+    /**
+     * Not yet implemented.
+     *
+     * @param flowFileUuid the UUID of the FlowFile (currently ignored)
+     * @return always {@code null}
+     */
+    @Override
+    public ComputeLineageSubmission submitLineageComputation(final String 
flowFileUuid) {
+        // TODO: implement; this stub currently ignores the UUID and returns null
+        return null;
+    }
+
+    /**
+     * Not yet implemented.
+     *
+     * @param lineageIdentifier the identifier of the lineage submission (currently ignored)
+     * @return always {@code null}
+     */
+    @Override
+    public ComputeLineageSubmission retrieveLineageSubmission(final String 
lineageIdentifier) {
+        // TODO: implement; this stub currently ignores the identifier and returns null
+        return null;
+    }
+
+    /**
+     * Not yet implemented.
+     *
+     * @param eventId the id of the event (currently ignored)
+     * @return always {@code null}
+     */
+    @Override
+    public ComputeLineageSubmission submitExpandParents(final long eventId) {
+        // TODO: implement; this stub currently ignores the event id and returns null
+        return null;
+    }
+
+    /**
+     * Not yet implemented.
+     *
+     * @param eventId the id of the event (currently ignored)
+     * @return always {@code null}
+     */
+    @Override
+    public ComputeLineageSubmission submitExpandChildren(final long eventId) {
+        // TODO: implement; this stub currently ignores the event id and returns null
+        return null;
+    }
+
+    /**
+     * Shuts down the partition manager and then the thread pool. The thread pool is
+     * shut down even if shutting down the partition manager fails, so that its threads
+     * are not leaked.
+     *
+     * @throws IOException declared by the interface
+     */
+    @Override
+    public void close() throws IOException {
+        try {
+            partitionManager.shutdown();
+        } finally {
+            executor.shutdown();
+        }
+    }
+
+    /**
+     * Returns the searchable fields held by this repository's configuration.
+     *
+     * @return the result of {@code config.getSearchableFields()}
+     */
+    @Override
+    public List<SearchableField> getSearchableFields() {
+        return config.getSearchableFields();
+    }
+
+    /**
+     * Returns the searchable FlowFile attributes held by this repository's configuration.
+     *
+     * @return the result of {@code config.getSearchableAttributes()}
+     */
+    @Override
+    public List<SearchableField> getSearchableAttributes() {
+        return config.getSearchableAttributes();
+    }
+
+    /**
+     * Reports the earliest event time for which events are available from every partition.
+     *
+     * @return the latest of the partitions' earliest event times, or {@code null} if no
+     *         partition reports a time
+     * @throws IOException if thrown while the partitions are being queried
+     */
+    @Override
+    public Long getEarliestEventTime() throws IOException {
+        // Collect the earliest event timestamp of each partition.
+        final PartitionAction<Long> earliestLookup = new PartitionAction<Long>() {
+            @Override
+            public Long perform(final Partition partition) throws IOException {
+                return partition.getEarliestEventTime();
+            }
+        };
+        final Set<Long> perPartitionEarliest = partitionManager.withEachPartition(earliestLookup);
+
+        // Deliberately pick the LATEST of the "earliest" timestamps. The UI shows the
+        // earliest time available. Since the write-ahead log is partitioned, returning the
+        // overall earliest timestamp could point at an event that still exists while the
+        // next event in its lineage has already been aged off of a different journal.
+        // With the "latest of the earliest", no event with a larger ID has been aged off
+        // from any of the partitions.
+        Long latestOfEarliest = null;
+        for (final Long candidate : perPartitionEarliest) {
+            if (candidate != null && (latestOfEarliest == null || candidate > latestOfEarliest)) {
+                latestOfEarliest = candidate;
+            }
+        }
+
+        return latestOfEarliest;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/config/JournalingRepositoryConfig.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/config/JournalingRepositoryConfig.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/config/JournalingRepositoryConfig.java
new file mode 100644
index 0000000..6dd7be9
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/config/JournalingRepositoryConfig.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.config;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.provenance.search.SearchableField;
+
+/**
+ * Mutable holder for the settings of the journaling Provenance Repository. Every
+ * setting has a default value, assigned by the field initializers below. This class
+ * performs no synchronization of its own.
+ */
+public class JournalingRepositoryConfig {
+    private Map<String, File> containers = new HashMap<>();
+    private long expirationMillis = TimeUnit.MILLISECONDS.convert(24, TimeUnit.HOURS);
+    private long storageCapacity = 1024L * 1024L * 1024L;   // 1 GB
+    private long rolloverMillis = TimeUnit.MILLISECONDS.convert(5, TimeUnit.MINUTES);
+    private long journalCapacity = 1024L * 1024L * 5L;   // 5 MB
+    private long desiredIndexBytes = 1024L * 1024L * 500L; // 500 MB
+    private int partitionCount = 16;
+    private int blockSize = 100;
+
+    private List<SearchableField> searchableFields = new ArrayList<>();
+    private List<SearchableField> searchableAttributes = new ArrayList<>();
+    private boolean compress = true;
+    private boolean alwaysSync = false;
+    private int threadPoolSize = 4;
+    private boolean readOnly = false;
+
+    /**
+     * Sets the read-only flag.
+     *
+     * @param readOnly the new value of the flag
+     */
+    public void setReadOnly(final boolean readOnly) {
+        this.readOnly = readOnly;
+    }
+
+    /**
+     * Returns the read-only flag.
+     *
+     * @return {@code true} if the read-only flag is set; the default is {@code false}
+     */
+    public boolean isReadOnly() {
+        return readOnly;
+    }
+
+    /**
+     * Specifies where the repository will store data
+     *
+     * @return an unmodifiable view of the configured containers, keyed by container name
+     */
+    public Map<String, File> getContainers() {
+        return Collections.unmodifiableMap(containers);
+    }
+
+    /**
+     * Specifies where the repository should store data. The given map is copied, so
+     * later changes to it do not affect this configuration.
+     *
+     * @param containers the storage directories, keyed by container name
+     */
+    public void setContainers(final Map<String, File> containers) {
+        this.containers = new HashMap<>(containers);
+    }
+
+    /**
+     * Returns the maximum amount of time that a given record will stay in the
+     * repository
+     *
+     * @param timeUnit the unit in which to express the result
+     * @return the event expiration, converted from milliseconds to the given unit
+     */
+    public long getEventExpiration(final TimeUnit timeUnit) {
+        return timeUnit.convert(expirationMillis, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Specifies how long a record should stay in the repository
+     *
+     * @param expiration the amount of time
+     * @param timeUnit the unit of {@code expiration}
+     */
+    public void setEventExpiration(final long expiration, final TimeUnit timeUnit) {
+        this.expirationMillis = TimeUnit.MILLISECONDS.convert(expiration, timeUnit);
+    }
+
+    /**
+     * Returns the maximum amount of data to store in the repository (in bytes)
+     *
+     * @return the maximum storage capacity, in bytes
+     */
+    public long getMaxStorageCapacity() {
+        return storageCapacity;
+    }
+
+    /**
+     * Sets the maximum amount of data to store in the repository (in bytes)
+     *
+     * @param maxStorageCapacity the maximum storage capacity, in bytes
+     */
+    public void setMaxStorageCapacity(final long maxStorageCapacity) {
+        this.storageCapacity = maxStorageCapacity;
+    }
+
+    /**
+     * Returns the maximum amount of time to write to a single event file
+     *
+     * @param timeUnit the unit in which to express the result
+     * @return the rollover period, converted from milliseconds to the given unit
+     */
+    public long getJournalRolloverPeriod(final TimeUnit timeUnit) {
+        return timeUnit.convert(rolloverMillis, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Sets the maximum amount of time to write to a single event file
+     *
+     * @param rolloverPeriod the amount of time
+     * @param timeUnit the unit of {@code rolloverPeriod}
+     */
+    public void setJournalRolloverPeriod(final long rolloverPeriod, final TimeUnit timeUnit) {
+        this.rolloverMillis = TimeUnit.MILLISECONDS.convert(rolloverPeriod, timeUnit);
+    }
+
+    /**
+     * Returns the number of bytes (pre-compression) that will be
+     * written to a single journal file before the file is rolled over
+     *
+     * @return the journal capacity, in bytes
+     */
+    public long getJournalCapacity() {
+        return journalCapacity;
+    }
+
+    /**
+     * Sets the number of bytes (pre-compression) that will be written
+     * to a single journal file before the file is rolled over
+     *
+     * @param journalCapacity the journal capacity, in bytes
+     */
+    public void setJournalCapacity(final long journalCapacity) {
+        this.journalCapacity = journalCapacity;
+    }
+
+    /**
+     * Returns the fields that can be indexed
+     *
+     * @return an unmodifiable view of the searchable fields
+     */
+    public List<SearchableField> getSearchableFields() {
+        return Collections.unmodifiableList(searchableFields);
+    }
+
+    /**
+     * Sets the fields to index. The given list is copied.
+     *
+     * @param searchableFields the fields to index
+     */
+    public void setSearchableFields(final List<SearchableField> searchableFields) {
+        this.searchableFields = new ArrayList<>(searchableFields);
+    }
+
+    /**
+     * Returns the FlowFile attributes that can be indexed
+     *
+     * @return an unmodifiable view of the searchable attributes
+     */
+    public List<SearchableField> getSearchableAttributes() {
+        return Collections.unmodifiableList(searchableAttributes);
+    }
+
+    /**
+     * Sets the FlowFile attributes to index. The given list is copied.
+     *
+     * @param searchableAttributes the attributes to index
+     */
+    public void setSearchableAttributes(final List<SearchableField> searchableAttributes) {
+        this.searchableAttributes = new ArrayList<>(searchableAttributes);
+    }
+
+    /**
+     * Indicates whether or not event files will be compressed when they are
+     * rolled over
+     *
+     * @return {@code true} if event files are compressed on rollover; the default is
+     *         {@code true}
+     */
+    public boolean isCompressOnRollover() {
+        return compress;
+    }
+
+    /**
+     * Specifies whether or not to compress event files on rollover
+     *
+     * @param compress {@code true} to compress event files on rollover
+     */
+    public void setCompressOnRollover(final boolean compress) {
+        this.compress = compress;
+    }
+
+    /**
+     * Returns the configured thread pool size.
+     *
+     * @return the number of threads; the default is 4
+     */
+    public int getThreadPoolSize() {
+        return threadPoolSize;
+    }
+
+    /**
+     * Sets the thread pool size.
+     *
+     * @param queryThreadPoolSize the number of threads; must be at least 1
+     * @throws IllegalArgumentException if {@code queryThreadPoolSize} is less than 1
+     */
+    public void setThreadPoolSize(final int queryThreadPoolSize) {
+        if (queryThreadPoolSize < 1) {
+            throw new IllegalArgumentException("Cannot set thread pool size to " + queryThreadPoolSize + "; must be a positive integer");
+        }
+        this.threadPoolSize = queryThreadPoolSize;
+    }
+
+    /**
+     * <p>
+     * Specifies the desired size of each Provenance Event index shard, in
+     * bytes. We shard the index for a few reasons:
+     * </p>
+     *
+     * <ol>
+     * <li>
+     * A very large index requires a significant amount of Java heap space to
+     * search. As the size of the shard increases, the required Java heap space
+     * also increases.
+     * </li>
+     * <li>
+     * By having multiple shards, we have the ability to use multiple concurrent
+     * threads to search the individual shards, resulting in far less latency
+     * when performing a search across millions or billions of records.
+     * </li>
+     * <li>
+     * We keep track of which time ranges each index shard spans. As a result,
+     * we are able to determine which shards need to be searched if a search
+     * provides a date range. This can greatly increase the speed of a search
+     * and reduce resource utilization.
+     * </li>
+     * </ol>
+     *
+     * @param bytes the desired size of each index shard, in bytes
+     */
+    public void setDesiredIndexSize(final long bytes) {
+        this.desiredIndexBytes = bytes;
+    }
+
+    /**
+     * Returns the desired size of each index shard. See the
+     * {@link #setDesiredIndexSize} method for an explanation of why we choose
+     * to shard the index.
+     *
+     * @return the desired size of each index shard, in bytes
+     */
+    public long getDesiredIndexSize() {
+        return desiredIndexBytes;
+    }
+
+    /**
+     * Sets the number of Journal files to use when persisting records.
+     *
+     * @param numJournals the number of journals; must be at least 1
+     * @throws IllegalArgumentException if {@code numJournals} is less than 1
+     */
+    public void setPartitionCount(final int numJournals) {
+        if (numJournals < 1) {
+            throw new IllegalArgumentException("Cannot set partition count to " + numJournals + "; must be a positive integer");
+        }
+
+        this.partitionCount = numJournals;
+    }
+
+    /**
+     * Returns the number of Journal files that will be used when persisting
+     * records.
+     *
+     * @return the number of journals; the default is 16
+     */
+    public int getPartitionCount() {
+        return partitionCount;
+    }
+
+    /**
+     * Specifies whether or not the Repository should sync all updates to disk.
+     *
+     * @return {@code true} if all updates should be synced to disk; the default is
+     *         {@code false}
+     */
+    public boolean isAlwaysSync() {
+        return alwaysSync;
+    }
+
+    /**
+     * Configures whether or not the Repository should sync all updates to disk.
+     * Setting this value to true means that updates are guaranteed to be
+     * persisted across restarts, even if there is a power failure or a sudden
+     * Operating System crash, but it can be very expensive.
+     *
+     * @param alwaysSync {@code true} to sync all updates to disk
+     */
+    public void setAlwaysSync(boolean alwaysSync) {
+        this.alwaysSync = alwaysSync;
+    }
+
+    /**
+     * Returns the minimum number of Provenance Events that should be written to a single Block.
+     * Events are written out in blocks, which are later optionally compressed. A larger block size
+     * will potentially result in better compression. However, a smaller block size will result
+     * in better performance when reading the data. The default value is 100 events per block.
+     *
+     * @return the minimum number of events per block
+     */
+    public int getBlockSize() {
+        return blockSize;
+    }
+
+    /**
+     * Sets the minimum number of Provenance Events that should be written to a single Block.
+     * Events are written out in blocks, which are later optionally compressed. A larger block size
+     * will potentially result in better compression. However, a smaller block size will result
+     * in better performance when reading the data. The default value is 100 events per block.
+     *
+     * @param blockSize the minimum number of events per block; must be at least 1
+     * @throws IllegalArgumentException if {@code blockSize} is less than 1
+     */
+    public void setBlockSize(final int blockSize) {
+        if (blockSize < 1) {
+            throw new IllegalArgumentException("Cannot set block size to " + blockSize + "; must be a positive integer");
+        }
+        this.blockSize = blockSize;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexSearcher.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexSearcher.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexSearcher.java
new file mode 100644
index 0000000..b669c53
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexSearcher.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.index;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.nifi.provenance.journaling.JournaledStorageLocation;
+import org.apache.nifi.provenance.search.Query;
+
+public interface EventIndexSearcher extends Closeable {
+    /**
+     * Searches the repository for any events that match the provided query 
+     * and returns the locations where those events are stored.
+     *
+     * @param query the provenance query to evaluate against the index
+     * @return a {@link SearchResult} holding the locations of the matching events
+     *         and the total count reported by the search
+     * @throws IOException if the index cannot be read
+     */
+    SearchResult search(Query query) throws IOException;
+    
+    /**
+     * Returns the storage locations of indexed events whose event ID is at least
+     * {@code minEventId}, limited to {@code maxResults} entries.
+     * NOTE(review): the Lucene-backed implementation treats the lower bound as
+     * inclusive and sorts by event ID; confirm other implementations agree.
+     *
+     * @param minEventId the smallest event ID to include
+     * @param maxResults the maximum number of locations to return
+     * @return the locations of the matching events
+     * @throws IOException if the index cannot be read
+     */
+    List<JournaledStorageLocation> getEvents(long minEventId, int maxResults) 
throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexWriter.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexWriter.java
new file mode 100644
index 0000000..1f231e9
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexWriter.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.index;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.nifi.provenance.journaling.JournaledProvenanceEvent;
+
+/**
+ * Write-side view of an event index: adds events, removes the records of a
+ * journal, and flushes pending updates. Implementations hold resources and
+ * must be closed when no longer needed.
+ */
+public interface EventIndexWriter extends Closeable {
+
+    /**
+     * Adds all of the events to the index.
+     *
+     * @param events the events to index
+     * @throws IOException if the index cannot be updated
+     */
+    void index(final Collection<JournaledProvenanceEvent> events) throws 
IOException;
+    
+    /**
+     * Forces all updates to the index to be pushed to disk.
+     *
+     * @throws IOException if the updates cannot be written
+     */
+    void sync() throws IOException;
+    
+    /**
+     * Deletes any records that belong to the given container/section/journal
+     *
+     * @param containerName the name of the container that holds the journal
+     * @param section the name of the section that holds the journal
+     * @param journalId the identifier of the journal whose records are removed
+     * @throws IOException if the index cannot be updated
+     */
+    void delete(String containerName, String section, String journalId) throws 
IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/IndexedFieldNames.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/IndexedFieldNames.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/IndexedFieldNames.java
new file mode 100644
index 0000000..977df9f
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/IndexedFieldNames.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.index;
+
+/**
+ * Names of the index fields that describe where an event is stored. The
+ * Lucene writer stores a value under each of these names for every indexed
+ * event, and the Lucene searcher reads them back to rebuild the event's
+ * storage location.
+ */
+public class IndexedFieldNames {
+
+    // Name of the container that holds the journal.
+    public static final String CONTAINER_NAME = "containerName";
+    // Name of the section, within the container, that holds the journal.
+    public static final String SECTION_NAME = "sectionName";
+    // Identifier of the journal that contains the event.
+    public static final String JOURNAL_ID = "journalId";
+    // Index of the block, within the journal, that contains the event.
+    public static final String BLOCK_INDEX = "blockIndex";
+    // Numeric identifier of the event itself.
+    public static final String EVENT_ID = "eventId";
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/JournalingSearchableFields.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/JournalingSearchableFields.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/JournalingSearchableFields.java
new file mode 100644
index 0000000..9ec9f5d
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/JournalingSearchableFields.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.index;
+
+import org.apache.nifi.provenance.NamedSearchableField;
+import org.apache.nifi.provenance.search.SearchableField;
+
+/**
+ * {@link SearchableField} definitions for the storage-location fields named in
+ * {@link IndexedFieldNames}. Each definition passes the same index field name
+ * twice, followed by a human-readable label.
+ * NOTE(review): the trailing {@code false} presumably marks the field as a
+ * non-attribute field; confirm against {@code NamedSearchableField}.
+ *
+ * The fields are declared {@code final} so that these shared constants cannot
+ * be reassigned by other code.
+ */
+public class JournalingSearchableFields {
+    public static final SearchableField CONTAINER_NAME = new NamedSearchableField(
+            IndexedFieldNames.CONTAINER_NAME, IndexedFieldNames.CONTAINER_NAME, "Container Name", false);
+    public static final SearchableField SECTION_NAME = new NamedSearchableField(
+            IndexedFieldNames.SECTION_NAME, IndexedFieldNames.SECTION_NAME, "Section Name", false);
+    public static final SearchableField JOURNAL_ID = new NamedSearchableField(
+            IndexedFieldNames.JOURNAL_ID, IndexedFieldNames.JOURNAL_ID, "Journal ID", false);
+    public static final SearchableField BLOCK_INDEX = new NamedSearchableField(
+            IndexedFieldNames.BLOCK_INDEX, IndexedFieldNames.BLOCK_INDEX, "Block Index", false);
+    public static final SearchableField EVENT_ID = new NamedSearchableField(
+            IndexedFieldNames.EVENT_ID, IndexedFieldNames.EVENT_ID, "Event ID", false);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java
new file mode 100644
index 0000000..32dc7c3
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.index;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.search.BooleanClause.Occur;
+import org.apache.lucene.search.BooleanQuery;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.NumericRangeQuery;
+import org.apache.lucene.search.ScoreDoc;
+import org.apache.lucene.search.Sort;
+import org.apache.lucene.search.SortField;
+import org.apache.lucene.search.SortField.Type;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.nifi.provenance.journaling.JournaledStorageLocation;
+import org.apache.nifi.provenance.search.Query;
+
+/**
+ * {@link EventIndexSearcher} backed by a Lucene index. An instance either owns
+ * the index directory (when built from a {@link File}) or borrows an existing
+ * {@link DirectoryReader}; in both cases {@link #close()} closes the reader.
+ */
+public class LuceneIndexSearcher implements EventIndexSearcher {
+    private final DirectoryReader reader;
+    private final IndexSearcher searcher;
+    // Non-null only when this instance opened the directory itself.
+    private final FSDirectory fsDirectory;
+
+    /**
+     * Opens the index stored in the given directory.
+     *
+     * @param indexDirectory the directory that contains the Lucene index
+     * @throws IOException if the directory or the index cannot be opened
+     */
+    public LuceneIndexSearcher(final File indexDirectory) throws IOException {
+        final FSDirectory directory = FSDirectory.open(indexDirectory);
+        final DirectoryReader directoryReader;
+        try {
+            directoryReader = DirectoryReader.open(directory);
+        } catch (final IOException | RuntimeException e) {
+            // Do not leak the directory handle when the reader cannot be opened.
+            try {
+                directory.close();
+            } catch (final IOException closeFailure) {
+                e.addSuppressed(closeFailure);
+            }
+            throw e;
+        }
+
+        this.fsDirectory = directory;
+        this.reader = directoryReader;
+        this.searcher = new IndexSearcher(directoryReader);
+    }
+
+    /**
+     * Wraps an already-open reader. No directory is owned by this instance,
+     * so only the reader is closed by {@link #close()}.
+     *
+     * @param reader the reader to search
+     */
+    public LuceneIndexSearcher(final DirectoryReader reader) {
+        this.reader = reader;
+        this.searcher = new IndexSearcher(reader);
+        this.fsDirectory = null;
+    }
+
+    /**
+     * Closes the reader and, if this instance opened it, the directory. Both
+     * are always attempted; if both fail, the reader's exception is thrown
+     * with the directory's exception attached as suppressed.
+     *
+     * @throws IOException if closing the reader or the directory fails
+     */
+    @Override
+    public void close() throws IOException {
+        IOException failure = null;
+        try {
+            reader.close();
+        } catch (final IOException ioe) {
+            failure = ioe;
+        }
+
+        if ( fsDirectory != null ) {
+            try {
+                fsDirectory.close();
+            } catch (final IOException ioe) {
+                if ( failure == null ) {
+                    failure = ioe;
+                } else {
+                    // Keep the first failure as primary; do not lose either one.
+                    failure.addSuppressed(ioe);
+                }
+            }
+        }
+
+        if ( failure != null ) {
+            throw failure;
+        }
+    }
+
+    /**
+     * Rebuilds a storage location from the stored fields of an index document.
+     */
+    private JournaledStorageLocation createLocation(final Document document) {
+        final String containerName = document.get(IndexedFieldNames.CONTAINER_NAME);
+        final String sectionName = document.get(IndexedFieldNames.SECTION_NAME);
+        final String journalId = document.get(IndexedFieldNames.JOURNAL_ID);
+        final int blockIndex = document.getField(IndexedFieldNames.BLOCK_INDEX).numericValue().intValue();
+        final long eventId = document.getField(IndexedFieldNames.EVENT_ID).numericValue().longValue();
+
+        return new JournaledStorageLocation(containerName, sectionName, journalId, blockIndex, eventId);
+    }
+
+    /**
+     * Loads the document behind every hit and converts it to a storage location,
+     * preserving the order of the hits.
+     */
+    private List<JournaledStorageLocation> getLocations(final TopDocs topDocs) throws IOException {
+        final ScoreDoc[] scoreDocs = topDocs.scoreDocs;
+        final List<JournaledStorageLocation> locations = new ArrayList<>(scoreDocs.length);
+        for ( final ScoreDoc scoreDoc : scoreDocs ) {
+            final Document document = reader.document(scoreDoc.doc);
+            locations.add(createLocation(document));
+        }
+
+        return locations;
+    }
+
+    /**
+     * Converts the provenance query to a Lucene query and runs it, returning at
+     * most {@code provenanceQuery.getMaxResults()} locations along with the
+     * total number of hits.
+     *
+     * @param provenanceQuery the query to run
+     * @return the matching locations and the total hit count
+     * @throws IOException if the index cannot be read
+     */
+    @Override
+    public SearchResult search(final Query provenanceQuery) throws IOException {
+        final org.apache.lucene.search.Query luceneQuery = QueryUtils.convertQueryToLucene(provenanceQuery);
+        final TopDocs topDocs = searcher.search(luceneQuery, provenanceQuery.getMaxResults());
+        final List<JournaledStorageLocation> locations = getLocations(topDocs);
+
+        return new SearchResult(locations, topDocs.totalHits);
+    }
+
+    /**
+     * Returns up to {@code maxResults} locations of events whose ID is greater
+     * than or equal to {@code minEventId}, sorted by event ID.
+     *
+     * @param minEventId the smallest event ID to include (inclusive)
+     * @param maxResults the maximum number of locations to return
+     * @return the locations of the matching events
+     * @throws IOException if the index cannot be read
+     */
+    @Override
+    public List<JournaledStorageLocation> getEvents(final long minEventId, final int maxResults) throws IOException {
+        final BooleanQuery query = new BooleanQuery();
+        // A null upper bound leaves the range open-ended.
+        query.add(NumericRangeQuery.newLongRange(IndexedFieldNames.EVENT_ID, minEventId, null, true, true), Occur.MUST);
+
+        final TopDocs topDocs = searcher.search(query, maxResults, new Sort(new SortField(IndexedFieldNames.EVENT_ID, Type.LONG)));
+        return getLocations(topDocs);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexWriter.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexWriter.java
new file mode 100644
index 0000000..e955ae5
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexWriter.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.index;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.document.LongField;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.BooleanClause;
+import org.apache.lucene.search.BooleanClause.Occur;
+import org.apache.lucene.search.BooleanQuery;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.util.Version;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.provenance.SearchableFields;
+import org.apache.nifi.provenance.journaling.JournaledProvenanceEvent;
+import org.apache.nifi.provenance.journaling.JournaledStorageLocation;
+import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig;
+import org.apache.nifi.provenance.search.SearchableField;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link EventIndexWriter} that stores provenance events in a Lucene index.
+ * Each event becomes one document holding its searchable fields (not stored)
+ * plus the stored fields that identify the event's storage location.
+ */
+public class LuceneIndexWriter implements EventIndexWriter {
+    private static final Logger logger = LoggerFactory.getLogger(LuceneIndexWriter.class);
+
+    @SuppressWarnings("unused")
+    private final JournalingRepositoryConfig config;
+    private final Set<SearchableField> nonAttributeSearchableFields;
+    private final Set<SearchableField> attributeSearchableFields;
+
+    private final Directory directory;
+    private final Analyzer analyzer;
+    private final IndexWriter indexWriter;
+    // Largest event ID handed to index() so far; -1 until the first event arrives.
+    private final AtomicLong indexMaxId = new AtomicLong(-1L);
+
+    /**
+     * Opens (or creates) the Lucene index in the given directory.
+     *
+     * @param indexDir the directory that holds the index
+     * @param config supplies the searchable fields and searchable attributes
+     * @throws IOException if the directory or the index writer cannot be opened
+     */
+    public LuceneIndexWriter(final File indexDir, final JournalingRepositoryConfig config) throws IOException {
+        this.config = config;
+
+        attributeSearchableFields = Collections.unmodifiableSet(new HashSet<>(config.getSearchableAttributes()));
+        nonAttributeSearchableFields = Collections.unmodifiableSet(new HashSet<>(config.getSearchableFields()));
+
+        final Directory openedDirectory = FSDirectory.open(indexDir);
+        final Analyzer standardAnalyzer = new StandardAnalyzer();
+        final IndexWriter writer;
+        try {
+            final IndexWriterConfig writerConfig = new IndexWriterConfig(Version.LATEST, standardAnalyzer);
+            writer = new IndexWriter(openedDirectory, writerConfig);
+        } catch (final IOException | RuntimeException e) {
+            // Release what was already opened so a failed construction leaks nothing.
+            standardAnalyzer.close();
+            try {
+                openedDirectory.close();
+            } catch (final IOException closeFailure) {
+                e.addSuppressed(closeFailure);
+            }
+            throw e;
+        }
+
+        directory = openedDirectory;
+        analyzer = standardAnalyzer;
+        indexWriter = writer;
+    }
+
+    /**
+     * Creates a searcher over a reader obtained directly from this writer.
+     * The caller is responsible for closing the returned searcher.
+     *
+     * @return a new searcher backed by this writer's index
+     * @throws IOException if the reader cannot be opened
+     */
+    public EventIndexSearcher newIndexSearcher() throws IOException {
+        final DirectoryReader reader = DirectoryReader.open(indexWriter, false);
+        return new LuceneIndexSearcher(reader);
+    }
+
+    /**
+     * Closes the index writer, the analyzer and the directory. All three are
+     * always attempted. If closing the directory fails, that exception is thrown
+     * (with any index-writer failure attached as suppressed); otherwise an
+     * index-writer failure is thrown rather than being silently dropped.
+     *
+     * @throws IOException if the index writer or the directory fails to close
+     */
+    @Override
+    public void close() throws IOException {
+        IOException writerFailure = null;
+        try {
+            indexWriter.close();
+        } catch (final IOException ioe) {
+            writerFailure = ioe;
+        }
+
+        analyzer.close();
+
+        try {
+            directory.close();
+        } catch (final IOException ioe) {
+            if ( writerFailure != null ) {
+                ioe.addSuppressed(writerFailure);
+            }
+
+            throw ioe;
+        }
+
+        if ( writerFailure != null ) {
+            throw writerFailure;
+        }
+    }
+
+
+    /**
+     * Adds a lower-cased string field to the document, unless the value is null
+     * or the field is neither configured as searchable nor an attribute field.
+     */
+    private void addField(final Document doc, final SearchableField field, final String value, final Store store) {
+        if (value == null || (!nonAttributeSearchableFields.contains(field) && !field.isAttribute())) {
+            return;
+        }
+
+        doc.add(new StringField(field.getSearchableFieldName(), value.toLowerCase(), store));
+    }
+
+    /**
+     * Builds the Lucene document for a single event: its searchable fields, the
+     * numeric fields that are always indexed, the stored storage-location
+     * fields, and any related FlowFile UUIDs.
+     */
+    private Document createDocument(final JournaledProvenanceEvent event) {
+        final Map<String, String> attributes = event.getAttributes();
+
+        final Document doc = new Document();
+        addField(doc, SearchableFields.FlowFileUUID, event.getFlowFileUuid(), Store.NO);
+        addField(doc, SearchableFields.Filename, attributes.get(CoreAttributes.FILENAME.key()), Store.NO);
+        addField(doc, SearchableFields.ComponentID, event.getComponentId(), Store.NO);
+        addField(doc, SearchableFields.AlternateIdentifierURI, event.getAlternateIdentifierUri(), Store.NO);
+        addField(doc, SearchableFields.EventType, event.getEventType().name(), Store.NO);
+        addField(doc, SearchableFields.Relationship, event.getRelationship(), Store.NO);
+        addField(doc, SearchableFields.Details, event.getDetails(), Store.NO);
+        addField(doc, SearchableFields.ContentClaimSection, event.getContentClaimSection(), Store.NO);
+        addField(doc, SearchableFields.ContentClaimContainer, event.getContentClaimContainer(), Store.NO);
+        addField(doc, SearchableFields.ContentClaimIdentifier, event.getContentClaimIdentifier(), Store.NO);
+        addField(doc, SearchableFields.SourceQueueIdentifier, event.getSourceQueueIdentifier(), Store.NO);
+
+        if (nonAttributeSearchableFields.contains(SearchableFields.TransitURI)) {
+            addField(doc, SearchableFields.TransitURI, event.getTransitUri(), Store.NO);
+        }
+
+        for (final SearchableField searchableField : attributeSearchableFields) {
+            addField(doc, searchableField, attributes.get(searchableField.getSearchableFieldName()), Store.NO);
+        }
+
+        // Index the fields that we always index, regardless of configuration
+        doc.add(new LongField(SearchableFields.LineageStartDate.getSearchableFieldName(), event.getLineageStartDate(), Store.NO));
+        doc.add(new LongField(SearchableFields.EventTime.getSearchableFieldName(), event.getEventTime(), Store.NO));
+        doc.add(new LongField(SearchableFields.FileSize.getSearchableFieldName(), event.getFileSize(), Store.NO));
+
+        // Stored fields: these are read back by the searcher to locate the event.
+        final JournaledStorageLocation location = event.getStorageLocation();
+        doc.add(new StringField(IndexedFieldNames.CONTAINER_NAME, location.getContainerName(), Store.YES));
+        doc.add(new StringField(IndexedFieldNames.SECTION_NAME, location.getSectionName(), Store.YES));
+        doc.add(new StringField(IndexedFieldNames.JOURNAL_ID, location.getJournalId(), Store.YES));
+        doc.add(new LongField(IndexedFieldNames.BLOCK_INDEX, location.getBlockIndex(), Store.YES));
+        doc.add(new LongField(IndexedFieldNames.EVENT_ID, location.getEventId(), Store.YES));
+
+        for (final String lineageIdentifier : event.getLineageIdentifiers()) {
+            addField(doc, SearchableFields.LineageIdentifier, lineageIdentifier, Store.NO);
+        }
+
+        // If the event is a FORK, CLONE or REPLAY, add the FlowFileUUID of every child;
+        // if it is a JOIN, add the FlowFileUUID of every parent.
+        final ProvenanceEventType eventType = event.getEventType();
+        if (eventType == ProvenanceEventType.FORK || eventType == ProvenanceEventType.CLONE || eventType == ProvenanceEventType.REPLAY) {
+            for (final String uuid : event.getChildUuids()) {
+                if (!uuid.equals(event.getFlowFileUuid())) {
+                    addField(doc, SearchableFields.FlowFileUUID, uuid, Store.NO);
+                }
+            }
+        } else if (eventType == ProvenanceEventType.JOIN) {
+            for (final String uuid : event.getParentUuids()) {
+                if (!uuid.equals(event.getFlowFileUuid())) {
+                    addField(doc, SearchableFields.FlowFileUUID, uuid, Store.NO);
+                }
+            }
+        } else if (eventType == ProvenanceEventType.RECEIVE && event.getSourceSystemFlowFileIdentifier() != null) {
+            // If we get a receive with a Source System FlowFile Identifier, we also index the UUID
+            // that the Source System uses to refer to the data (the text after the last colon).
+            final String sourceIdentifier = event.getSourceSystemFlowFileIdentifier();
+            final int lastColon = sourceIdentifier.lastIndexOf(":");
+            if (lastColon > -1 && lastColon < sourceIdentifier.length() - 2) {
+                final String sourceFlowFileUUID = sourceIdentifier.substring(lastColon + 1);
+                addField(doc, SearchableFields.FlowFileUUID, sourceFlowFileUUID, Store.NO);
+            }
+        }
+
+        return doc;
+    }
+
+    /**
+     * Adds one document per event to the index and then raises the tracked
+     * maximum event ID to the largest ID seen, if it is larger than the current one.
+     *
+     * @param events the events to index
+     * @throws IOException if a document cannot be added to the index
+     */
+    @Override
+    public void index(final Collection<JournaledProvenanceEvent> events) throws IOException {
+        long maxId = this.indexMaxId.get();
+
+        for ( final JournaledProvenanceEvent event : events ) {
+            // Track the largest ID, not merely the last one, so that the result
+            // does not depend on the iteration order of the collection.
+            maxId = Math.max(maxId, event.getEventId());
+            indexWriter.addDocument(createDocument(event));
+        }
+
+        // Update the index's max id; retry if another thread changed it in between
+        boolean updated = false;
+        do {
+            long curMax = indexMaxId.get();
+            if ( maxId > curMax ) {
+                updated = indexMaxId.compareAndSet(curMax, maxId);
+            } else {
+                updated = true;
+            }
+        } while (!updated);
+    }
+
+
+    /**
+     * Deletes every document whose stored container name, section name and
+     * journal ID all match the given values.
+     *
+     * @param containerName the container name to match
+     * @param section the section name to match
+     * @param journalId the journal ID to match
+     * @throws IOException if the index cannot be updated
+     */
+    @Override
+    public void delete(final String containerName, final String section, final String journalId) throws IOException {
+        final BooleanQuery query = new BooleanQuery();
+        query.add(new BooleanClause(new TermQuery(new Term(IndexedFieldNames.CONTAINER_NAME, containerName)), Occur.MUST));
+        query.add(new BooleanClause(new TermQuery(new Term(IndexedFieldNames.SECTION_NAME, section)), Occur.MUST));
+        query.add(new BooleanClause(new TermQuery(new Term(IndexedFieldNames.JOURNAL_ID, journalId)), Occur.MUST));
+
+        indexWriter.deleteDocuments(query);
+    }
+
+
+    /**
+     * Commits all pending changes to the index.
+     *
+     * @throws IOException if the commit fails
+     */
+    @Override
+    public void sync() throws IOException {
+        indexWriter.commit();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/QueryUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/QueryUtils.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/QueryUtils.java
new file mode 100644
index 0000000..4ae4b16
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/QueryUtils.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.index;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.BooleanClause;
+import org.apache.lucene.search.BooleanQuery;
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.search.NumericRangeQuery;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.search.WildcardQuery;
+import org.apache.lucene.search.BooleanClause.Occur;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.provenance.SearchableFields;
+import org.apache.nifi.provenance.StorageLocation;
+import org.apache.nifi.provenance.journaling.JournaledStorageLocation;
+import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig;
+import org.apache.nifi.provenance.search.SearchTerm;
+
+/**
+ * Static helpers for translating provenance queries into Lucene queries and
+ * for mapping storage locations onto journal files.
+ */
+public class QueryUtils {
+    /**
+     * Converts a provenance query into the equivalent Lucene query. Every search
+     * term, the file-size range and the event-time range are combined as
+     * required (MUST) clauses. A query with no terms and no ranges matches all documents.
+     *
+     * @param query the provenance query to convert
+     * @return the equivalent Lucene query
+     * @throws IllegalArgumentException if a search term has a null value
+     */
+    public static org.apache.lucene.search.Query convertQueryToLucene(final org.apache.nifi.provenance.search.Query query) {
+        final boolean hasDateRange = query.getStartDate() != null || query.getEndDate() != null;
+        // The file-size bounds must be considered here too; otherwise a query that
+        // filters only on size would wrongly match every document.
+        final boolean hasSizeRange = query.getMinFileSize() != null || query.getMaxFileSize() != null;
+        if (!hasDateRange && !hasSizeRange && query.getSearchTerms().isEmpty()) {
+            return new MatchAllDocsQuery();
+        }
+
+        final BooleanQuery luceneQuery = new BooleanQuery();
+        for (final SearchTerm searchTerm : query.getSearchTerms()) {
+            final String searchValue = searchTerm.getValue();
+            if (searchValue == null) {
+                throw new IllegalArgumentException("Empty search value not allowed (for term '" + searchTerm.getSearchableField().getFriendlyName() + "')");
+            }
+
+            // Values are lower-cased to match how string fields are written to the index.
+            final Term term = new Term(searchTerm.getSearchableField().getSearchableFieldName(), searchValue.toLowerCase());
+            if (searchValue.contains("*") || searchValue.contains("?")) {
+                luceneQuery.add(new BooleanClause(new WildcardQuery(term), Occur.MUST));
+            } else {
+                luceneQuery.add(new BooleanClause(new TermQuery(term), Occur.MUST));
+            }
+        }
+
+        final Long minBytes = query.getMinFileSize() == null ? null : DataUnit.parseDataSize(query.getMinFileSize(), DataUnit.B).longValue();
+        final Long maxBytes = query.getMaxFileSize() == null ? null : DataUnit.parseDataSize(query.getMaxFileSize(), DataUnit.B).longValue();
+        if (minBytes != null || maxBytes != null) {
+            luceneQuery.add(NumericRangeQuery.newLongRange(SearchableFields.FileSize.getSearchableFieldName(), minBytes, maxBytes, true, true), Occur.MUST);
+        }
+
+        final Long minDateTime = query.getStartDate() == null ? null : query.getStartDate().getTime();
+        final Long maxDateTime = query.getEndDate() == null ? null : query.getEndDate().getTime();
+        if (maxDateTime != null || minDateTime != null) {
+            luceneQuery.add(NumericRangeQuery.newLongRange(SearchableFields.EventTime.getSearchableFieldName(), minDateTime, maxDateTime, true, true), Occur.MUST);
+        }
+
+        return luceneQuery;
+    }
+
+
+    /**
+     * Resolves the journal file for a location as
+     * {@code <container dir>/<section>/journals/<journalId>.journal}.
+     *
+     * @throws FileNotFoundException if the configuration has no container with the location's container name
+     */
+    private static File getJournalFile(final JournaledStorageLocation location, final JournalingRepositoryConfig config) throws FileNotFoundException {
+        final File containerDir = config.getContainers().get(location.getContainerName());
+        if ( containerDir == null ) {
+            throw new FileNotFoundException("Could not find Container with name " + location.getContainerName());
+        }
+
+        final String sectionName = location.getSectionName();
+        final File sectionFile = new File(containerDir, sectionName);
+        final File journalDir = new File(sectionFile, "journals");
+        final File journalFile = new File(journalDir, location.getJournalId() + ".journal");
+
+        return journalFile;
+    }
+
+
+    /**
+     * Groups the given locations by the journal file that contains them and
+     * sorts each group into its natural order.
+     *
+     * @param locations the locations to group; each must be a {@link JournaledStorageLocation}
+     * @param config used to resolve container names to directories
+     * @return a map from journal file to the sorted locations within that file
+     * @throws IllegalArgumentException if a location is not a {@link JournaledStorageLocation}
+     * @throws FileNotFoundException if a location refers to an unknown container
+     * @throws IOException declared for callers; not thrown directly by this method
+     */
+    public static Map<File, List<JournaledStorageLocation>> orderLocations(final List<StorageLocation> locations, final JournalingRepositoryConfig config) throws FileNotFoundException, IOException {
+        final Map<File, List<JournaledStorageLocation>> map = new HashMap<>();
+
+        for ( final StorageLocation location : locations ) {
+            if ( !(location instanceof JournaledStorageLocation) ) {
+                throw new IllegalArgumentException(location + " is not a valid StorageLocation for this repository");
+            }
+
+            final JournaledStorageLocation journaledLocation = (JournaledStorageLocation) location;
+            final File journalFile = getJournalFile(journaledLocation, config);
+            List<JournaledStorageLocation> list = map.get(journalFile);
+            if ( list == null ) {
+                list = new ArrayList<>();
+                map.put(journalFile, list);
+            }
+
+            list.add(journaledLocation);
+        }
+
+        for ( final List<JournaledStorageLocation> list : map.values() ) {
+            Collections.sort(list);
+        }
+
+        return map;
+    }
+
+    /**
+     * Returns the table-of-contents file for a journal: a sibling file whose
+     * name is the journal file's name with {@code .toc} appended.
+     *
+     * @param journalFile the journal file
+     * @return the corresponding {@code .toc} file
+     */
+    public static File getTocFile(final File journalFile) {
+        return new File(journalFile.getParentFile(), journalFile.getName() + ".toc");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/SearchResult.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/SearchResult.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/SearchResult.java
new file mode 100644
index 0000000..ac82438
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/SearchResult.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.index;
+
+import java.util.List;
+
+import org.apache.nifi.provenance.journaling.JournaledStorageLocation;
+
+/**
+ * Simple holder that pairs a list of storage locations with a total count.
+ * Both values are fixed at construction time; the list itself is stored and
+ * returned by reference, without copying.
+ */
+public class SearchResult {
+    private final List<JournaledStorageLocation> locations;
+    private final int totalCount;
+
+    /**
+     * @param locations the storage locations to carry; kept as-is (not copied)
+     * @param totalCount the value to report from {@link #getTotalCount()}
+     *            (NOTE(review): presumably the overall number of hits, which may
+     *            differ from {@code locations.size()} -- confirm against callers)
+     */
+    public SearchResult(final List<JournaledStorageLocation> locations, final int totalCount) {
+        this.locations = locations;
+        this.totalCount = totalCount;
+    }
+
+    /**
+     * @return the total count supplied at construction
+     */
+    public int getTotalCount() {
+        return this.totalCount;
+    }
+
+    /**
+     * @return the same list instance that was supplied at construction
+     */
+    public List<JournaledStorageLocation> getLocations() {
+        return this.locations;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/io/Deserializer.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/io/Deserializer.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/io/Deserializer.java
new file mode 100644
index 0000000..67b1cb6
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/io/Deserializer.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.io;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+
+/**
+ * Reads provenance events back from a binary stream. Each implementation
+ * identifies itself by a codec name.
+ */
+public interface Deserializer {
+
+    /**
+     * @return the name of the codec that this Deserializer handles
+     */
+    String getCodecName();
+    
+    /**
+     * Reads a single event from the given stream.
+     *
+     * @param in the stream to read the event from
+     * @param serializationVersion the version of the serialization format of the data
+     *            (NOTE(review): presumably the value that Serializer.getVersion()
+     *            reported when the data was written -- confirm)
+     * @return the event that was read
+     * @throws IOException if the event cannot be read from the stream
+     */
+    ProvenanceEventRecord deserialize(DataInputStream in, int 
serializationVersion) throws IOException;
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/io/Deserializers.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/io/Deserializers.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/io/Deserializers.java
new file mode 100644
index 0000000..4be87e3
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/io/Deserializers.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.io;
+
+/**
+ * Lookup of {@link Deserializer} implementations by codec name.
+ */
+public class Deserializers {
+
+    /**
+     * Returns a new Deserializer for the codec with the given name.
+     *
+     * @param codecName the name of the codec, as reported by {@link Deserializer#getCodecName()}
+     * @return a newly created Deserializer for that codec
+     * @throws IllegalArgumentException if the codec name is null or is not a known codec
+     */
+    public static Deserializer getDeserializer(final String codecName) {
+        // A switch on a null String fails with a bare NullPointerException. Treat null
+        // like any other unrecognized codec so callers get one, descriptive failure mode.
+        if (codecName == null) {
+            throw new IllegalArgumentException("Unknown Provenance Serialization Codec: " + codecName);
+        }
+
+        switch (codecName) {
+            case StandardEventDeserializer.CODEC_NAME:
+                return new StandardEventDeserializer();
+            default:
+                throw new IllegalArgumentException("Unknown Provenance Serialization Codec: " + codecName);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/io/Serializer.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/io/Serializer.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/io/Serializer.java
new file mode 100644
index 0000000..8219b4c
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/io/Serializer.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.io;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+
+/**
+ * Writes provenance events to a binary stream. Each implementation reports a
+ * codec name and the version of the format that it writes.
+ */
+public interface Serializer {
+
+    /**
+     * Returns the serialization version that is used to serialize records.
+     *
+     * @return the serialization version
+     */
+    int getVersion();
+    
+    /**
+     * Returns the name of the codec used to serialize the records.
+     *
+     * @return the codec name
+     */
+    String getCodecName();
+    
+    /**
+     * Serializes the given event to the given DataOutputStream.
+     * This method should NOT serialize the ID, as the ID is not yet known.
+     * The ID will instead be serialized to the stream appropriately by the
+     * JournalWriter.
+     * 
+     * @param event the event to serialize
+     * @param out the stream to write the serialized event to
+     * @throws IOException if the event cannot be written to the stream
+     */
+    void serialize(ProvenanceEventRecord event, DataOutputStream out) throws 
IOException;
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a68bef62/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/io/StandardEventDeserializer.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/io/StandardEventDeserializer.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/io/StandardEventDeserializer.java
new file mode 100644
index 0000000..fb537ee
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/io/StandardEventDeserializer.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.io;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.provenance.StandardProvenanceEventRecord;
+import org.apache.nifi.provenance.journaling.IdEnrichedProvenanceEvent;
+import org.apache.nifi.stream.io.StreamUtils;
+
+/**
+ * {@link Deserializer} for events written with the codec named by
+ * {@link StandardEventSerializer#CODEC_NAME}.
+ * <p>
+ * The format is positional: every field is identified only by where it sits in the
+ * stream, so the order of the reads in {@link #deserialize(DataInputStream, int)}
+ * must not be changed independently of the code that writes the data.
+ */
+public class StandardEventDeserializer implements Deserializer {
+    public static final String CODEC_NAME = StandardEventSerializer.CODEC_NAME;
+
+    @Override
+    public String getCodecName() {
+        return CODEC_NAME;
+    }
+
+    /**
+     * Reads one event, including its leading event ID, from the stream.
+     *
+     * @param in the stream positioned at the start of an event (the 8-byte event ID)
+     * @param serializationVersion the version of the serialized data; this implementation
+     *            does not consult it
+     * @return the event read from the stream, wrapped in an {@link IdEnrichedProvenanceEvent}
+     *         together with the event ID that preceded it
+     * @throws IOException if the stream cannot be read or contains a negative string length
+     * @throws IllegalArgumentException if the event type name in the stream is not a
+     *             {@link ProvenanceEventType} (thrown by {@code valueOf})
+     */
+    @Override
+    public ProvenanceEventRecord deserialize(final DataInputStream in, final int serializationVersion) throws IOException {
+        final StandardProvenanceEventRecord.Builder builder = new StandardProvenanceEventRecord.Builder();
+
+        final long eventId = in.readLong();
+        final String eventTypeName = in.readUTF();
+        final ProvenanceEventType eventType = ProvenanceEventType.valueOf(eventTypeName);
+        builder.setEventType(eventType);
+        builder.setEventTime(in.readLong());
+
+        // Held as a primitive: readLong() never yields null, so boxing to Long buys nothing.
+        final long flowFileEntryDate = in.readLong();
+        builder.setEventDuration(in.readLong());
+
+        final Set<String> lineageIdentifiers = new HashSet<>();
+        final int numLineageIdentifiers = in.readInt();
+        for (int i = 0; i < numLineageIdentifiers; i++) {
+            lineageIdentifiers.add(readUUID(in));
+        }
+
+        final long lineageStartDate = in.readLong();
+
+        builder.setComponentId(readNullableString(in));
+        builder.setComponentType(readNullableString(in));
+
+        final String uuid = readUUID(in);
+        builder.setFlowFileUUID(uuid);
+        builder.setDetails(readNullableString(in));
+
+        // Read in the FlowFile Attributes: first the previous values (never null),
+        // then the updated values (each may be null).
+        final Map<String, String> previousAttrs = readAttributes(in, false);
+        final Map<String, String> attrUpdates = readAttributes(in, true);
+        builder.setAttributes(previousAttrs, attrUpdates);
+
+        // Java evaluates method arguments left to right, so the three strings and two
+        // longs below are consumed from the stream in exactly the order written here.
+        final boolean hasContentClaim = in.readBoolean();
+        if (hasContentClaim) {
+            builder.setCurrentContentClaim(in.readUTF(), in.readUTF(), in.readUTF(), in.readLong(), in.readLong());
+        } else {
+            builder.setCurrentContentClaim(null, null, null, null, 0L);
+        }
+
+        final boolean hasPreviousClaim = in.readBoolean();
+        if (hasPreviousClaim) {
+            builder.setPreviousContentClaim(in.readUTF(), in.readUTF(), in.readUTF(), in.readLong(), in.readLong());
+        }
+
+        builder.setSourceQueueIdentifier(readNullableString(in));
+
+        // Read Event-Type specific fields.
+        if (eventType == ProvenanceEventType.FORK
+                || eventType == ProvenanceEventType.JOIN
+                || eventType == ProvenanceEventType.CLONE
+                || eventType == ProvenanceEventType.REPLAY) {
+            final int numParents = in.readInt();
+            for (int i = 0; i < numParents; i++) {
+                builder.addParentUuid(readUUID(in));
+            }
+
+            final int numChildren = in.readInt();
+            for (int i = 0; i < numChildren; i++) {
+                builder.addChildUuid(readUUID(in));
+            }
+        } else if (eventType == ProvenanceEventType.RECEIVE) {
+            builder.setTransitUri(readNullableString(in));
+            builder.setSourceSystemFlowFileIdentifier(readNullableString(in));
+        } else if (eventType == ProvenanceEventType.SEND) {
+            builder.setTransitUri(readNullableString(in));
+        } else if (eventType == ProvenanceEventType.ADDINFO) {
+            builder.setAlternateIdentifierUri(readNullableString(in));
+        } else if (eventType == ProvenanceEventType.ROUTE) {
+            builder.setRelationship(readNullableString(in));
+        }
+
+        builder.setFlowFileEntryDate(flowFileEntryDate);
+        builder.setLineageIdentifiers(lineageIdentifiers);
+        builder.setLineageStartDate(lineageStartDate);
+        final ProvenanceEventRecord event = builder.build();
+
+        return new IdEnrichedProvenanceEvent(event, eventId);
+    }
+
+    /**
+     * Reads an attribute map: an int entry count followed by that many key/value pairs.
+     *
+     * @param dis the stream to read from
+     * @param valueNullable whether each value is preceded by a presence flag and may be null
+     * @return the attributes that were read; empty if the count is zero
+     * @throws IOException if the stream cannot be read
+     */
+    private static Map<String, String> readAttributes(final DataInputStream dis, final boolean valueNullable) throws IOException {
+        final int numAttributes = dis.readInt();
+        final Map<String, String> attrs = new HashMap<>();
+        for (int i = 0; i < numAttributes; i++) {
+            final String key = readLongString(dis);
+            final String value = valueNullable ? readLongNullableString(dis) : readLongString(dis);
+            attrs.put(key, value);
+        }
+
+        return attrs;
+    }
+
+    /**
+     * Reads a UUID stored as two longs (most significant bits first) and returns its
+     * canonical string form.
+     */
+    private static String readUUID(final DataInputStream in) throws IOException {
+        final long msb = in.readLong();
+        final long lsb = in.readLong();
+        return new UUID(msb, lsb).toString();
+    }
+
+    /**
+     * Reads a presence flag followed, if the flag is true, by a string in
+     * {@link DataInputStream#readUTF()} format.
+     *
+     * @return the string, or null if the flag was false
+     */
+    private static String readNullableString(final DataInputStream in) throws IOException {
+        final boolean valueExists = in.readBoolean();
+        if (valueExists) {
+            return in.readUTF();
+        } else {
+            return null;
+        }
+    }
+
+    /**
+     * Reads a presence flag followed, if the flag is true, by a string in the
+     * format understood by {@link #readLongString(DataInputStream)}.
+     *
+     * @return the string, or null if the flag was false
+     */
+    private static String readLongNullableString(final DataInputStream in) throws IOException {
+        final boolean valueExists = in.readBoolean();
+        if (valueExists) {
+            return readLongString(in);
+        } else {
+            return null;
+        }
+    }
+
+    /**
+     * Reads a string stored as an int byte count followed by that many bytes of UTF-8.
+     * Unlike {@link DataInputStream#readUTF()}, whose length prefix is two bytes, this
+     * form can carry strings longer than 65535 bytes.
+     *
+     * @return the decoded string; empty if the stored length is zero
+     * @throws IOException if the stream cannot be read, or if the stored length is
+     *             negative (which can only come from corrupt or misaligned data)
+     */
+    private static String readLongString(final DataInputStream in) throws IOException {
+        final int length = in.readInt();
+        if (length < 0) {
+            // Without this check, "new byte[length]" would fail with an unchecked
+            // NegativeArraySizeException that callers handling IOException would miss.
+            throw new IOException("Invalid string length " + length + " read from stream; data may be corrupt");
+        }
+
+        final byte[] strBytes = new byte[length];
+        // NOTE(review): presumably fills the entire buffer or throws on premature EOF -- confirm
+        StreamUtils.fillBuffer(in, strBytes);
+        return new String(strBytes, StandardCharsets.UTF_8);
+    }
+}

Reply via email to