http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventRecord.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventRecord.java
new file mode 100644
index 0000000..eccff2a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventRecord.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.schema;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.provenance.StandardProvenanceEventRecord;
+import org.apache.nifi.repository.schema.FieldMapRecord;
+import org.apache.nifi.repository.schema.NamedValue;
+import org.apache.nifi.repository.schema.Record;
+import org.apache.nifi.repository.schema.RecordField;
+import org.apache.nifi.repository.schema.RecordSchema;
+
+public class LookupTableEventRecord implements Record {
+    private final RecordSchema schema;
+    private final ProvenanceEventRecord event;
+    private final long eventId;
+    private final Record contentClaimRecord;
+    private final Record previousClaimRecord;
+
+    private final long eventIdStartOffset;
+    private final long startTimeOffset;
+    private final Map<String, Integer> componentIdMap;
+    private final Map<String, Integer> componentTypeMap;
+    private final Map<String, Integer> queueIdMap;
+    private final Map<String, Integer> eventTypeMap;
+
+    public LookupTableEventRecord(final ProvenanceEventRecord event, final long eventId, final RecordSchema schema, final RecordSchema contentClaimSchema,
+        final RecordSchema previousContentClaimSchema, final long eventIdStartOffset, final long startTimeOffset, final Map<String, Integer> componentIdMap,
+        final Map<String, Integer> componentTypeMap, final Map<String, Integer> queueIdMap, final Map<String, Integer> eventTypeMap) {
+        this.schema = schema;
+        this.event = event;
+        this.eventId = eventId;
+        this.previousClaimRecord = createPreviousContentClaimRecord(previousContentClaimSchema, event.getPreviousContentClaimContainer(), event.getPreviousContentClaimSection(),
+            event.getPreviousContentClaimIdentifier(), event.getPreviousContentClaimOffset(), event.getPreviousFileSize());
+        this.contentClaimRecord = createContentClaimRecord(contentClaimSchema, event.getContentClaimContainer(), event.getContentClaimSection(),
+            event.getContentClaimIdentifier(), event.getContentClaimOffset(), event.getFileSize());
+
+        this.eventIdStartOffset = eventIdStartOffset;
+        this.startTimeOffset = startTimeOffset;
+        this.componentIdMap = componentIdMap;
+        this.componentTypeMap = componentTypeMap;
+        this.queueIdMap = queueIdMap;
+        this.eventTypeMap = eventTypeMap;
+    }
+
+    @Override
+    public RecordSchema getSchema() {
+        return schema;
+    }
+
+
+    private static Record createPreviousContentClaimRecord(final RecordSchema contentClaimSchema, final String container, final String section,
+        final String identifier, final Long offset, final Long size) {
+
+        if (container == null || section == null || identifier == null) {
+            return null;
+        }
+
+        final Map<RecordField, Object> fieldValues = new HashMap<>();
+        fieldValues.put(EventRecordFields.CONTENT_CLAIM_CONTAINER, container);
+        fieldValues.put(EventRecordFields.CONTENT_CLAIM_SECTION, section);
+        fieldValues.put(EventRecordFields.CONTENT_CLAIM_IDENTIFIER, identifier);
+        fieldValues.put(EventRecordFields.CONTENT_CLAIM_OFFSET, offset);
+        fieldValues.put(EventRecordFields.CONTENT_CLAIM_SIZE, size);
+        return new FieldMapRecord(fieldValues, contentClaimSchema);
+
+    }
+
+    private static Record createContentClaimRecord(final RecordSchema contentClaimSchema, final String container, final String section,
+            final String identifier, final Long offset, final Long size) {
+
+        if (container == null || section == null || identifier == null) {
+            final Map<RecordField, Object> lookupValues = Collections.singletonMap(LookupTableEventRecordFields.NO_VALUE, EventFieldNames.NO_VALUE);
+            final List<RecordField> noValueFields = Collections.singletonList(contentClaimSchema.getField(EventFieldNames.NO_VALUE));
+            return new FieldMapRecord(lookupValues, new RecordSchema(noValueFields));
+        }
+
+        final Map<RecordField, Object> fieldValues = new HashMap<>();
+        fieldValues.put(EventRecordFields.CONTENT_CLAIM_CONTAINER, container);
+        fieldValues.put(EventRecordFields.CONTENT_CLAIM_SECTION, section);
+        fieldValues.put(EventRecordFields.CONTENT_CLAIM_IDENTIFIER, identifier);
+        fieldValues.put(EventRecordFields.CONTENT_CLAIM_OFFSET, offset);
+        fieldValues.put(EventRecordFields.CONTENT_CLAIM_SIZE, size);
+
+        final List<RecordField> explicitClaimFields = contentClaimSchema.getField(EventFieldNames.EXPLICIT_VALUE).getSubFields();
+        final Record explicitClaimRecord = new FieldMapRecord(fieldValues, new RecordSchema(explicitClaimFields));
+        return explicitClaimRecord;
+    }
+
+    private static String readLookupValue(final Object recordValue, final List<String> lookup) {
+        if (recordValue == null) {
+            return null;
+        }
+
+        // NO_VALUE type
+        if (recordValue instanceof Boolean) {
+            return null;
+        }
+
+        // LOOKUP type
+        if (recordValue instanceof Integer) {
+            final Integer indexValue = (Integer) recordValue;
+            final int index = indexValue.intValue();
+            if (index > lookup.size() - 1) {
+                return null;
+            }
+
+            return lookup.get(index);
+        }
+
+        // EXPLICIT_VALUE type
+        if (recordValue instanceof String) {
+            return (String) recordValue;
+        }
+
+        return null;
+    }
+
+    private NamedValue createLookupValue(final String literalValue, final Map<String, Integer> lookup) {
+        if (literalValue == null) {
+            final Map<RecordField, Object> lookupValues = Collections.singletonMap(LookupTableEventRecordFields.NO_VALUE, EventFieldNames.NO_VALUE);
+            final Record record = new FieldMapRecord(lookupValues, LookupTableEventSchema.NO_VALUE_SCHEMA);
+            final NamedValue namedValue = new NamedValue(EventFieldNames.NO_VALUE, record);
+            return namedValue;
+        }
+
+        final Integer index = lookup.get(literalValue);
+        if (index == null) {
+            final Map<RecordField, Object> lookupValues = Collections.singletonMap(LookupTableEventRecordFields.EXPLICIT_STRING, literalValue);
+            final Record record = new FieldMapRecord(lookupValues, LookupTableEventSchema.EXPLICIT_STRING_SCHEMA);
+            final NamedValue namedValue = new NamedValue(EventFieldNames.EXPLICIT_VALUE, record);
+            return namedValue;
+        } else {
+            final Map<RecordField, Object> lookupValues = Collections.singletonMap(LookupTableEventRecordFields.LOOKUP_VALUE, index);
+            final Record record = new FieldMapRecord(lookupValues, LookupTableEventSchema.LOOKUP_VALUE_SCHEMA);
+            final NamedValue namedValue = new NamedValue(EventFieldNames.LOOKUP_VALUE, record);
+            return namedValue;
+        }
+    }
+
+    private NamedValue createExplicitSameOrNoneValue(final Record newValue, final Record oldValue, final Supplier<Record> recordSupplier) {
+        if (newValue == null || EventFieldNames.NO_VALUE.equals(newValue.getSchema().getFields().get(0).getFieldName())) {
+            final Map<RecordField, Object> lookupValues = Collections.singletonMap(LookupTableEventRecordFields.NO_VALUE, EventFieldNames.NO_VALUE);
+            final Record record = new FieldMapRecord(lookupValues, LookupTableEventSchema.NO_VALUE_SCHEMA);
+            final NamedValue namedValue = new NamedValue(EventFieldNames.NO_VALUE, record);
+            return namedValue;
+        } else if (newValue.equals(oldValue)) {
+            final Map<RecordField, Object> lookupValues = Collections.singletonMap(LookupTableEventRecordFields.UNCHANGED_VALUE, EventFieldNames.UNCHANGED_VALUE);
+            final Record record = new FieldMapRecord(lookupValues, LookupTableEventSchema.UNCHANGED_VALUE_SCHEMA);
+            final NamedValue namedValue = new NamedValue(EventFieldNames.UNCHANGED_VALUE, record);
+            return namedValue;
+        }
+
+        final Record record = recordSupplier.get();
+        final NamedValue namedValue = new NamedValue(EventFieldNames.EXPLICIT_VALUE, record);
+        return namedValue;
+    }
+
+    @Override
+    public Object getFieldValue(final String fieldName) {
+        switch (fieldName) {
+            case EventFieldNames.EVENT_IDENTIFIER:
+                return (int) (eventId - eventIdStartOffset);
+            case EventFieldNames.ALTERNATE_IDENTIFIER:
+                return event.getAlternateIdentifierUri();
+            case EventFieldNames.CHILD_UUIDS:
+                return event.getChildUuids();
+            case EventFieldNames.COMPONENT_ID:
+                return createLookupValue(event.getComponentId(), componentIdMap);
+            case EventFieldNames.COMPONENT_TYPE:
+                return createLookupValue(event.getComponentType(), componentTypeMap);
+            case EventFieldNames.CONTENT_CLAIM:
+                return createExplicitSameOrNoneValue(contentClaimRecord, previousClaimRecord, () -> contentClaimRecord);
+            case EventFieldNames.EVENT_DETAILS:
+                return event.getDetails();
+            case EventFieldNames.EVENT_DURATION:
+                return (int) event.getEventDuration();
+            case EventFieldNames.EVENT_TIME:
+                return (int) (event.getEventTime() - startTimeOffset);
+            case EventFieldNames.EVENT_TYPE:
+                return eventTypeMap.get(event.getEventType().name());
+            case EventFieldNames.FLOWFILE_ENTRY_DATE:
+                return (int) (event.getFlowFileEntryDate() - startTimeOffset);
+            case EventFieldNames.LINEAGE_START_DATE:
+                return (int) (event.getLineageStartDate() - startTimeOffset);
+            case EventFieldNames.PARENT_UUIDS:
+                return event.getParentUuids();
+            case EventFieldNames.PREVIOUS_ATTRIBUTES:
+                return event.getPreviousAttributes();
+            case EventFieldNames.PREVIOUS_CONTENT_CLAIM:
+                return previousClaimRecord;
+            case EventFieldNames.RELATIONSHIP:
+                return event.getRelationship();
+            case EventFieldNames.SOURCE_QUEUE_IDENTIFIER:
+                return createLookupValue(event.getSourceQueueIdentifier(), queueIdMap);
+            case EventFieldNames.SOURCE_SYSTEM_FLOWFILE_IDENTIFIER:
+                return event.getSourceSystemFlowFileIdentifier();
+            case EventFieldNames.TRANSIT_URI:
+                return event.getTransitUri();
+            case EventFieldNames.UPDATED_ATTRIBUTES:
+                return event.getUpdatedAttributes();
+            case EventFieldNames.FLOWFILE_UUID:
+                return event.getAttribute(CoreAttributes.UUID.key());
+        }
+
+        return null;
+    }
+
+    private static Long addLong(final Integer optionalValue, final long requiredValue) {
+        if (optionalValue == null) {
+            return null;
+        }
+
+        return optionalValue.longValue() + requiredValue;
+    }
+
+    @SuppressWarnings("unchecked")
+    public static StandardProvenanceEventRecord getEvent(final Record record, final String storageFilename, final long storageByteOffset, final int maxAttributeLength,
+        final long eventIdStartOffset, final long startTimeOffset, final List<String> componentIds, final List<String> componentTypes,
+        final List<String> queueIds, final List<String> eventTypes) {
+
+        final Map<String, String> previousAttributes = truncateAttributes((Map<String, String>) record.getFieldValue(EventFieldNames.PREVIOUS_ATTRIBUTES), maxAttributeLength);
+        final Map<String, String> updatedAttributes = truncateAttributes((Map<String, String>) record.getFieldValue(EventFieldNames.UPDATED_ATTRIBUTES), maxAttributeLength);
+
+        final StandardProvenanceEventRecord.Builder builder = new StandardProvenanceEventRecord.Builder();
+        builder.setAlternateIdentifierUri((String) record.getFieldValue(EventFieldNames.ALTERNATE_IDENTIFIER));
+        builder.setChildUuids((List<String>) record.getFieldValue(EventFieldNames.CHILD_UUIDS));
+        builder.setDetails((String) record.getFieldValue(EventFieldNames.EVENT_DETAILS));
+        builder.setParentUuids((List<String>) record.getFieldValue(EventFieldNames.PARENT_UUIDS));
+        builder.setPreviousAttributes(previousAttributes);
+        builder.setRelationship((String) record.getFieldValue(EventFieldNames.RELATIONSHIP));
+        builder.setSourceSystemFlowFileIdentifier((String) record.getFieldValue(EventFieldNames.SOURCE_SYSTEM_FLOWFILE_IDENTIFIER));
+        builder.setTransitUri((String) record.getFieldValue(EventFieldNames.TRANSIT_URI));
+        builder.setUpdatedAttributes(updatedAttributes);
+
+
+        builder.setComponentId(readLookupValue(record.getFieldValue(EventFieldNames.COMPONENT_ID), componentIds));
+        builder.setComponentType(readLookupValue(record.getFieldValue(EventFieldNames.COMPONENT_TYPE), componentTypes));
+        builder.setSourceQueueIdentifier(readLookupValue(record.getFieldValue(EventFieldNames.SOURCE_QUEUE_IDENTIFIER), queueIds));
+
+        // Determine the event type
+        final Integer eventTypeOrdinal = (Integer) record.getFieldValue(EventFieldNames.EVENT_TYPE);
+        if (eventTypeOrdinal == null || eventTypeOrdinal > eventTypes.size() || eventTypeOrdinal < 0) {
+            builder.setEventType(ProvenanceEventType.UNKNOWN);
+        } else {
+            try {
+                builder.setEventType(ProvenanceEventType.valueOf(eventTypes.get(eventTypeOrdinal)));
+            } catch (final Exception e) {
+                builder.setEventType(ProvenanceEventType.UNKNOWN);
+            }
+        }
+
+        String uuid = updatedAttributes == null ? null : updatedAttributes.get(CoreAttributes.UUID.key());
+        if (uuid == null) {
+            uuid = previousAttributes == null ? null : previousAttributes.get(CoreAttributes.UUID.key());
+        }
+        builder.setFlowFileUUID(uuid);
+
+        builder.setEventDuration((Integer) record.getFieldValue(EventFieldNames.EVENT_DURATION));
+        builder.setEventTime(addLong((Integer) record.getFieldValue(EventFieldNames.EVENT_TIME), startTimeOffset));
+        builder.setFlowFileEntryDate(addLong((Integer) record.getFieldValue(EventFieldNames.FLOWFILE_ENTRY_DATE), startTimeOffset));
+        builder.setLineageStartDate(addLong((Integer) record.getFieldValue(EventFieldNames.LINEAGE_START_DATE), startTimeOffset));
+
+        final Integer eventId = (Integer) record.getFieldValue(EventFieldNames.EVENT_IDENTIFIER);
+        if (eventId != null) {
+            builder.setEventId(eventId.longValue() + eventIdStartOffset);
+        }
+
+        builder.setStorageLocation(storageFilename, storageByteOffset);
+
+        final Record previousClaimRecord = (Record) record.getFieldValue(EventFieldNames.PREVIOUS_CONTENT_CLAIM);
+        if (previousClaimRecord != null) {
+            builder.setPreviousContentClaim(
+                (String) previousClaimRecord.getFieldValue(EventFieldNames.CONTENT_CLAIM_CONTAINER),
+                (String) previousClaimRecord.getFieldValue(EventFieldNames.CONTENT_CLAIM_SECTION),
+                (String) previousClaimRecord.getFieldValue(EventFieldNames.CONTENT_CLAIM_IDENTIFIER),
+                (Long) previousClaimRecord.getFieldValue(EventFieldNames.CONTENT_CLAIM_OFFSET),
+                (Long) previousClaimRecord.getFieldValue(EventFieldNames.CONTENT_CLAIM_SIZE));
+        }
+
+        final Object contentClaimObject = record.getFieldValue(EventFieldNames.CONTENT_CLAIM);
+
+        // NO_VALUE type
+        builder.setCurrentContentClaim(null, null, null, null, 0L);
+        if (contentClaimObject != null) {
+            if (contentClaimObject instanceof String) {
+                final String contentClaimDescription = (String) contentClaimObject;
+                switch (contentClaimDescription) {
+                    case EventFieldNames.UNCHANGED_VALUE:
+                        builder.setCurrentContentClaim((String) previousClaimRecord.getFieldValue(EventFieldNames.CONTENT_CLAIM_CONTAINER),
+                            (String) previousClaimRecord.getFieldValue(EventFieldNames.CONTENT_CLAIM_SECTION),
+                            (String) previousClaimRecord.getFieldValue(EventFieldNames.CONTENT_CLAIM_IDENTIFIER),
+                            (Long) previousClaimRecord.getFieldValue(EventFieldNames.CONTENT_CLAIM_OFFSET),
+                            (Long) previousClaimRecord.getFieldValue(EventFieldNames.CONTENT_CLAIM_SIZE));
+                        break;
+                }
+            } else if (contentClaimObject instanceof Record) {
+                final Record currentClaimRecord = (Record) contentClaimObject;
+                builder.setCurrentContentClaim(
+                    (String) currentClaimRecord.getFieldValue(EventFieldNames.CONTENT_CLAIM_CONTAINER),
+                    (String) currentClaimRecord.getFieldValue(EventFieldNames.CONTENT_CLAIM_SECTION),
+                    (String) currentClaimRecord.getFieldValue(EventFieldNames.CONTENT_CLAIM_IDENTIFIER),
+                    (Long) currentClaimRecord.getFieldValue(EventFieldNames.CONTENT_CLAIM_OFFSET),
+                    (Long) currentClaimRecord.getFieldValue(EventFieldNames.CONTENT_CLAIM_SIZE));
+            }
+        }
+
+        return builder.build();
+    }
+
+    private static Map<String, String> truncateAttributes(final Map<String, String> attributes, final int maxAttributeLength) {
+        if (attributes == null) {
+            return null;
+        }
+
+        // Check if any attribute value exceeds the maximum attribute length
+        final boolean anyExceedsLength = attributes.values().stream()
+            .filter(value -> value != null)
+            .anyMatch(value -> value.length() > maxAttributeLength);
+
+        if (!anyExceedsLength) {
+            return attributes;
+        }
+
+        final Map<String, String> truncated = new HashMap<>();
+        for (final Map.Entry<String, String> entry : attributes.entrySet()) {
+            final String key = entry.getKey();
+            final String value = entry.getValue();
+
+            if (value == null || value.length() <= maxAttributeLength) {
+                truncated.put(key, value);
+                continue;
+            }
+
+            truncated.put(key, value.substring(0, maxAttributeLength));
+        }
+
+        return truncated;
+    }
+}

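The record above shrinks repeated event data in two ways: frequently repeated strings (component IDs, component types, queue IDs, event type names) are written as integer indices into per-file lookup lists, and event IDs and timestamps are written as int offsets from a per-file base value. The following stand-alone sketch illustrates that encoding idea; the LookupList class and the sample values are hypothetical and are not part of this commit.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper illustrating the lookup-table idea used by LookupTableEventRecord:
// a string seen before is written as a small integer index; the list itself is written
// once per file, and readers resolve indices back to strings.
public class LookupList {
    private final Map<String, Integer> indexByValue = new HashMap<>();
    private final List<String> valuesByIndex = new ArrayList<>();

    public int index(final String value) {
        return indexByValue.computeIfAbsent(value, v -> {
            valuesByIndex.add(v);
            return valuesByIndex.size() - 1;
        });
    }

    public String lookup(final int index) {
        return index >= 0 && index < valuesByIndex.size() ? valuesByIndex.get(index) : null;
    }

    public static void main(final String[] args) {
        final LookupList componentIds = new LookupList();
        final int idx = componentIds.index("1234-component-uuid"); // example value
        System.out.println(idx);                       // 0 on first use
        System.out.println(componentIds.lookup(idx));  // resolves back to the UUID

        // Event times are stored the same way event IDs are: as an int offset from a base.
        final long startTimeOffset = 1_480_000_000_000L; // hypothetical per-file base time
        final long eventTime = 1_480_000_123_456L;
        final int encoded = (int) (eventTime - startTimeOffset);
        System.out.println(startTimeOffset + encoded);   // reconstructs the original time
    }
}
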
http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventRecordFields.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventRecordFields.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventRecordFields.java
new file mode 100644
index 0000000..7b33ded
--- /dev/null
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventRecordFields.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.schema;
+
+import static org.apache.nifi.repository.schema.Repetition.EXACTLY_ONE;
+import static org.apache.nifi.repository.schema.Repetition.ZERO_OR_MORE;
+import static org.apache.nifi.repository.schema.Repetition.ZERO_OR_ONE;
+
+import org.apache.nifi.repository.schema.ComplexRecordField;
+import org.apache.nifi.repository.schema.FieldType;
+import org.apache.nifi.repository.schema.MapRecordField;
+import org.apache.nifi.repository.schema.RecordField;
+import org.apache.nifi.repository.schema.Repetition;
+import org.apache.nifi.repository.schema.SimpleRecordField;
+import org.apache.nifi.repository.schema.UnionRecordField;
+
+public class LookupTableEventRecordFields {
+
+    // General Event fields.
+    public static final RecordField RECORD_IDENTIFIER_OFFSET = new SimpleRecordField(EventFieldNames.EVENT_IDENTIFIER, FieldType.INT, EXACTLY_ONE);
+    public static final RecordField EVENT_TYPE_ORDINAL = new SimpleRecordField(EventFieldNames.EVENT_TYPE, FieldType.INT, EXACTLY_ONE);
+    public static final RecordField EVENT_TIME_OFFSET = new SimpleRecordField(EventFieldNames.EVENT_TIME, FieldType.INT, EXACTLY_ONE);
+    public static final RecordField FLOWFILE_ENTRY_DATE_OFFSET = new SimpleRecordField(EventFieldNames.FLOWFILE_ENTRY_DATE, FieldType.INT, EXACTLY_ONE);
+    public static final RecordField EVENT_DURATION = new SimpleRecordField(EventFieldNames.EVENT_DURATION, FieldType.INT, EXACTLY_ONE);
+    public static final RecordField LINEAGE_START_DATE_OFFSET = new SimpleRecordField(EventFieldNames.LINEAGE_START_DATE, FieldType.INT, EXACTLY_ONE);
+    public static final RecordField EVENT_DETAILS = new SimpleRecordField(EventFieldNames.EVENT_DETAILS, FieldType.STRING, ZERO_OR_ONE);
+
+    // Encoded as either a lookup id or an explicit string, depending on whether or not the value is available in the header.
+    public static final RecordField NO_VALUE = new SimpleRecordField(EventFieldNames.NO_VALUE, FieldType.STRING, Repetition.EXACTLY_ONE);
+    public static final RecordField EXPLICIT_STRING = new SimpleRecordField(EventFieldNames.EXPLICIT_VALUE, FieldType.STRING, Repetition.EXACTLY_ONE);
+    public static final RecordField LOOKUP_VALUE = new SimpleRecordField(EventFieldNames.LOOKUP_VALUE, FieldType.INT, Repetition.EXACTLY_ONE);
+    public static final RecordField UNCHANGED_VALUE = new SimpleRecordField(EventFieldNames.UNCHANGED_VALUE, FieldType.STRING, Repetition.EXACTLY_ONE);
+
+    public static final RecordField COMPONENT_ID = new UnionRecordField(EventFieldNames.COMPONENT_ID, Repetition.EXACTLY_ONE, NO_VALUE, EXPLICIT_STRING, LOOKUP_VALUE);
+    public static final RecordField SOURCE_QUEUE_ID = new UnionRecordField(EventFieldNames.SOURCE_QUEUE_IDENTIFIER, Repetition.EXACTLY_ONE, NO_VALUE, EXPLICIT_STRING, LOOKUP_VALUE);
+    public static final RecordField COMPONENT_TYPE = new UnionRecordField(EventFieldNames.COMPONENT_TYPE, Repetition.EXACTLY_ONE, EXPLICIT_STRING, LOOKUP_VALUE);
+
+    // Attributes
+    public static final RecordField ATTRIBUTE_NAME = new SimpleRecordField(EventFieldNames.ATTRIBUTE_NAME, FieldType.LONG_STRING, EXACTLY_ONE);
+    public static final RecordField ATTRIBUTE_VALUE_REQUIRED = new SimpleRecordField(EventFieldNames.ATTRIBUTE_VALUE, FieldType.LONG_STRING, EXACTLY_ONE);
+    public static final RecordField ATTRIBUTE_VALUE_OPTIONAL = new SimpleRecordField(EventFieldNames.ATTRIBUTE_VALUE, FieldType.LONG_STRING, ZERO_OR_ONE);
+
+    public static final RecordField PREVIOUS_ATTRIBUTES = new MapRecordField(EventFieldNames.PREVIOUS_ATTRIBUTES, ATTRIBUTE_NAME, ATTRIBUTE_VALUE_REQUIRED, EXACTLY_ONE);
+    public static final RecordField UPDATED_ATTRIBUTES = new MapRecordField(EventFieldNames.UPDATED_ATTRIBUTES, ATTRIBUTE_NAME, ATTRIBUTE_VALUE_OPTIONAL, EXACTLY_ONE);
+
+    // Content Claims
+    public static final RecordField CONTENT_CLAIM_CONTAINER = new SimpleRecordField(EventFieldNames.CONTENT_CLAIM_CONTAINER, FieldType.STRING, EXACTLY_ONE);
+    public static final RecordField CONTENT_CLAIM_SECTION = new SimpleRecordField(EventFieldNames.CONTENT_CLAIM_SECTION, FieldType.STRING, EXACTLY_ONE);
+    public static final RecordField CONTENT_CLAIM_IDENTIFIER = new SimpleRecordField(EventFieldNames.CONTENT_CLAIM_IDENTIFIER, FieldType.STRING, EXACTLY_ONE);
+    public static final RecordField CONTENT_CLAIM_OFFSET = new SimpleRecordField(EventFieldNames.CONTENT_CLAIM_OFFSET, FieldType.LONG, EXACTLY_ONE);
+    public static final RecordField CONTENT_CLAIM_SIZE = new SimpleRecordField(EventFieldNames.CONTENT_CLAIM_SIZE, FieldType.LONG, EXACTLY_ONE);
+
+    public static final RecordField PREVIOUS_CONTENT_CLAIM = new ComplexRecordField(EventFieldNames.PREVIOUS_CONTENT_CLAIM, ZERO_OR_ONE,
+        CONTENT_CLAIM_CONTAINER, CONTENT_CLAIM_SECTION, CONTENT_CLAIM_IDENTIFIER, CONTENT_CLAIM_OFFSET, CONTENT_CLAIM_SIZE);
+
+    public static final RecordField CURRENT_CONTENT_CLAIM_EXPLICIT = new ComplexRecordField(EventFieldNames.EXPLICIT_VALUE, EXACTLY_ONE,
+        CONTENT_CLAIM_CONTAINER, CONTENT_CLAIM_SECTION, CONTENT_CLAIM_IDENTIFIER, CONTENT_CLAIM_OFFSET, CONTENT_CLAIM_SIZE);
+    public static final RecordField CURRENT_CONTENT_CLAIM = new UnionRecordField(EventFieldNames.CONTENT_CLAIM,
+        Repetition.EXACTLY_ONE, NO_VALUE, UNCHANGED_VALUE, CURRENT_CONTENT_CLAIM_EXPLICIT);
+
+
+    // EventType-Specific fields
+    // for FORK, JOIN, CLONE, REPLAY
+    public static final RecordField PARENT_UUIDS = new SimpleRecordField(EventFieldNames.PARENT_UUIDS, FieldType.STRING, ZERO_OR_MORE);
+    public static final RecordField CHILD_UUIDS = new SimpleRecordField(EventFieldNames.CHILD_UUIDS, FieldType.STRING, ZERO_OR_MORE);
+
+    // for SEND/RECEIVE/FETCH
+    public static final RecordField TRANSIT_URI = new SimpleRecordField(EventFieldNames.TRANSIT_URI, FieldType.STRING, ZERO_OR_ONE);
+    public static final RecordField SOURCE_SYSTEM_FLOWFILE_IDENTIFIER = new SimpleRecordField(EventFieldNames.SOURCE_SYSTEM_FLOWFILE_IDENTIFIER, FieldType.STRING, ZERO_OR_ONE);
+
+    // for ADD_INFO
+    public static final RecordField ALTERNATE_IDENTIFIER = new SimpleRecordField(EventFieldNames.ALTERNATE_IDENTIFIER, FieldType.STRING, ZERO_OR_ONE);
+    public static final RecordField RELATIONSHIP = new SimpleRecordField(EventFieldNames.RELATIONSHIP, FieldType.STRING, ZERO_OR_ONE);
+}

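Each union-typed field above (COMPONENT_ID, SOURCE_QUEUE_ID, COMPONENT_TYPE, CURRENT_CONTENT_CLAIM) is written by choosing one branch and wrapping a single-field record in a NamedValue whose name identifies that branch, as createLookupValue() does in LookupTableEventRecord. A minimal sketch of building the explicit-string branch, assuming the same package as the classes above; the literal value passed in is just an example:

package org.apache.nifi.provenance.schema;

import java.util.Collections;
import java.util.Map;

import org.apache.nifi.repository.schema.FieldMapRecord;
import org.apache.nifi.repository.schema.NamedValue;
import org.apache.nifi.repository.schema.Record;
import org.apache.nifi.repository.schema.RecordField;

public class UnionBranchSketch {
    // Builds the EXPLICIT_VALUE branch of a lookup union, mirroring
    // LookupTableEventRecord.createLookupValue() when no lookup index exists.
    static NamedValue explicitBranch(final String literalValue) {
        final Map<RecordField, Object> values =
            Collections.singletonMap(LookupTableEventRecordFields.EXPLICIT_STRING, literalValue);
        final Record record = new FieldMapRecord(values, LookupTableEventSchema.EXPLICIT_STRING_SCHEMA);
        return new NamedValue(EventFieldNames.EXPLICIT_VALUE, record);
    }
}
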
http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventSchema.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventSchema.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventSchema.java
new file mode 100644
index 0000000..7110336
--- /dev/null
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventSchema.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.schema;
+
+import static org.apache.nifi.provenance.schema.LookupTableEventRecordFields.ALTERNATE_IDENTIFIER;
+import static org.apache.nifi.provenance.schema.LookupTableEventRecordFields.CHILD_UUIDS;
+import static org.apache.nifi.provenance.schema.LookupTableEventRecordFields.COMPONENT_ID;
+import static org.apache.nifi.provenance.schema.LookupTableEventRecordFields.COMPONENT_TYPE;
+import static org.apache.nifi.provenance.schema.LookupTableEventRecordFields.CURRENT_CONTENT_CLAIM;
+import static org.apache.nifi.provenance.schema.LookupTableEventRecordFields.EVENT_DETAILS;
+import static org.apache.nifi.provenance.schema.LookupTableEventRecordFields.EVENT_DURATION;
+import static org.apache.nifi.provenance.schema.LookupTableEventRecordFields.EVENT_TIME_OFFSET;
+import static org.apache.nifi.provenance.schema.LookupTableEventRecordFields.EVENT_TYPE_ORDINAL;
+import static org.apache.nifi.provenance.schema.LookupTableEventRecordFields.EXPLICIT_STRING;
+import static org.apache.nifi.provenance.schema.LookupTableEventRecordFields.FLOWFILE_ENTRY_DATE_OFFSET;
+import static org.apache.nifi.provenance.schema.LookupTableEventRecordFields.LINEAGE_START_DATE_OFFSET;
+import static org.apache.nifi.provenance.schema.LookupTableEventRecordFields.LOOKUP_VALUE;
+import static org.apache.nifi.provenance.schema.LookupTableEventRecordFields.NO_VALUE;
+import static org.apache.nifi.provenance.schema.LookupTableEventRecordFields.PARENT_UUIDS;
+import static org.apache.nifi.provenance.schema.LookupTableEventRecordFields.PREVIOUS_ATTRIBUTES;
+import static org.apache.nifi.provenance.schema.LookupTableEventRecordFields.PREVIOUS_CONTENT_CLAIM;
+import static org.apache.nifi.provenance.schema.LookupTableEventRecordFields.RECORD_IDENTIFIER_OFFSET;
+import static org.apache.nifi.provenance.schema.LookupTableEventRecordFields.RELATIONSHIP;
+import static org.apache.nifi.provenance.schema.LookupTableEventRecordFields.SOURCE_QUEUE_ID;
+import static org.apache.nifi.provenance.schema.LookupTableEventRecordFields.SOURCE_SYSTEM_FLOWFILE_IDENTIFIER;
+import static org.apache.nifi.provenance.schema.LookupTableEventRecordFields.TRANSIT_URI;
+import static org.apache.nifi.provenance.schema.LookupTableEventRecordFields.UNCHANGED_VALUE;
+import static org.apache.nifi.provenance.schema.LookupTableEventRecordFields.UPDATED_ATTRIBUTES;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.nifi.repository.schema.RecordField;
+import org.apache.nifi.repository.schema.RecordSchema;
+
+public class LookupTableEventSchema {
+    public static final RecordSchema EVENT_SCHEMA = buildSchemaV1(false);
+
+    public static final RecordSchema NO_VALUE_SCHEMA = new RecordSchema(Collections.singletonList(NO_VALUE));
+    public static final RecordSchema EXPLICIT_STRING_SCHEMA = new RecordSchema(Collections.singletonList(EXPLICIT_STRING));
+    public static final RecordSchema UNCHANGED_VALUE_SCHEMA = new RecordSchema(Collections.singletonList(UNCHANGED_VALUE));
+    public static final RecordSchema LOOKUP_VALUE_SCHEMA = new RecordSchema(Collections.singletonList(LOOKUP_VALUE));
+
+    public static final RecordSchema CONTENT_CLAIM_SCHEMA = new RecordSchema(Collections.singletonList(CURRENT_CONTENT_CLAIM));
+
+    private static RecordSchema buildSchemaV1(final boolean includeEventId) {
+        final List<RecordField> fields = new ArrayList<>();
+        if (includeEventId) {
+            fields.add(RECORD_IDENTIFIER_OFFSET);
+        }
+
+        fields.add(EVENT_TYPE_ORDINAL);
+        fields.add(EVENT_TIME_OFFSET);
+        fields.add(FLOWFILE_ENTRY_DATE_OFFSET);
+        fields.add(EVENT_DURATION);
+        fields.add(LINEAGE_START_DATE_OFFSET);
+        fields.add(COMPONENT_ID);
+        fields.add(COMPONENT_TYPE);
+        fields.add(EVENT_DETAILS);
+        fields.add(PREVIOUS_ATTRIBUTES);
+        fields.add(UPDATED_ATTRIBUTES);
+        fields.add(CURRENT_CONTENT_CLAIM);
+        fields.add(PREVIOUS_CONTENT_CLAIM);
+        fields.add(SOURCE_QUEUE_ID);
+
+        // EventType-Specific fields
+        fields.add(PARENT_UUIDS);  // for FORK, JOIN, CLONE, REPLAY events
+        fields.add(CHILD_UUIDS); // for FORK, JOIN, CLONE, REPLAY events
+        fields.add(TRANSIT_URI); // for SEND/RECEIVE/FETCH events
+        fields.add(SOURCE_SYSTEM_FLOWFILE_IDENTIFIER); // for SEND/RECEIVE events
+        fields.add(ALTERNATE_IDENTIFIER); // for ADD_INFO events
+        fields.add(RELATIONSHIP); // for ROUTE events
+
+        final RecordSchema schema = new RecordSchema(fields);
+        return schema;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/ProvenanceEventSchema.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/ProvenanceEventSchema.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/ProvenanceEventSchema.java
index d70bd39..4655613 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/ProvenanceEventSchema.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/ProvenanceEventSchema.java
@@ -46,11 +46,15 @@ import org.apache.nifi.repository.schema.RecordField;
 import org.apache.nifi.repository.schema.RecordSchema;
 
 public class ProvenanceEventSchema {
-    public static final RecordSchema PROVENANCE_EVENT_SCHEMA_V1 = buildSchemaV1();
+    public static final RecordSchema PROVENANCE_EVENT_SCHEMA_V1 = buildSchemaV1(true);
+    public static final RecordSchema PROVENANCE_EVENT_SCHEMA_V1_WITHOUT_EVENT_ID = buildSchemaV1(false);
 
-    private static RecordSchema buildSchemaV1() {
+    private static RecordSchema buildSchemaV1(final boolean includeEventId) {
         final List<RecordField> fields = new ArrayList<>();
-        fields.add(RECORD_IDENTIFIER);
+        if (includeEventId) {
+            fields.add(RECORD_IDENTIFIER);
+        }
+
         fields.add(EVENT_TYPE);
         fields.add(EVENT_TIME);
         fields.add(FLOWFILE_ENTRY_DATE);

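A hypothetical usage sketch of the two schema constants introduced above. How a writer actually chooses between them is not shown in this diff, so the helper below is only an assumption about selecting the no-event-ID variant when event IDs are tracked outside the serialized record:

import org.apache.nifi.provenance.schema.ProvenanceEventSchema;
import org.apache.nifi.repository.schema.RecordSchema;

// Hypothetical helper, not part of this commit.
public class SchemaSelectionSketch {
    // Pick the schema variant based on whether the event ID is stored inline with each record.
    public static RecordSchema schemaFor(final boolean eventIdStoredInline) {
        return eventIdStoredInline
            ? ProvenanceEventSchema.PROVENANCE_EVENT_SCHEMA_V1
            : ProvenanceEventSchema.PROVENANCE_EVENT_SCHEMA_V1_WITHOUT_EVENT_ID;
    }
}
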
http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordReader.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordReader.java
index 056829a..1a6c3c5 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordReader.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordReader.java
@@ -17,18 +17,18 @@
 
 package org.apache.nifi.provenance.serialization;
 
+import java.io.BufferedInputStream;
 import java.io.DataInputStream;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
+import java.util.Optional;
 import java.util.zip.GZIPInputStream;
 
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.StandardProvenanceEventRecord;
-import org.apache.nifi.provenance.StandardRecordReader;
 import org.apache.nifi.provenance.toc.TocReader;
-import org.apache.nifi.stream.io.BufferedInputStream;
 import org.apache.nifi.stream.io.ByteCountingInputStream;
 import org.apache.nifi.stream.io.LimitingInputStream;
 import org.apache.nifi.stream.io.StreamUtils;
@@ -36,7 +36,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public abstract class CompressableRecordReader implements RecordReader {
-    private static final Logger logger = LoggerFactory.getLogger(StandardRecordReader.class);
+    private static final Logger logger = LoggerFactory.getLogger(CompressableRecordReader.class);
 
     private final ByteCountingInputStream rawInputStream;
     private final String filename;
@@ -48,6 +48,7 @@ public abstract class CompressableRecordReader implements RecordReader {
 
     private DataInputStream dis;
     private ByteCountingInputStream byteCountingIn;
+    private StandardProvenanceEventRecord pushbackEvent = null;
 
     public CompressableRecordReader(final InputStream in, final String filename, final int maxAttributeChars) throws IOException {
         this(in, filename, null, maxAttributeChars);
@@ -120,6 +121,8 @@ public abstract class CompressableRecordReader implements RecordReader {
             try {
                 StreamUtils.skip(rawInputStream, bytesToSkip);
                 logger.debug("Skipped stream from offset {} to {} ({} bytes skipped)", curOffset, offset, bytesToSkip);
+            } catch (final EOFException eof) {
+                throw new EOFException("Attempted to skip to byte offset " + offset + " for " + filename + " but file does not have that many bytes (TOC Reader=" + getTocReader() + ")");
             } catch (final IOException e) {
                 throw new IOException("Failed to skip to offset " + offset + " for block " + blockIndex + " of Provenance Log " + filename, e);
             }
@@ -177,24 +180,29 @@ public abstract class CompressableRecordReader implements RecordReader {
         return byteCountingIn.getBytesConsumed();
     }
 
-    private boolean isData() throws IOException {
-        byteCountingIn.mark(1);
-        int nextByte = byteCountingIn.read();
-        byteCountingIn.reset();
+    @Override
+    public boolean isData() {
+        try {
+            byteCountingIn.mark(1);
+            int nextByte = byteCountingIn.read();
+            byteCountingIn.reset();
+
+            if (nextByte < 0) {
+                try {
+                    resetStreamForNextBlock();
+                } catch (final EOFException eof) {
+                    return false;
+                }
 
-        if (nextByte < 0) {
-            try {
-                resetStreamForNextBlock();
-            } catch (final EOFException eof) {
-                return false;
+                byteCountingIn.mark(1);
+                nextByte = byteCountingIn.read();
+                byteCountingIn.reset();
             }
 
-            byteCountingIn.mark(1);
-            nextByte = byteCountingIn.read();
-            byteCountingIn.reset();
+            return nextByte >= 0;
+        } catch (final IOException ioe) {
+            return false;
         }
-
-        return nextByte >= 0;
     }
 
     @Override
@@ -268,6 +276,12 @@ public abstract class CompressableRecordReader implements RecordReader {
 
     @Override
     public StandardProvenanceEventRecord nextRecord() throws IOException {
+        if (pushbackEvent != null) {
+            final StandardProvenanceEventRecord toReturn = pushbackEvent;
+            pushbackEvent = null;
+            return toReturn;
+        }
+
         if (isData()) {
             return nextRecord(dis, serializationVersion);
         } else {
@@ -275,6 +289,65 @@ public abstract class CompressableRecordReader implements RecordReader {
         }
     }
 
+    protected Optional<Integer> getBlockIndex(final long eventId) {
+        final TocReader tocReader = getTocReader();
+        if (tocReader == null) {
+            return Optional.empty();
+        } else {
+            final Integer blockIndex = tocReader.getBlockIndexForEventId(eventId);
+            return Optional.ofNullable(blockIndex);
+        }
+    }
+
+    @Override
+    public Optional<ProvenanceEventRecord> skipToEvent(final long eventId) throws IOException {
+        if (pushbackEvent != null) {
+            final StandardProvenanceEventRecord previousPushBack = pushbackEvent;
+            if (previousPushBack.getEventId() >= eventId) {
+                return Optional.of(previousPushBack);
+            } else {
+                pushbackEvent = null;
+            }
+        }
+
+        final Optional<Integer> blockIndex = getBlockIndex(eventId);
+        if (blockIndex.isPresent()) {
+            // Skip to the appropriate block index and then read until we've found an Event
+            // that has an ID >= the event id.
+            skipToBlock(blockIndex.get());
+        }
+
+        try {
+            boolean read = true;
+            while (read) {
+                final Optional<StandardProvenanceEventRecord> eventOptional = readToEvent(eventId, dis, serializationVersion);
+                if (eventOptional.isPresent()) {
+                    pushbackEvent = eventOptional.get();
+                    return Optional.of(pushbackEvent);
+                } else {
+                    read = isData();
+                }
+            }
+
+            return Optional.empty();
+        } catch (final EOFException eof) {
+            // This can occur if we run out of data and attempt to read the next event ID.
+            logger.error("Unexpectedly reached end of File when looking for Provenance Event with ID {} in {}", eventId, filename);
+            return Optional.empty();
+        }
+    }
+
+    protected Optional<StandardProvenanceEventRecord> readToEvent(final long eventId, final DataInputStream dis, final int serializationVersion) throws IOException {
+        StandardProvenanceEventRecord event;
+        while ((event = nextRecord()) != null) {
+            if (event.getEventId() >= eventId) {
+                return Optional.of(event);
+            }
+        }
+
+        return Optional.empty();
+    }
+
     protected abstract StandardProvenanceEventRecord nextRecord(DataInputStream in, int serializationVersion) throws IOException;
 
     protected void readHeader(DataInputStream in, int serializationVersion) throws IOException {

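A brief usage sketch of the new skipToEvent()/pushback behavior, assuming a RecordReader has already been opened for an event file elsewhere; only methods that appear in this diff (skipToEvent, nextRecord, getEventId) are used:

import java.io.IOException;
import java.util.Optional;

import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.serialization.RecordReader;

public class SkipToEventSketch {
    // Reads all events whose ID is at or after firstEventId from an already-opened reader.
    // skipToEvent() positions the reader (using the TOC block index when one is available)
    // and pushes the first matching event back so the next nextRecord() call returns it.
    static void readFrom(final RecordReader reader, final long firstEventId) throws IOException {
        final Optional<ProvenanceEventRecord> firstEvent = reader.skipToEvent(firstEventId);
        if (!firstEvent.isPresent()) {
            return; // the file contains no event with an ID >= firstEventId
        }

        ProvenanceEventRecord event;
        while ((event = reader.nextRecord()) != null) {
            System.out.println(event.getEventId());
        }
    }
}
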
http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordWriter.java
index fa0e390..b564600 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordWriter.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordWriter.java
@@ -17,17 +17,18 @@
 
 package org.apache.nifi.provenance.serialization;
 
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.nifi.provenance.AbstractRecordWriter;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.toc.TocWriter;
-import org.apache.nifi.stream.io.BufferedOutputStream;
 import org.apache.nifi.stream.io.ByteCountingOutputStream;
-import org.apache.nifi.stream.io.DataOutputStream;
 import org.apache.nifi.stream.io.GZIPOutputStream;
 import org.apache.nifi.stream.io.NonCloseableOutputStream;
 import org.slf4j.Logger;
@@ -40,14 +41,16 @@ public abstract class CompressableRecordWriter extends AbstractRecordWriter {
     private final ByteCountingOutputStream rawOutStream;
     private final boolean compressed;
     private final int uncompressedBlockSize;
+    private final AtomicLong idGenerator;
 
     private DataOutputStream out;
     private ByteCountingOutputStream byteCountingOut;
-    private long lastBlockOffset = 0L;
+    private long blockStartOffset = 0L;
     private int recordCount = 0;
 
 
-    public CompressableRecordWriter(final File file, final TocWriter writer, final boolean compressed, final int uncompressedBlockSize) throws IOException {
+    public CompressableRecordWriter(final File file, final AtomicLong idGenerator, final TocWriter writer, final boolean compressed,
+        final int uncompressedBlockSize) throws IOException {
         super(file, writer);
         logger.trace("Creating Record Writer for {}", file.getName());
 
@@ -55,18 +58,25 @@ public abstract class CompressableRecordWriter extends AbstractRecordWriter {
         this.fos = new FileOutputStream(file);
         rawOutStream = new ByteCountingOutputStream(fos);
         this.uncompressedBlockSize = uncompressedBlockSize;
+        this.idGenerator = idGenerator;
     }
 
-    public CompressableRecordWriter(final OutputStream out, final TocWriter tocWriter, final boolean compressed, final int uncompressedBlockSize) throws IOException {
-        super(null, tocWriter);
+    public CompressableRecordWriter(final OutputStream out, final String storageLocation, final AtomicLong idGenerator, final TocWriter tocWriter, final boolean compressed,
+        final int uncompressedBlockSize) throws IOException {
+        super(storageLocation, tocWriter);
         this.fos = null;
 
         this.compressed = compressed;
         this.uncompressedBlockSize = uncompressedBlockSize;
         this.rawOutStream = new ByteCountingOutputStream(out);
+        this.idGenerator = idGenerator;
     }
 
 
+    protected AtomicLong getIdGenerator() {
+        return idGenerator;
+    }
+
     @Override
     public synchronized void writeHeader(final long firstEventId) throws IOException {
         if (isDirty()) {
@@ -74,13 +84,13 @@ public abstract class CompressableRecordWriter extends AbstractRecordWriter {
         }
 
         try {
-            lastBlockOffset = rawOutStream.getBytesWritten();
+            blockStartOffset = rawOutStream.getBytesWritten();
             resetWriteStream(firstEventId);
             out.writeUTF(getSerializationName());
             out.writeInt(getSerializationVersion());
             writeHeader(firstEventId, out);
             out.flush();
-            lastBlockOffset = rawOutStream.getBytesWritten();
+            blockStartOffset = getBytesWritten();
         } catch (final IOException ioe) {
             markDirty();
             throw ioe;
@@ -95,7 +105,7 @@ public abstract class CompressableRecordWriter extends AbstractRecordWriter {
      * @param eventId the first id that will be written to the new block
      * @throws IOException if unable to flush/close the current streams properly
      */
-    private void resetWriteStream(final long eventId) throws IOException {
+    protected void resetWriteStream(final Long eventId) throws IOException {
         try {
             if (out != null) {
                 out.flush();
@@ -114,13 +124,13 @@ public abstract class CompressableRecordWriter extends AbstractRecordWriter {
                     out.close();
                 }
 
-                if (tocWriter != null) {
+                if (tocWriter != null && eventId != null) {
                     tocWriter.addBlockOffset(rawOutStream.getBytesWritten(), eventId);
                 }
 
                 writableStream = new BufferedOutputStream(new GZIPOutputStream(new NonCloseableOutputStream(rawOutStream), 1), 65536);
             } else {
-                if (tocWriter != null) {
+                if (tocWriter != null && eventId != null) {
                     tocWriter.addBlockOffset(rawOutStream.getBytesWritten(), eventId);
                 }
 
@@ -136,33 +146,34 @@ public abstract class CompressableRecordWriter extends AbstractRecordWriter {
         }
     }
 
-
+    protected synchronized void ensureStreamState(final long recordIdentifier, final long startBytes) throws IOException {
+        // add a new block to the TOC if needed.
+        if (getTocWriter() != null && (startBytes - blockStartOffset >= uncompressedBlockSize)) {
+            blockStartOffset = startBytes;
+            resetWriteStream(recordIdentifier);
+        }
+    }
 
     @Override
-    public long writeRecord(final ProvenanceEventRecord record, final long recordIdentifier) throws IOException {
+    public synchronized StorageSummary writeRecord(final ProvenanceEventRecord record) throws IOException {
         if (isDirty()) {
             throw new IOException("Cannot update Provenance Repository because this Record Writer has already failed to write to the Repository");
         }
 
         try {
+            final long recordIdentifier = record.getEventId() == -1L ? idGenerator.getAndIncrement() : record.getEventId();
             final long startBytes = byteCountingOut.getBytesWritten();
 
-            // add a new block to the TOC if needed.
-            if (getTocWriter() != null && (startBytes - lastBlockOffset >= uncompressedBlockSize)) {
-                lastBlockOffset = startBytes;
-
-                if (compressed) {
-                    // because of the way that GZIPOutputStream works, we need to call close() on it in order for it
-                    // to write its trailing bytes. But we don't want to close the underlying OutputStream, so we wrap
-                    // the underlying OutputStream in a NonCloseableOutputStream
-                    resetWriteStream(recordIdentifier);
-                }
-            }
-
+            ensureStreamState(recordIdentifier, startBytes);
             writeRecord(record, recordIdentifier, out);
 
             recordCount++;
-            return byteCountingOut.getBytesWritten() - startBytes;
+            final long bytesWritten = byteCountingOut.getBytesWritten();
+            final long serializedLength = bytesWritten - startBytes;
+            final TocWriter tocWriter = getTocWriter();
+            final Integer blockIndex = tocWriter == null ? null : tocWriter.getCurrentBlockIndex();
+            final String storageLocation = getStorageLocation();
+            return new StorageSummary(recordIdentifier, storageLocation, blockIndex, serializedLength, bytesWritten);
         } catch (final IOException ioe) {
             markDirty();
             throw ioe;
@@ -170,7 +181,12 @@ public abstract class CompressableRecordWriter extends AbstractRecordWriter {
     }
 
     @Override
-    public void flush() throws IOException {
+    public synchronized long getBytesWritten() {
+        return byteCountingOut == null ? 0L : byteCountingOut.getBytesWritten();
+    }
+
+    @Override
+    public synchronized void flush() throws IOException {
         out.flush();
     }
 
@@ -180,22 +196,26 @@ public abstract class CompressableRecordWriter extends AbstractRecordWriter {
     }
 
     @Override
-    protected OutputStream getBufferedOutputStream() {
+    protected synchronized DataOutputStream getBufferedOutputStream() {
         return out;
     }
 
     @Override
-    protected OutputStream getUnderlyingOutputStream() {
+    protected synchronized OutputStream getUnderlyingOutputStream() {
         return fos;
     }
 
     @Override
-    protected void syncUnderlyingOutputStream() throws IOException {
+    protected synchronized void syncUnderlyingOutputStream() throws IOException {
         if (fos != null) {
             fos.getFD().sync();
         }
     }
 
+    protected boolean isCompressed() {
+        return compressed;
+    }
+
     protected abstract void writeRecord(final ProvenanceEventRecord event, final long eventId, final DataOutputStream out) throws IOException;
 
     protected abstract void writeHeader(final long firstEventId, final DataOutputStream out) throws IOException;

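writeRecord() now assigns an event ID from the shared AtomicLong whenever the incoming event does not already carry one (getEventId() == -1L). The stand-alone sketch below illustrates just that assignment rule; the IdAssignmentSketch class is hypothetical and not part of the writer:

import java.util.concurrent.atomic.AtomicLong;

public class IdAssignmentSketch {
    // Shared across all writers for a repository partition so that IDs stay unique
    // and monotonically increasing even when several writers are active at once.
    private static final AtomicLong ID_GENERATOR = new AtomicLong(0L);

    // Mirrors the rule in writeRecord(): keep a pre-assigned ID, otherwise draw a new one.
    static long nextId(final long eventIdFromRecord) {
        return eventIdFromRecord == -1L ? ID_GENERATOR.getAndIncrement() : eventIdFromRecord;
    }

    public static void main(final String[] args) {
        System.out.println(nextId(-1L)); // 0 - newly assigned
        System.out.println(nextId(-1L)); // 1 - newly assigned
        System.out.println(nextId(42L)); // 42 - already assigned upstream
    }
}
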
http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/EmptyRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/EmptyRecordReader.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/EmptyRecordReader.java
index 38a4cc9..d4487ab 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/EmptyRecordReader.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/EmptyRecordReader.java
@@ -18,7 +18,9 @@
 package org.apache.nifi.provenance.serialization;
 
 import java.io.IOException;
+import java.util.Optional;
 
+import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.StandardProvenanceEventRecord;
 import org.apache.nifi.provenance.toc.TocReader;
 
@@ -69,4 +71,14 @@ public class EmptyRecordReader implements RecordReader {
     public long getMaxEventId() throws IOException {
         return 0;
     }
+
+    @Override
+    public Optional<ProvenanceEventRecord> skipToEvent(long eventId) throws IOException {
+        return Optional.empty();
+    }
+
+    @Override
+    public boolean isData() {
+        return false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/EventFileCompressor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/EventFileCompressor.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/EventFileCompressor.java
new file mode 100644
index 0000000..4814c95
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/EventFileCompressor.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.serialization;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.provenance.store.EventFileManager;
+import org.apache.nifi.provenance.toc.StandardTocReader;
+import org.apache.nifi.provenance.toc.StandardTocWriter;
+import org.apache.nifi.provenance.toc.TocReader;
+import org.apache.nifi.provenance.toc.TocUtil;
+import org.apache.nifi.provenance.toc.TocWriter;
+import org.apache.nifi.provenance.util.CloseableUtil;
+import org.apache.nifi.stream.io.ByteCountingOutputStream;
+import org.apache.nifi.stream.io.GZIPOutputStream;
+import org.apache.nifi.stream.io.NonCloseableOutputStream;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.FormatUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>
+ * This class is responsible for compressing Event Files as a background task. 
This is done as a background task instead of being
+ * done inline because if compression is performed inline, whenever NiFi is 
restarted (especially if done so abruptly), it is very
+ * possible that the GZIP stream will be corrupt. As a result, we would stand 
to lose some Provenance Events when NiFi is restarted.
+ * In order to avoid that, we write data in an uncompressed format and then 
compress the data in the background. Once the data has
+ * been compressed, this task will then remove the original, uncompressed 
file. If the file is being read by another thread, this
+ * task will wait for the other thread to finish reading the data before 
deleting the file. This synchronization of the File is handled
+ * via the {@link EventFileManager Event File Manager}.
+ * </p>
+ */
+public class EventFileCompressor implements Runnable {
+    private static final Logger logger = 
LoggerFactory.getLogger(EventFileCompressor.class);
+    private final BlockingQueue<File> filesToCompress;
+    private final EventFileManager eventFileManager;
+    private volatile boolean shutdown = false;
+
+    public EventFileCompressor(final BlockingQueue<File> filesToCompress, 
final EventFileManager eventFileManager) {
+        this.filesToCompress = filesToCompress;
+        this.eventFileManager = eventFileManager;
+    }
+
+    public void shutdown() {
+        shutdown = true;
+    }
+
+    @Override
+    public void run() {
+        while (!shutdown) {
+            File uncompressedEventFile = null;
+
+            try {
+                final long start = System.nanoTime();
+                uncompressedEventFile = filesToCompress.poll(1, 
TimeUnit.SECONDS);
+                if (uncompressedEventFile == null || shutdown) {
+                    continue;
+                }
+
+                File outputFile = null;
+                long bytesBefore = 0L;
+                StandardTocReader tocReader = null;
+
+                File tmpTocFile = null;
+                eventFileManager.obtainReadLock(uncompressedEventFile);
+                try {
+                    StandardTocWriter tocWriter = null;
+
+                    final File tocFile = 
TocUtil.getTocFile(uncompressedEventFile);
+                    try {
+                        tocReader = new StandardTocReader(tocFile);
+                    } catch (final IOException e) {
+                        logger.error("Failed to read TOC File {}", tocFile, e);
+                        continue;
+                    }
+
+                    bytesBefore = uncompressedEventFile.length();
+
+                    try {
+                        outputFile = new 
File(uncompressedEventFile.getParentFile(), uncompressedEventFile.getName() + 
".gz");
+                        try {
+                            tmpTocFile = new File(tocFile.getParentFile(), 
tocFile.getName() + ".tmp");
+                            tocWriter = new StandardTocWriter(tmpTocFile, 
true, false);
+                            compress(uncompressedEventFile, tocReader, 
outputFile, tocWriter);
+                            tocWriter.close();
+                        } catch (final IOException ioe) {
+                            logger.error("Failed to compress {} on rollover", 
uncompressedEventFile, ioe);
+                        }
+                    } finally {
+                        CloseableUtil.closeQuietly(tocReader, tocWriter);
+                    }
+                } finally {
+                    eventFileManager.releaseReadLock(uncompressedEventFile);
+                }
+
+                eventFileManager.obtainWriteLock(uncompressedEventFile);
+                try {
+                    // Attempt to delete the input file and associated toc file
+                    if (uncompressedEventFile.delete()) {
+                        if (tocReader != null) {
+                            final File tocFile = tocReader.getFile();
+                            if (!tocFile.delete()) {
+                                logger.warn("Failed to delete {}; this file 
should be cleaned up manually", tocFile);
+                            }
+
+                            if (tmpTocFile != null) {
+                                tmpTocFile.renameTo(tocFile);
+                            }
+                        }
+                    } else {
+                        logger.warn("Failed to delete {}; this file should be 
cleaned up manually", uncompressedEventFile);
+                    }
+                } finally {
+                    eventFileManager.releaseWriteLock(uncompressedEventFile);
+                }
+
+                final long millis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+                final long bytesAfter = outputFile.length();
+                final double reduction = 100 * (1 - (double) bytesAfter / 
(double) bytesBefore);
+                final String reductionTwoDecimals = String.format("%.2f", 
reduction);
+                logger.debug("Successfully compressed Provenance Event File {} 
in {} millis from {} to {}, a reduction of {}%",
+                    uncompressedEventFile, millis, 
FormatUtils.formatDataSize(bytesBefore), 
FormatUtils.formatDataSize(bytesAfter), reductionTwoDecimals);
+            } catch (final InterruptedException e) {
+                Thread.currentThread().interrupt();
+                return;
+            } catch (final Exception e) {
+                logger.error("Failed to compress {}", uncompressedEventFile, 
e);
+            }
+        }
+    }
+
+    public static void compress(final File input, final TocReader tocReader, 
final File output, final TocWriter tocWriter) throws IOException {
+        try (final InputStream fis = new FileInputStream(input);
+            final OutputStream fos = new FileOutputStream(output);
+            final ByteCountingOutputStream byteCountingOut = new 
ByteCountingOutputStream(fos)) {
+
+            int blockIndex = 0;
+            while (true) {
+                // Determine the min and max byte ranges for the current block.
+                final long blockStart = tocReader.getBlockOffset(blockIndex);
+                if (blockStart == -1) {
+                    break;
+                }
+
+                long blockEnd = tocReader.getBlockOffset(blockIndex + 1);
+                if (blockEnd < 0) {
+                    blockEnd = input.length();
+                }
+
+                final long firstEventId = 
tocReader.getFirstEventIdForBlock(blockIndex);
+                final long blockStartOffset = 
byteCountingOut.getBytesWritten();
+
+                try (final OutputStream ncos = new 
NonCloseableOutputStream(byteCountingOut);
+                    final OutputStream gzipOut = new GZIPOutputStream(ncos, 
1)) {
+                    StreamUtils.copy(fis, gzipOut, blockEnd - blockStart);
+                }
+
+                tocWriter.addBlockOffset(blockStartOffset, firstEventId);
+                blockIndex++;
+            }
+        }
+
+        // Close the TOC Reader and TOC Writer
+        CloseableUtil.closeQuietly(tocReader, tocWriter);
+    }
+}
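
A minimal sketch of how the compressor might be wired up; this is not part of the commit, and the thread name, queue, and file path are illustrative. The writer side would offer finished, uncompressed event files onto the queue, and the background thread gzips them block by block:

    import java.io.File;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    import org.apache.nifi.provenance.serialization.EventFileCompressor;
    import org.apache.nifi.provenance.store.EventFileManager;

    public class CompressionWiring {
        public static void main(final String[] args) {
            final BlockingQueue<File> filesToCompress = new LinkedBlockingQueue<>();
            final EventFileManager fileManager = new EventFileManager();

            // One background thread polls the queue and compresses each file it receives.
            final EventFileCompressor compressor = new EventFileCompressor(filesToCompress, fileManager);
            final Thread compressionThread = new Thread(compressor, "Compress Provenance Logs");
            compressionThread.setDaemon(true);
            compressionThread.start();

            // A writer would offer a completed, uncompressed event file here (path is made up).
            filesToCompress.offer(new File("provenance_repository/1.prov"));

            // Stop the loop on shutdown; the 1-second poll timeout lets the thread exit promptly.
            compressor.shutdown();
        }
    }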

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java
index 91c8222..9377f2c 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java
@@ -18,7 +18,9 @@ package org.apache.nifi.provenance.serialization;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Optional;
 
+import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.StandardProvenanceEventRecord;
 import org.apache.nifi.provenance.toc.TocReader;
 
@@ -50,7 +52,7 @@ public interface RecordReader extends Closeable {
     /**
      * Skips to the specified compression block
      *
-     * @param blockIndex the byte index to skip to
+     * @param blockIndex the block index to skip to
     * @throws IOException if the underlying stream throws IOException, or if the reader has already
     * read past the specified compression block index
      * @throws IllegalStateException if the RecordReader does not have a 
TableOfContents associated with it
@@ -58,6 +60,18 @@ public interface RecordReader extends Closeable {
     void skipToBlock(int blockIndex) throws IOException;
 
     /**
+     * Skips to the first event in the stream with an Event ID >= the given 
ID. If no event is found with an
+     * ID >= the given ID an empty Optional is returned. Otherwise, an 
Optional containing the first event in the stream with an
+     * ID >= the given ID is returned. Unlike {@link #nextRecord()}, this 
method does not consume the returned event from the stream.
+     * I.e., if a record is returned, that same record will be returned again 
the next time that {@link #nextRecord()} is called.
+     *
+     * @param eventId the ID of the event to retrieve
+     * @return the first event in the stream with an Event ID >= the given ID 
or an empty Optional if no such event can be found
+     * @throws IOException if the underlying stream throws IOException
+     */
+    Optional<ProvenanceEventRecord> skipToEvent(long eventId) throws IOException;
+
+    /**
      * Returns the block index that the Reader is currently reading from.
      * Note that the block index is incremented at the beginning of the {@link 
#nextRecord()}
      * method. This means that this method will return the block from which 
the previous record was read,
@@ -100,4 +114,11 @@ public interface RecordReader extends Closeable {
      * @throws IOException if unable to get id of the last event
      */
     long getMaxEventId() throws IOException;
+
+    /**
+     * Returns <code>true</code> if there is more data for the Record Reader to read, <code>false</code> otherwise.
+     *
+     * @return <code>true</code> if there is more data for the Record Reader to read, <code>false</code> otherwise.
+     */
+    boolean isData();
 }
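
To make the skipToEvent() contract concrete, here is a small usage sketch. It is not from the commit; the SkipToEventExample class is hypothetical and assumes, as the existing interface documents, that nextRecord() returns null at the end of the stream:

    import java.io.IOException;
    import java.util.Optional;

    import org.apache.nifi.provenance.ProvenanceEventRecord;
    import org.apache.nifi.provenance.serialization.RecordReader;

    public class SkipToEventExample {
        // Prints every event with an id >= firstEventId from an already-opened reader.
        public static void readFrom(final RecordReader reader, final long firstEventId) throws IOException {
            final Optional<ProvenanceEventRecord> firstEvent = reader.skipToEvent(firstEventId);
            if (!firstEvent.isPresent()) {
                return; // no event in this file has an id >= firstEventId
            }

            // skipToEvent() does not consume the event, so the first nextRecord() call
            // returns the same event that the Optional contained.
            ProvenanceEventRecord event;
            while ((event = reader.nextRecord()) != null) {
                System.out.println(event.getEventId() + " " + event.getEventType());
            }
        }
    }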

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
index 526a488..8e79ddd 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
@@ -30,8 +30,9 @@ import java.util.zip.GZIPInputStream;
 
 import org.apache.nifi.provenance.ByteArraySchemaRecordReader;
 import org.apache.nifi.provenance.ByteArraySchemaRecordWriter;
+import org.apache.nifi.provenance.EventIdFirstSchemaRecordReader;
+import org.apache.nifi.provenance.EventIdFirstSchemaRecordWriter;
 import org.apache.nifi.provenance.StandardRecordReader;
-import org.apache.nifi.provenance.StandardRecordWriter;
 import org.apache.nifi.provenance.lucene.LuceneUtil;
 import org.apache.nifi.provenance.toc.StandardTocReader;
 import org.apache.nifi.provenance.toc.TocReader;
@@ -78,10 +79,10 @@ public class RecordReaders {
             String filename = file.getName();
             openStream: while ( fis == null ) {
                 final File dir = file.getParentFile();
-                final String baseName = 
LuceneUtil.substringBefore(file.getName(), ".");
+                final String baseName = 
LuceneUtil.substringBefore(file.getName(), ".prov");
 
-                // depending on which rollover actions have occurred, we could 
have 3 possibilities for the
-                // filename that we need. The majority of the time, we will 
use the extension ".prov.indexed.gz"
+                // depending on which rollover actions have occurred, we could 
have 2 possibilities for the
+                // filename that we need. The majority of the time, we will 
use the extension ".prov.gz"
                 // because most often we are compressing on rollover and most 
often we have already finished
                 // compressing by the time that we are querying the data.
                 for ( final String extension : new String[] {".prov.gz", 
".prov"} ) {
@@ -123,7 +124,7 @@ public class RecordReaders {
             }
 
             switch (serializationName) {
-                case StandardRecordWriter.SERIALIZATION_NAME: {
+                case StandardRecordReader.SERIALIZATION_NAME: {
                     if (tocFile.exists()) {
                         final TocReader tocReader = new 
StandardTocReader(tocFile);
                         return new StandardRecordReader(bufferedInStream, 
filename, tocReader, maxAttributeChars);
@@ -139,6 +140,14 @@ public class RecordReaders {
                         return new 
ByteArraySchemaRecordReader(bufferedInStream, filename, maxAttributeChars);
                     }
                 }
+                case EventIdFirstSchemaRecordWriter.SERIALIZATION_NAME: {
+                    if (!tocFile.exists()) {
+                        throw new FileNotFoundException("Cannot create TOC Reader because the file " + tocFile + " does not exist");
+                    }
+
+                    final TocReader tocReader = new StandardTocReader(tocFile);
+                    return new EventIdFirstSchemaRecordReader(bufferedInStream, filename, tocReader, maxAttributeChars);
+                }
                 default: {
                     throw new IOException("Unable to read data from file " + 
file + " because the file was written using an unknown Serializer: " + 
serializationName);
                 }
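
The extension handling above reduces the lookup to two candidate names per event file. A standalone sketch of that probing, using plain java.io only (the EventFileResolver class is illustrative, not part of the commit):

    import java.io.File;
    import java.io.FileNotFoundException;

    public class EventFileResolver {
        // Prefer the compressed ".prov.gz" form and fall back to ".prov",
        // mirroring the two possibilities described in the comment above.
        public static File resolve(final File dir, final String baseName) throws FileNotFoundException {
            for (final String extension : new String[] {".prov.gz", ".prov"}) {
                final File candidate = new File(dir, baseName + extension);
                if (candidate.exists()) {
                    return candidate;
                }
            }
            throw new FileNotFoundException("No event file found for " + baseName + " in " + dir);
        }
    }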

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
index 17dd75c..c9d2a22 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
@@ -37,11 +37,10 @@ public interface RecordWriter extends Closeable {
      * Writes the given record out to the underlying stream
      *
      * @param record the record to write
-     * @param recordIdentifier the new identifier of the record
      * @return the number of bytes written for the given records
      * @throws IOException if unable to write the record to the stream
      */
-    long writeRecord(ProvenanceEventRecord record, long recordIdentifier) throws IOException;
+    StorageSummary writeRecord(ProvenanceEventRecord record) throws IOException;
 
     /**
      * Flushes any data that is held in a buffer to the underlying storage 
mechanism
@@ -56,6 +55,11 @@ public interface RecordWriter extends Closeable {
     int getRecordsWritten();
 
     /**
+     * @return the number of bytes written to this writer
+     */
+    long getBytesWritten();
+
+    /**
      * @return the file that this RecordWriter is writing to
      */
     File getFile();
@@ -89,6 +93,11 @@ public interface RecordWriter extends Closeable {
     void markDirty();
 
     /**
+     * @return <code>true</code> if {@link #markDirty()} has been called, 
<code>false</code> otherwise
+     */
+    boolean isDirty();
+
+    /**
      * Syncs the content written to this writer to disk.
      *
      * @throws IOException if unable to sync content to disk
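
A short sketch of how a caller might react to the new isDirty() accessor (a hypothetical helper, not part of the commit). CompressableRecordWriter marks itself dirty when a write fails, so a pool can discard such writers instead of reusing them:

    import java.io.IOException;

    import org.apache.nifi.provenance.ProvenanceEventRecord;
    import org.apache.nifi.provenance.serialization.RecordWriter;

    public class DirtyWriterHandling {
        // Attempts a write and reports whether the writer is still safe to reuse.
        public static boolean tryWrite(final RecordWriter writer, final ProvenanceEventRecord event) {
            try {
                writer.writeRecord(event);
                return true;
            } catch (final IOException ioe) {
                // The writer marked itself dirty before rethrowing; check before reuse.
                return !writer.isDirty();
            }
        }
    }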

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java
index be4c9cf..cacaebd 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java
@@ -18,6 +18,7 @@ package org.apache.nifi.provenance.serialization;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.nifi.provenance.ByteArraySchemaRecordWriter;
 import org.apache.nifi.provenance.toc.StandardTocWriter;
@@ -27,13 +28,14 @@ import org.apache.nifi.provenance.toc.TocWriter;
 public class RecordWriters {
     private static final int DEFAULT_COMPRESSION_BLOCK_SIZE = 1024 * 1024; // 
1 MB
 
-    public static RecordWriter newSchemaRecordWriter(final File file, final 
boolean compressed, final boolean createToc) throws IOException {
-        return newSchemaRecordWriter(file, compressed, createToc, 
DEFAULT_COMPRESSION_BLOCK_SIZE);
+    public static RecordWriter newSchemaRecordWriter(final File file, final 
AtomicLong idGenerator, final boolean compressed, final boolean createToc) 
throws IOException {
+        return newSchemaRecordWriter(file, idGenerator, compressed, createToc, 
DEFAULT_COMPRESSION_BLOCK_SIZE);
     }
 
-    public static RecordWriter newSchemaRecordWriter(final File file, final 
boolean compressed, final boolean createToc, final int compressionBlockBytes) 
throws IOException {
+    public static RecordWriter newSchemaRecordWriter(final File file, final 
AtomicLong idGenerator, final boolean compressed, final boolean createToc,
+        final int compressionBlockBytes) throws IOException {
         final TocWriter tocWriter = createToc ? new 
StandardTocWriter(TocUtil.getTocFile(file), false, false) : null;
-        return new ByteArraySchemaRecordWriter(file, tocWriter, compressed, 
compressionBlockBytes);
+        return new ByteArraySchemaRecordWriter(file, idGenerator, tocWriter, 
compressed, compressionBlockBytes);
     }
 
 }
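
Callers now supply the id generator when creating a writer. A minimal example follows; the WriterFactoryExample class and the flag choices are illustrative, not part of the commit:

    import java.io.File;
    import java.io.IOException;
    import java.util.concurrent.atomic.AtomicLong;

    import org.apache.nifi.provenance.serialization.RecordWriter;
    import org.apache.nifi.provenance.serialization.RecordWriters;

    public class WriterFactoryExample {
        public static RecordWriter createWriter(final File eventFile) throws IOException {
            // In practice the id generator is shared across writers so event ids stay
            // unique across files; starting it at 0 here is purely illustrative.
            final AtomicLong idGenerator = new AtomicLong(0L);

            // compressed = false: write uncompressed and let the background compressor gzip later;
            // createToc = true so readers can skip between compression blocks.
            return RecordWriters.newSchemaRecordWriter(eventFile, idGenerator, false, true);
        }
    }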

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/StorageSummary.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/StorageSummary.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/StorageSummary.java
new file mode 100644
index 0000000..dffcd6d
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/StorageSummary.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.serialization;
+
+import java.util.Optional;
+
+public class StorageSummary {
+    private final long eventId;
+    private final String storageLocation;
+    private final String partitionName;
+    private final Integer blockIndex;
+    private final long serializedLength;
+    private final long bytesWritten;
+
+    public StorageSummary(final long eventId, final String storageLocation, 
final Integer blockIndex, final long serializedLength, final long bytesWritten) 
{
+        this(eventId, storageLocation, null, blockIndex, serializedLength, 
bytesWritten);
+    }
+
+    public StorageSummary(final long eventId, final String storageLocation, 
final String partitionName,
+        final Integer blockIndex, final long serializedLength, final long 
bytesWritten) {
+        this.eventId = eventId;
+        this.storageLocation = storageLocation;
+        this.partitionName = partitionName;
+        this.blockIndex = blockIndex;
+        this.serializedLength = serializedLength;
+        this.bytesWritten = bytesWritten;
+    }
+
+    public long getEventId() {
+        return eventId;
+    }
+
+    public String getStorageLocation() {
+        return storageLocation;
+    }
+
+    public Optional<String> getPartitionName() {
+        return Optional.ofNullable(partitionName);
+    }
+
+    public Integer getBlockIndex() {
+        return blockIndex;
+    }
+
+    public long getSerializedLength() {
+        return serializedLength;
+    }
+
+    public long getBytesWritten() {
+        return bytesWritten;
+    }
+
+    @Override
+    public String toString() {
+        return "StorageSummary[eventId=" + getEventId() + ", partition=" + 
getPartitionName().orElse(null) + ", location=" + getStorageLocation() + "]";
+    }
+}
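
A tiny usage example for the new value class (illustrative only; the location string and event id are made up):

    import org.apache.nifi.provenance.serialization.StorageSummary;

    public class StorageSummaryExample {
        public static void main(final String[] args) {
            // Five-argument constructor: no partition name, so getPartitionName() is empty.
            final StorageSummary summary = new StorageSummary(42L, "events/1.prov", 0, 123L, 123L);

            System.out.println(summary.getEventId());                        // 42
            System.out.println(summary.getPartitionName().orElse("<none>")); // <none>
            System.out.println(summary);                                     // StorageSummary[eventId=42, ...]
        }
    }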

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/EventFileManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/EventFileManager.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/EventFileManager.java
new file mode 100644
index 0000000..3754113
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/EventFileManager.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.store;
+
+import java.io.File;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+
+import org.apache.nifi.provenance.lucene.LuceneUtil;
+import org.apache.nifi.util.Tuple;
+
+/**
+ * The EventFileManager is responsible for maintaining locks on Event Files so 
that we can ensure that no thread deletes
+ * an Event File while it is still being read. Without this manager, this 
could happen, for instance, if the Compression Thread
+ * were to compress an Event File, and then delete the original/uncompressed 
version while a Provenance Query was reading the
+ * uncompressed version of the file.
+ */
+public class EventFileManager {
+
+    private final ConcurrentMap<String, Tuple<ReadWriteLock, Integer>> lockMap 
= new ConcurrentHashMap<>();
+
+    private String getMapKey(final File file) {
+        return LuceneUtil.substringBefore(file.getName(), ".prov");
+    }
+
+    private ReadWriteLock updateCount(final File file, final Function<Integer, 
Integer> update) {
+        final String key = getMapKey(file);
+        boolean updated = false;
+
+        Tuple<ReadWriteLock, Integer> updatedTuple = null;
+        while (!updated) {
+            final Tuple<ReadWriteLock, Integer> tuple = 
lockMap.computeIfAbsent(key, k -> new Tuple<>(new ReentrantReadWriteLock(), 0));
+            final Integer updatedCount = update.apply(tuple.getValue());
+            updatedTuple = new Tuple<>(tuple.getKey(), updatedCount);
+            updated = lockMap.replace(key, tuple, updatedTuple);
+        }
+
+        return updatedTuple.getKey();
+    }
+
+    private ReadWriteLock incrementCount(final File file) {
+        return updateCount(file, val -> val + 1);
+    }
+
+    private ReadWriteLock decrementCount(final File file) {
+        return updateCount(file, val -> val - 1);
+    }
+
+
+    public void obtainReadLock(final File file) {
+        final ReadWriteLock rwLock = incrementCount(file);
+        rwLock.readLock().lock();
+    }
+
+    public void releaseReadLock(final File file) {
+        final ReadWriteLock rwLock = decrementCount(file);
+        rwLock.readLock().unlock();
+    }
+
+    public void obtainWriteLock(final File file) {
+        final ReadWriteLock rwLock = incrementCount(file);
+        rwLock.writeLock().lock();
+    }
+
+    public void releaseWriteLock(final File file) {
+        final String key = getMapKey(file);
+
+        boolean updated = false;
+        while (!updated) {
+            final Tuple<ReadWriteLock, Integer> tuple = lockMap.get(key);
+            if (tuple == null) {
+                throw new IllegalMonitorStateException("Lock is not owned");
+            }
+
+            // If this is the only reference to the lock, remove it from the 
map and then unlock.
+            if (tuple.getValue() <= 1) {
+                updated = lockMap.remove(key, tuple);
+                if (updated) {
+                    tuple.getKey().writeLock().unlock();
+                }
+            } else {
+                final Tuple<ReadWriteLock, Integer> updatedTuple = new 
Tuple<>(tuple.getKey(), tuple.getValue() - 1);
+                updated = lockMap.replace(key, tuple, updatedTuple);
+                if (updated) {
+                    tuple.getKey().writeLock().unlock();
+                }
+            }
+        }
+    }
+
+}
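
A reader-side sketch of the locking pattern the compressor relies on; the EventFileLocking class is hypothetical, while the commit itself applies this pattern inside EventFileCompressor:

    import java.io.File;

    import org.apache.nifi.provenance.store.EventFileManager;

    public class EventFileLocking {
        // Hold the read lock while reading so the compressor cannot delete the
        // uncompressed file out from under the reader.
        public static void readSafely(final EventFileManager fileManager, final File eventFile) {
            fileManager.obtainReadLock(eventFile);
            try {
                // ... open a RecordReader against eventFile and read events here ...
            } finally {
                fileManager.releaseReadLock(eventFile);
            }
        }
    }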
