http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardLineageResult.java
----------------------------------------------------------------------
diff --git a/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardLineageResult.java b/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardLineageResult.java
new file mode 100644
index 0000000..afb56e8
--- /dev/null
+++ b/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardLineageResult.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.nifi.provenance.lineage.ComputeLineageResult;
+import org.apache.nifi.provenance.lineage.EdgeNode;
+import org.apache.nifi.provenance.lineage.EventNode;
+import org.apache.nifi.provenance.lineage.FlowFileNode;
+import org.apache.nifi.provenance.lineage.LineageEdge;
+import org.apache.nifi.provenance.lineage.LineageNode;
+
+/**
+ * Standard implementation of {@link ComputeLineageResult} that builds the lineage graph (nodes and edges) from the Provenance Event Records relevant to a set of FlowFile UUIDs.
+ */
+public class StandardLineageResult implements ComputeLineageResult {
+
+    public static final int TTL = (int) TimeUnit.MILLISECONDS.convert(30, TimeUnit.MINUTES);
+    private static final Logger logger = LoggerFactory.getLogger(StandardLineageResult.class);
+
+    private final Collection<String> flowFileUuids;
+    private final Collection<ProvenanceEventRecord> relevantRecords = new ArrayList<>();
+    private final Set<LineageNode> nodes = new HashSet<>();
+    private final Set<LineageEdge> edges = new HashSet<>();
+    private final int numSteps;
+    private final long creationNanos;
+    private long computationNanos;
+
+    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+    private final Lock readLock = rwLock.readLock();
+    private final Lock writeLock = rwLock.writeLock();
+
+    private Date expirationDate = null;
+    private String error = null;
+    private int numCompletedSteps = 0;
+
+    private volatile boolean canceled = false;
+
+    public StandardLineageResult(final int numSteps, final Collection<String> flowFileUuids) {
+        this.numSteps = numSteps;
+        this.creationNanos = System.nanoTime();
+        this.flowFileUuids = flowFileUuids;
+
+        updateExpiration();
+    }
+
+    @Override
+    public List<LineageNode> getNodes() {
+        readLock.lock();
+        try {
+            return new ArrayList<>(nodes);
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    @Override
+    public List<LineageEdge> getEdges() {
+        readLock.lock();
+        try {
+            return new ArrayList<>(edges);
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    public int getNumberOfEdges() {
+        readLock.lock();
+        try {
+            return edges.size();
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    public int getNumberOfNodes() {
+        readLock.lock();
+        try {
+            return nodes.size();
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    public long getComputationTime(final TimeUnit timeUnit) {
+        readLock.lock();
+        try {
+            return timeUnit.convert(computationNanos, TimeUnit.NANOSECONDS);
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    @Override
+    public Date getExpiration() {
+        readLock.lock();
+        try {
+            return expirationDate;
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    @Override
+    public String getError() {
+        readLock.lock();
+        try {
+            return error;
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    @Override
+    public int getPercentComplete() {
+        readLock.lock();
+        try {
+            return (numSteps < 1) ? 100 : (int) (((float) numCompletedSteps / (float) numSteps) * 100.0F);
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    @Override
+    public boolean isFinished() {
+        readLock.lock();
+        try {
+            return numCompletedSteps >= numSteps || canceled;
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    public void setError(final String error) {
+        writeLock.lock();
+        try {
+            this.error = error;
+            numCompletedSteps++;
+
+            updateExpiration();
+
+            if (numCompletedSteps >= numSteps) {
+                computationNanos = System.nanoTime() - creationNanos;
+            }
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    public void update(final Collection<ProvenanceEventRecord> records) {
+        writeLock.lock();
+        try {
+            relevantRecords.addAll(records);
+
+            numCompletedSteps++;
+            updateExpiration();
+
+            if (numCompletedSteps >= numSteps && error == null) {
+                computeLineage();
+                computationNanos = System.nanoTime() - creationNanos;
+            }
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    /**
+     * Computes the lineage from the relevant Provenance Event Records. This
+     * method must be called with the write lock held and is only going to be
+     * useful after all of the records have been successfully obtained
+     */
+    private void computeLineage() {
+        final long startNanos = System.nanoTime();
+
+        nodes.clear();
+        edges.clear();
+
+        Map<String, LineageNode> lastEventMap = new HashMap<>();    // maps FlowFile UUID to last event for that FlowFile
+        final List<ProvenanceEventRecord> sortedRecords = new ArrayList<>(relevantRecords);
+        Collections.sort(sortedRecords, new Comparator<ProvenanceEventRecord>() {
+            @Override
+            public int compare(final ProvenanceEventRecord o1, final ProvenanceEventRecord o2) {
+                // Sort on Event Time, then Event ID.
+                final int eventTimeComparison = Long.compare(o1.getEventTime(), o2.getEventTime());
+                if (eventTimeComparison == 0) {
+                    return Long.compare(o1.getEventId(), o2.getEventId());
+                } else {
+                    return eventTimeComparison;
+                }
+            }
+        });
+
+        // convert the StandardProvenanceRecord objects into Lineage nodes (FlowFileNode, EventNodes).
+        for (final ProvenanceEventRecord record : sortedRecords) {
+            final LineageNode lineageNode = new EventNode(record);
+            final boolean added = nodes.add(lineageNode);
+            if (!added) {
+                logger.debug("Did not add {} because it already exists in the 'nodes' set", lineageNode);
+            }
+
+            // Create an edge that connects this node to the previous node for the same FlowFile UUID.
+            final LineageNode lastNode = lastEventMap.get(record.getFlowFileUuid());
+            if (lastNode != null) {
+                // We calculate the Edge UUID based on whether or not this event is a SPAWN.
+                // If this event is a SPAWN, then we want to use the previous node's UUID because a
+                // SPAWN Event's UUID is not necessarily what we want, since a SPAWN Event's UUID pertains to
+                // only one of (potentially) many UUIDs associated with the event. Otherwise, we know that
+                // the UUID of this record is appropriate, so we just use it.
+                final String edgeUuid;
+
+                switch (record.getEventType()) {
+                    case JOIN:
+                    case CLONE:
+                    case REPLAY:
+                        edgeUuid = lastNode.getFlowFileUuid();
+                        break;
+                    default:
+                        edgeUuid = record.getFlowFileUuid();
+                        break;
+                }
+
+                edges.add(new EdgeNode(edgeUuid, lastNode, lineageNode));
+            }
+
+            lastEventMap.put(record.getFlowFileUuid(), lineageNode);
+
+            switch (record.getEventType()) {
+                case FORK:
+                case JOIN:
+                case REPLAY:
+                case CLONE: {
+                    // For events that create FlowFile nodes, we need to create the FlowFile Nodes and associated Edges, as appropriate
+                    for (final String childUuid : record.getChildUuids()) {
+                        if (flowFileUuids.contains(childUuid)) {
+                            final FlowFileNode childNode = new FlowFileNode(childUuid, record.getEventTime());
+                            final boolean isNewFlowFile = nodes.add(childNode);
+                            if (!isNewFlowFile) {
+                                final String msg = "Unable to generate Lineage Graph because multiple events were registered claiming to have generated the same FlowFile (UUID = " + childNode.getFlowFileUuid() + ")";
+                                logger.error(msg);
+                                setError(msg);
+                                return;
+                            }
+
+                            edges.add(new EdgeNode(childNode.getFlowFileUuid(), lineageNode, childNode));
+                            lastEventMap.put(childUuid, childNode);
+                        }
+                    }
+                    for (final String parentUuid : record.getParentUuids()) {
+                        LineageNode lastNodeForParent = lastEventMap.get(parentUuid);
+                        if (lastNodeForParent != null && !lastNodeForParent.equals(lineageNode)) {
+                            edges.add(new EdgeNode(parentUuid, lastNodeForParent, lineageNode));
+                        }
+
+                        lastEventMap.put(parentUuid, lineageNode);
+                    }
+                }
+                break;
+                case RECEIVE:
+                case CREATE: {
+                    // for a receive event, we want to create a FlowFile Node that represents the FlowFile received
+                    // and create an edge from the Receive Event to the FlowFile Node
+                    final LineageNode flowFileNode = new FlowFileNode(record.getFlowFileUuid(), record.getEventTime());
+                    final boolean isNewFlowFile = nodes.add(flowFileNode);
+                    if (!isNewFlowFile) {
+                        final String msg = "Found cycle in graph. This indicates that multiple events were registered claiming to have generated the same FlowFile (UUID = " + flowFileNode.getFlowFileUuid() + ")";
+                        setError(msg);
+                        logger.error(msg);
+                        return;
+                    }
+                    edges.add(new EdgeNode(record.getFlowFileUuid(), lineageNode, flowFileNode));
+                    lastEventMap.put(record.getFlowFileUuid(), flowFileNode);
+                }
+                break;
+                default:
+                    break;
+            }
+        }
+
+        final long nanos = System.nanoTime() - startNanos;
+        logger.debug("Finished building lineage with {} nodes and {} edges in {} millis", nodes.size(), edges.size(), TimeUnit.NANOSECONDS.toMillis(nanos));
+    }
+
+    void cancel() {
+        this.canceled = true;
+    }
+
+    /**
+     * Must be called with write lock!
+     */
+    private void updateExpiration() {
+        expirationDate = new Date(System.currentTimeMillis() + TTL);
+    }
+}
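
A minimal usage sketch of StandardLineageResult for reviewers, based only on the API in the file above; the records createdEvent/sendEvent and the UUID "abc" are hypothetical placeholders, not part of this patch:

    // Hypothetical sketch: assumes two ProvenanceEventRecord instances exist for FlowFile "abc".
    final Collection<String> uuids = Collections.singleton("abc");
    final StandardLineageResult result = new StandardLineageResult(1, uuids); // expect a single query step
    result.update(Arrays.asList(createdEvent, sendEvent)); // completes the step and builds the graph
    if (result.isFinished() && result.getError() == null) {
        final List<LineageNode> nodes = result.getNodes(); // EventNode and FlowFileNode instances
        final List<LineageEdge> edges = result.getEdges(); // edges keyed by FlowFile UUID
    }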

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardProvenanceEventRecord.java
----------------------------------------------------------------------
diff --git a/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardProvenanceEventRecord.java b/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardProvenanceEventRecord.java
new file mode 100644
index 0000000..cfbae88
--- /dev/null
+++ b/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardProvenanceEventRecord.java
@@ -0,0 +1,752 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.Relationship;
+
+/**
+ * Holder for provenance-relevant information.
+ * <p/>
+ * @author none
+ */
+public final class StandardProvenanceEventRecord implements ProvenanceEventRecord {
+
+    private final long eventTime;
+    private final long entryDate;
+    private final ProvenanceEventType eventType;
+    private final long lineageStartDate;
+    private final Set<String> lineageIdentifiers;
+    private final String componentId;
+    private final String componentType;
+    private final String transitUri;
+    private final String sourceSystemFlowFileIdentifier;
+    private final String uuid;
+    private final List<String> parentUuids;
+    private final List<String> childrenUuids;
+    private final String alternateIdentifierUri;
+    private final String details;
+    private final String relationship;
+    private final long storageByteOffset;
+    private final String storageFilename;
+    private final long eventDuration;
+
+    private final String contentClaimSection;
+    private final String contentClaimContainer;
+    private final String contentClaimIdentifier;
+    private final Long contentClaimOffset;
+    private final long contentSize;
+
+    private final String previousClaimSection;
+    private final String previousClaimContainer;
+    private final String previousClaimIdentifier;
+    private final Long previousClaimOffset;
+    private final Long previousSize;
+
+    private final String sourceQueueIdentifier;
+
+    private final Map<String, String> previousAttributes;
+    private final Map<String, String> updatedAttributes;
+
+    private volatile long eventId;
+
+    private StandardProvenanceEventRecord(final Builder builder) {
+        this.eventTime = builder.eventTime;
+        this.entryDate = builder.entryDate;
+        this.eventType = builder.eventType;
+        this.componentId = builder.componentId;
+        this.componentType = builder.componentType;
+        this.transitUri = builder.transitUri;
+        this.sourceSystemFlowFileIdentifier = builder.sourceSystemFlowFileIdentifier;
+        this.uuid = builder.uuid;
+        this.parentUuids = builder.parentUuids;
+        this.childrenUuids = builder.childrenUuids;
+        this.alternateIdentifierUri = builder.alternateIdentifierUri;
+        this.details = builder.details;
+        this.relationship = builder.relationship;
+        this.storageByteOffset = builder.storageByteOffset;
+        this.storageFilename = builder.storageFilename;
+        this.eventDuration = builder.eventDuration;
+        this.lineageStartDate = builder.lineageStartDate;
+        this.lineageIdentifiers = Collections.unmodifiableSet(builder.lineageIdentifiers);
+
+        previousClaimSection = builder.previousClaimSection;
+        previousClaimContainer = builder.previousClaimContainer;
+        previousClaimIdentifier = builder.previousClaimIdentifier;
+        previousClaimOffset = builder.previousClaimOffset;
+        previousSize = builder.previousSize;
+
+        contentClaimSection = builder.contentClaimSection;
+        contentClaimContainer = builder.contentClaimContainer;
+        contentClaimIdentifier = builder.contentClaimIdentifier;
+        contentClaimOffset = builder.contentClaimOffset;
+        contentSize = builder.contentSize;
+
+        previousAttributes = builder.previousAttributes == null ? Collections.<String, String>emptyMap() : Collections.unmodifiableMap(builder.previousAttributes);
+        updatedAttributes = builder.updatedAttributes == null ? Collections.<String, String>emptyMap() : Collections.unmodifiableMap(builder.updatedAttributes);
+
+        sourceQueueIdentifier = builder.sourceQueueIdentifier;
+
+    }
+
+    public String getStorageFilename() {
+        return storageFilename;
+    }
+
+    public long getStorageByteOffset() {
+        return storageByteOffset;
+    }
+
+    void setEventId(final long eventId) {
+        this.eventId = eventId;
+    }
+
+    @Override
+    public long getEventId() {
+        return eventId;
+    }
+
+    @Override
+    public long getEventTime() {
+        return eventTime;
+    }
+
+    @Override
+    public Set<String> getLineageIdentifiers() {
+        return lineageIdentifiers;
+    }
+
+    @Override
+    public long getLineageStartDate() {
+        return lineageStartDate;
+    }
+
+    @Override
+    public long getFileSize() {
+        return contentSize;
+    }
+
+    @Override
+    public Long getPreviousFileSize() {
+        return previousSize;
+    }
+
+    @Override
+    public ProvenanceEventType getEventType() {
+        return eventType;
+    }
+
+    @Override
+    public Map<String, String> getAttributes() {
+        final Map<String, String> allAttrs = new HashMap<>(previousAttributes.size() + updatedAttributes.size());
+        allAttrs.putAll(previousAttributes);
+        for (final Map.Entry<String, String> entry : updatedAttributes.entrySet()) {
+            if (entry.getValue() != null) {
+                allAttrs.put(entry.getKey(), entry.getValue());
+            }
+        }
+        return allAttrs;
+    }
+
+    @Override
+    public String getComponentId() {
+        return componentId;
+    }
+
+    @Override
+    public String getComponentType() {
+        return componentType;
+    }
+
+    @Override
+    public String getTransitUri() {
+        return transitUri;
+    }
+
+    @Override
+    public String getSourceSystemFlowFileIdentifier() {
+        return sourceSystemFlowFileIdentifier;
+    }
+
+    @Override
+    public String getFlowFileUuid() {
+        return uuid;
+    }
+
+    @Override
+    public List<String> getParentUuids() {
+        return parentUuids == null ? Collections.<String>emptyList() : parentUuids;
+    }
+
+    @Override
+    public List<String> getChildUuids() {
+        return childrenUuids == null ? Collections.<String>emptyList() : childrenUuids;
+    }
+
+    @Override
+    public String getAlternateIdentifierUri() {
+        return alternateIdentifierUri;
+    }
+
+    @Override
+    public long getEventDuration() {
+        return eventDuration;
+    }
+
+    @Override
+    public String getDetails() {
+        return details;
+    }
+
+    @Override
+    public String getRelationship() {
+        return relationship;
+    }
+
+    @Override
+    public long getFlowFileEntryDate() {
+        return entryDate;
+    }
+
+    @Override
+    public String getContentClaimSection() {
+        return contentClaimSection;
+    }
+
+    @Override
+    public String getContentClaimContainer() {
+        return contentClaimContainer;
+    }
+
+    @Override
+    public String getContentClaimIdentifier() {
+        return contentClaimIdentifier;
+    }
+
+    @Override
+    public Long getContentClaimOffset() {
+        return contentClaimOffset;
+    }
+
+    @Override
+    public String getSourceQueueIdentifier() {
+        return sourceQueueIdentifier;
+    }
+
+    @Override
+    public Map<String, String> getPreviousAttributes() {
+        return previousAttributes;
+    }
+
+    @Override
+    public String getPreviousContentClaimContainer() {
+        return previousClaimContainer;
+    }
+
+    @Override
+    public String getPreviousContentClaimIdentifier() {
+        return previousClaimIdentifier;
+    }
+
+    @Override
+    public Long getPreviousContentClaimOffset() {
+        return previousClaimOffset;
+    }
+
+    @Override
+    public String getPreviousContentClaimSection() {
+        return previousClaimSection;
+    }
+
+    @Override
+    public Map<String, String> getUpdatedAttributes() {
+        return updatedAttributes;
+    }
+
+    @Override
+    public int hashCode() {
+        final int eventTypeCode;
+        if (eventType == ProvenanceEventType.CLONE || eventType == ProvenanceEventType.JOIN || eventType == ProvenanceEventType.FORK) {
+            eventTypeCode = 1472;
+        } else if (eventType == ProvenanceEventType.REPLAY) {
+            eventTypeCode = 21479 + (int) (0x7FFFFFFF & eventTime); // use lower bits of event time.
+        } else {
+            eventTypeCode = 4812 + eventType.hashCode() + 4 * uuid.hashCode();
+        }
+
+        return -37423 + 3 * componentId.hashCode() + (transitUri == null ? 0 : 41 * transitUri.hashCode())
+                + (relationship == null ? 0 : 47 * relationship.hashCode()) + 44 * eventTypeCode;
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (obj == null) {
+            return false;
+        }
+        if (obj == this) {
+            return true;
+        }
+        if (!(obj instanceof StandardProvenanceEventRecord)) {
+            return false;
+        }
+
+        final StandardProvenanceEventRecord other = (StandardProvenanceEventRecord) obj;
+        // If event ID's are populated and not equal, return false. If they have not yet been populated, do not
+        // use them in the comparison.
+        if (eventId > 0L && other.getEventId() > 0L && eventId != other.getEventId()) {
+            return false;
+        }
+        if (eventType != other.eventType) {
+            return false;
+        }
+
+        if (!componentId.equals(other.componentId)) {
+            return false;
+        }
+
+        if (different(parentUuids, other.parentUuids)) {
+            return false;
+        }
+
+        if (different(childrenUuids, other.childrenUuids)) {
+            return false;
+        }
+
+        // SPAWN had issues indicating which should be the event's FlowFileUUID in the case that there is 1 parent and 1 child.
+        if (!uuid.equals(other.uuid)) {
+            return false;
+        }
+
+        if (different(transitUri, other.transitUri)) {
+            return false;
+        }
+
+        if (different(relationship, other.relationship)) {
+            return false;
+        }
+
+        return !(eventType == ProvenanceEventType.REPLAY && eventTime != other.getEventTime());
+    }
+
+    private boolean different(final Object a, final Object b) {
+        if (a == null && b == null) {
+            return false;
+        }
+        if (a == null || b == null) {
+            return true;
+        }
+
+        return !a.equals(b);
+    }
+
+    private boolean different(final List<String> a, final List<String> b) {
+        if (a == null && b == null) {
+            return false;
+        }
+
+        if (a == null && b != null) {
+            return true;
+        }
+
+        if (a != null && b == null) {
+            return true;
+        }
+
+        if (a.size() != b.size()) {
+            return true;
+        }
+
+        final List<String> sortedA = new ArrayList<>(a);
+        final List<String> sortedB = new ArrayList<>(b);
+
+        Collections.sort(sortedA);
+        Collections.sort(sortedB);
+
+        for (int i = 0; i < sortedA.size(); i++) {
+            if (!sortedA.get(i).equals(sortedB.get(i))) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return "ProvenanceEventRecord ["
+                + "eventId=" + eventId
+                + ", eventType=" + eventType
+                + ", eventTime=" + new Date(eventTime)
+                + ", uuid=" + uuid
+                + ", fileSize=" + contentSize
+                + ", componentId=" + componentId
+                + ", transitUri=" + transitUri
+                + ", sourceSystemFlowFileIdentifier=" + sourceSystemFlowFileIdentifier
+                + ", parentUuids=" + parentUuids
+                + ", alternateIdentifierUri=" + alternateIdentifierUri + "]";
+    }
+
+    public static class Builder implements ProvenanceEventBuilder {
+
+        private long eventTime = System.currentTimeMillis();
+        private long entryDate;
+        private long lineageStartDate;
+        private Set<String> lineageIdentifiers = new HashSet<>();
+        private ProvenanceEventType eventType = null;
+        private String componentId = null;
+        private String componentType = null;
+        private String sourceSystemFlowFileIdentifier = null;
+        private String transitUri = null;
+        private String uuid = null;
+        private List<String> parentUuids = null;
+        private List<String> childrenUuids = null;
+        private String contentType = null;
+        private String alternateIdentifierUri = null;
+        private String details = null;
+        private String relationship = null;
+        private long storageByteOffset = -1L;
+        private long eventDuration = -1L;
+        private String storageFilename;
+
+        private String contentClaimSection;
+        private String contentClaimContainer;
+        private String contentClaimIdentifier;
+        private Long contentClaimOffset;
+        private Long contentSize;
+
+        private String previousClaimSection;
+        private String previousClaimContainer;
+        private String previousClaimIdentifier;
+        private Long previousClaimOffset;
+        private Long previousSize;
+
+        private String sourceQueueIdentifier;
+
+        private Map<String, String> previousAttributes;
+        private Map<String, String> updatedAttributes;
+
+        @Override
+        public Builder fromEvent(final ProvenanceEventRecord event) {
+            eventTime = event.getEventTime();
+            entryDate = event.getFlowFileEntryDate();
+            lineageStartDate = event.getLineageStartDate();
+            lineageIdentifiers = event.getLineageIdentifiers();
+            eventType = event.getEventType();
+            componentId = event.getComponentId();
+            componentType = event.getComponentType();
+            transitUri = event.getTransitUri();
+            sourceSystemFlowFileIdentifier = event.getSourceSystemFlowFileIdentifier();
+            uuid = event.getFlowFileUuid();
+            parentUuids = event.getParentUuids();
+            childrenUuids = event.getChildUuids();
+            alternateIdentifierUri = event.getAlternateIdentifierUri();
+            eventDuration = event.getEventDuration();
+            previousAttributes = event.getPreviousAttributes();
+            updatedAttributes = event.getUpdatedAttributes();
+            details = event.getDetails();
+            relationship = event.getRelationship();
+
+            contentClaimSection = event.getContentClaimSection();
+            contentClaimContainer = event.getContentClaimContainer();
+            contentClaimIdentifier = event.getContentClaimIdentifier();
+            contentClaimOffset = event.getContentClaimOffset();
+            contentSize = event.getFileSize();
+
+            previousClaimSection = event.getPreviousContentClaimSection();
+            previousClaimContainer = event.getPreviousContentClaimContainer();
+            previousClaimIdentifier = event.getPreviousContentClaimIdentifier();
+            previousClaimOffset = event.getPreviousContentClaimOffset();
+            previousSize = event.getPreviousFileSize();
+
+            sourceQueueIdentifier = event.getSourceQueueIdentifier();
+
+            if (event instanceof StandardProvenanceEventRecord) {
+                final StandardProvenanceEventRecord standardProvEvent = (StandardProvenanceEventRecord) event;
+                storageByteOffset = standardProvEvent.storageByteOffset;
+                storageFilename = standardProvEvent.storageFilename;
+            }
+
+            return this;
+        }
+
+        @Override
+        public Builder setFlowFileEntryDate(final long entryDate) {
+            this.entryDate = entryDate;
+            return this;
+        }
+
+        @Override
+        public Builder setLineageIdentifiers(final Set<String> lineageIdentifiers) {
+            this.lineageIdentifiers = lineageIdentifiers;
+            return this;
+        }
+
+        @Override
+        public Builder setAttributes(final Map<String, String> previousAttributes, final Map<String, String> updatedAttributes) {
+            this.previousAttributes = previousAttributes;
+            this.updatedAttributes = updatedAttributes;
+            return this;
+        }
+
+        @Override
+        public Builder setFlowFileUUID(final String uuid) {
+            this.uuid = uuid;
+            return this;
+        }
+
+        public Builder setStorageLocation(final String filename, final long offset) {
+            this.storageFilename = filename;
+            this.storageByteOffset = offset;
+            return this;
+        }
+
+        @Override
+        public Builder setEventTime(long eventTime) {
+            this.eventTime = eventTime;
+            return this;
+        }
+
+        @Override
+        public Builder setEventDuration(final long millis) {
+            this.eventDuration = millis;
+            return this;
+        }
+
+        @Override
+        public Builder setLineageStartDate(final long startDate) {
+            this.lineageStartDate = startDate;
+            return this;
+        }
+
+        public Builder addLineageIdentifier(final String lineageIdentifier) {
+            this.lineageIdentifiers.add(lineageIdentifier);
+            return this;
+        }
+
+        @Override
+        public Builder setEventType(ProvenanceEventType eventType) {
+            this.eventType = eventType;
+            return this;
+        }
+
+        @Override
+        public Builder setComponentId(String componentId) {
+            this.componentId = componentId;
+            return this;
+        }
+
+        @Override
+        public Builder setComponentType(String componentType) {
+            this.componentType = componentType;
+            return this;
+        }
+
+        @Override
+        public Builder setSourceSystemFlowFileIdentifier(String sourceSystemFlowFileIdentifier) {
+            this.sourceSystemFlowFileIdentifier = sourceSystemFlowFileIdentifier;
+            return this;
+        }
+
+        @Override
+        public Builder setTransitUri(String transitUri) {
+            this.transitUri = transitUri;
+            return this;
+        }
+
+        @Override
+        public Builder addParentFlowFile(final FlowFile parentFlowFile) {
+            if (this.parentUuids == null) {
+                this.parentUuids = new ArrayList<>();
+            }
+            this.parentUuids.add(parentFlowFile.getAttribute(CoreAttributes.UUID.key()));
+            return this;
+        }
+
+        public Builder addParentUuid(final String uuid) {
+            if (this.parentUuids == null) {
+                this.parentUuids = new ArrayList<>();
+            }
+            this.parentUuids.add(uuid);
+            return this;
+        }
+
+        @Override
+        public Builder removeParentFlowFile(final FlowFile parentFlowFile) {
+            if (this.parentUuids == null) {
+                return this;
+            }
+
+            parentUuids.remove(parentFlowFile.getAttribute(CoreAttributes.UUID.key()));
+            return this;
+        }
+
+        @Override
+        public Builder addChildFlowFile(final FlowFile childFlowFile) {
+            if (this.childrenUuids == null) {
+                this.childrenUuids = new ArrayList<>();
+            }
+            this.childrenUuids.add(childFlowFile.getAttribute(CoreAttributes.UUID.key()));
+            return this;
+        }
+
+        public Builder addChildUuid(final String uuid) {
+            if (this.childrenUuids == null) {
+                this.childrenUuids = new ArrayList<>();
+            }
+            this.childrenUuids.add(uuid);
+            return this;
+        }
+
+        @Override
+        public Builder removeChildFlowFile(final FlowFile childFlowFile) {
+            if (this.childrenUuids == null) {
+                return this;
+            }
+
+            childrenUuids.remove(childFlowFile.getAttribute(CoreAttributes.UUID.key()));
+            return this;
+        }
+
+        public Builder setContentType(String contentType) {
+            this.contentType = contentType;
+            return this;
+        }
+
+        @Override
+        public Builder setAlternateIdentifierUri(String alternateIdentifierUri) {
+            this.alternateIdentifierUri = alternateIdentifierUri;
+            return this;
+        }
+
+        @Override
+        public Builder setDetails(String details) {
+            this.details = details;
+            return this;
+        }
+
+        @Override
+        public Builder setRelationship(Relationship relationship) {
+            this.relationship = relationship.getName();
+            return this;
+        }
+
+        public Builder setRelationship(final String relationship) {
+            this.relationship = relationship;
+            return this;
+        }
+
+        @Override
+        public ProvenanceEventBuilder fromFlowFile(final FlowFile flowFile) {
+            setFlowFileEntryDate(flowFile.getEntryDate());
+            setLineageIdentifiers(flowFile.getLineageIdentifiers());
+            setLineageStartDate(flowFile.getLineageStartDate());
+            setAttributes(Collections.<String, String>emptyMap(), flowFile.getAttributes());
+            uuid = flowFile.getAttribute(CoreAttributes.UUID.key());
+            this.contentSize = flowFile.getSize();
+            return this;
+        }
+
+        @Override
+        public Builder setPreviousContentClaim(final String container, final String section, final String identifier, final Long offset, final long size) {
+            previousClaimSection = section;
+            previousClaimContainer = container;
+            previousClaimIdentifier = identifier;
+            previousClaimOffset = offset;
+            previousSize = size;
+            return this;
+        }
+
+        @Override
+        public Builder setCurrentContentClaim(final String container, final String section, final String identifier, final Long offset, final long size) {
+            contentClaimSection = section;
+            contentClaimContainer = container;
+            contentClaimIdentifier = identifier;
+            contentClaimOffset = offset;
+            contentSize = size;
+            return this;
+        }
+
+        @Override
+        public Builder setSourceQueueIdentifier(final String identifier) {
+            sourceQueueIdentifier = identifier;
+            return this;
+        }
+
+        private void assertSet(final Object value, final String name) {
+            if (value == null) {
+                throw new IllegalStateException("Cannot create Provenance Event Record because " + name + " is not set");
+            }
+        }
+
+        public ProvenanceEventType getEventType() {
+            return eventType;
+        }
+
+        public List<String> getChildUuids() {
+            return Collections.unmodifiableList(childrenUuids);
+        }
+
+        public List<String> getParentUuids() {
+            return Collections.unmodifiableList(parentUuids);
+        }
+
+        @Override
+        public StandardProvenanceEventRecord build() {
+            assertSet(eventType, "Event Type");
+            assertSet(componentId, "Component ID");
+            assertSet(componentType, "Component Type");
+            assertSet(uuid, "FlowFile UUID");
+            assertSet(contentSize, "FlowFile Size");
+
+            switch (eventType) {
+                case ADDINFO:
+                    if (alternateIdentifierUri == null) {
+                        throw new IllegalStateException("Cannot create Provenance Event Record of type " + eventType + " because no alternate identifiers have been set");
+                    }
+                    break;
+                case RECEIVE:
+                case SEND:
+                    assertSet(transitUri, "Transit URI");
+                    break;
+                case ROUTE:
+                    assertSet(relationship, "Relationship");
+                    break;
+                case CLONE:
+                case FORK:
+                case JOIN:
+                    if ((parentUuids == null || parentUuids.isEmpty()) && (childrenUuids == null || childrenUuids.isEmpty())) {
+                        throw new IllegalStateException("Cannot create Provenance Event Record of type " + eventType + " because no Parent UUIDs or Children UUIDs have been set");
+                    }
+                    break;
+                default:
+                    break;
+            }
+
+            return new StandardProvenanceEventRecord(this);
+        }
+    }
+}
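
Likewise, a brief sketch of the Builder defined above; all values shown are hypothetical placeholders, and build() enforces the per-type checks visible in the switch statement:

    // Hypothetical values; setCurrentContentClaim also supplies the FlowFile size that build() asserts.
    final ProvenanceEventRecord event = new StandardProvenanceEventRecord.Builder()
            .setEventType(ProvenanceEventType.RECEIVE)
            .setComponentId("processor-1")
            .setComponentType("ExampleProcessor")
            .setFlowFileUUID("abc")
            .setTransitUri("file:///tmp/in.txt") // required for RECEIVE/SEND events
            .setCurrentContentClaim("container", "section", "claim-1", 0L, 1024L)
            .build();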

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java
----------------------------------------------------------------------
diff --git a/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java b/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java
new file mode 100644
index 0000000..9a9a27d
--- /dev/null
+++ b/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.nifi.provenance.search.Query;
+import org.apache.nifi.provenance.search.QueryResult;
+
+public class StandardQueryResult implements QueryResult {
+
+    public static final int TTL = (int) TimeUnit.MILLISECONDS.convert(30, TimeUnit.MINUTES);
+    private final Query query;
+    private final long creationNanos;
+
+    private final int numSteps;
+    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+    private final Lock readLock = rwLock.readLock();
+
+    private final Lock writeLock = rwLock.writeLock();
+    // guarded by writeLock
+    private final List<ProvenanceEventRecord> matchingRecords = new ArrayList<>();
+    private long totalHitCount;
+    private int numCompletedSteps = 0;
+    private Date expirationDate;
+    private String error;
+    private long queryTime;
+
+    private volatile boolean canceled = false;
+
+    public StandardQueryResult(final Query query, final int numSteps) {
+        this.query = query;
+        this.numSteps = numSteps;
+        this.creationNanos = System.nanoTime();
+
+        updateExpiration();
+    }
+
+    @Override
+    public List<ProvenanceEventRecord> getMatchingEvents() {
+        readLock.lock();
+        try {
+            if (matchingRecords.size() <= query.getMaxResults()) {
+                return new ArrayList<>(matchingRecords);
+            }
+
+            final List<ProvenanceEventRecord> copy = new ArrayList<>(query.getMaxResults());
+            for (int i = 0; i < query.getMaxResults(); i++) {
+                copy.add(matchingRecords.get(i));
+            }
+
+            return copy;
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    @Override
+    public long getTotalHitCount() {
+        readLock.lock();
+        try {
+            return totalHitCount;
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    @Override
+    public long getQueryTime() {
+        return queryTime;
+    }
+
+    @Override
+    public Date getExpiration() {
+        return expirationDate;
+    }
+
+    @Override
+    public String getError() {
+        return error;
+    }
+
+    @Override
+    public int getPercentComplete() {
+        readLock.lock();
+        try {
+            return (numSteps < 1) ? 100 : (int) (((float) numCompletedSteps / (float) numSteps) * 100.0F);
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    @Override
+    public boolean isFinished() {
+        readLock.lock();
+        try {
+            return numCompletedSteps >= numSteps || canceled;
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    void cancel() {
+        this.canceled = true;
+    }
+
+    public void setError(final String error) {
+        writeLock.lock();
+        try {
+            this.error = error;
+            numCompletedSteps++;
+
+            updateExpiration();
+            if (numCompletedSteps >= numSteps) {
+                final long searchNanos = System.nanoTime() - creationNanos;
+                queryTime = TimeUnit.MILLISECONDS.convert(searchNanos, TimeUnit.NANOSECONDS);
+            }
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    public void update(final Collection<ProvenanceEventRecord> matchingRecords, final long totalHits) {
+        writeLock.lock();
+        try {
+            this.matchingRecords.addAll(matchingRecords);
+            this.totalHitCount += totalHits;
+
+            numCompletedSteps++;
+            updateExpiration();
+
+            if (numCompletedSteps >= numSteps) {
+                final long searchNanos = System.nanoTime() - creationNanos;
+                queryTime = TimeUnit.MILLISECONDS.convert(searchNanos, TimeUnit.NANOSECONDS);
+            }
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    /**
+     * Must be called with write lock!
+     */
+    private void updateExpiration() {
+        expirationDate = new Date(System.currentTimeMillis() + TTL);
+    }
+}
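
StandardQueryResult follows the same step-counting pattern as StandardLineageResult; a brief sketch, where query and hits are hypothetical placeholders:

    // Hypothetical: 'query' is an org.apache.nifi.provenance.search.Query and 'hits' is a
    // Collection<ProvenanceEventRecord> produced by a single search step.
    final StandardQueryResult result = new StandardQueryResult(query, 1);
    result.update(hits, hits.size()); // completes the only step and records queryTime
    final List<ProvenanceEventRecord> events = result.getMatchingEvents(); // capped at query.getMaxResults()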

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EdgeNode.java
----------------------------------------------------------------------
diff --git a/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EdgeNode.java b/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EdgeNode.java
new file mode 100644
index 0000000..0aaf5ef
--- /dev/null
+++ b/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EdgeNode.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.lineage;
+
+import static java.util.Objects.requireNonNull;
+
+public class EdgeNode implements LineageEdge {
+
+    private final String uuid;
+    private final LineageNode source;
+    private final LineageNode destination;
+
+    public EdgeNode(final String uuid, final LineageNode source, final LineageNode destination) {
+        this.uuid = uuid;
+        this.source = requireNonNull(source);
+        this.destination = requireNonNull(destination);
+    }
+
+    @Override
+    public String getUuid() {
+        return uuid;
+    }
+
+    @Override
+    public LineageNode getSource() {
+        return source;
+    }
+
+    @Override
+    public LineageNode getDestination() {
+        return destination;
+    }
+
+    @Override
+    public int hashCode() {
+        return 43298293 + source.hashCode() + destination.hashCode();
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+
+        if (!(obj instanceof EdgeNode)) {
+            return false;
+        }
+
+        final EdgeNode other = (EdgeNode) obj;
+        return (source.equals(other.source) && destination.equals(other.destination));
+    }
+
+    @Override
+    public String toString() {
+        return "Edge[Source=" + source + ", Destination=" + destination + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EventNode.java
----------------------------------------------------------------------
diff --git a/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EventNode.java b/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EventNode.java
new file mode 100644
index 0000000..12d9a4f
--- /dev/null
+++ b/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EventNode.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.lineage;
+
+import java.util.List;
+
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+
+public class EventNode implements ProvenanceEventLineageNode {
+
+    private final ProvenanceEventRecord record;
+    private String clusterNodeIdentifier = null;
+
+    public EventNode(final ProvenanceEventRecord event) {
+        this.record = event;
+    }
+
+    @Override
+    public String getIdentifier() {
+        return String.valueOf(getEventIdentifier());
+    }
+
+    @Override
+    public String getClusterNodeIdentifier() {
+        return clusterNodeIdentifier;
+    }
+
+    public void setClusterNodeIdentifier(final String nodeIdentifier) {
+        this.clusterNodeIdentifier = nodeIdentifier;
+    }
+
+    @Override
+    public LineageNodeType getNodeType() {
+        return LineageNodeType.PROVENANCE_EVENT_NODE;
+    }
+
+    @Override
+    public ProvenanceEventType getEventType() {
+        return record.getEventType();
+    }
+
+    @Override
+    public long getTimestamp() {
+        return record.getEventTime();
+    }
+
+    @Override
+    public long getEventIdentifier() {
+        return record.getEventId();
+    }
+
+    @Override
+    public String getFlowFileUuid() {
+        return record.getAttributes().get(CoreAttributes.UUID.key());
+    }
+
+    @Override
+    public List<String> getParentUuids() {
+        return record.getParentUuids();
+    }
+
+    @Override
+    public List<String> getChildUuids() {
+        return record.getChildUuids();
+    }
+
+    @Override
+    public int hashCode() {
+        return 2938472 + record.hashCode();
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (obj == null) {
+            return false;
+        }
+        if (this == obj) {
+            return true;
+        }
+
+        if (!(obj instanceof EventNode)) {
+            return false;
+        }
+
+        final EventNode other = (EventNode) obj;
+        return record.equals(other.record);
+    }
+
+    @Override
+    public String toString() {
+        return "Event[ID=" + record.getEventId() + ", Type=" + record.getEventType() + ", UUID=" + record.getFlowFileUuid() + ", Component=" + record.getComponentId() + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/FlowFileLineage.java
----------------------------------------------------------------------
diff --git a/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/FlowFileLineage.java b/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/FlowFileLineage.java
new file mode 100644
index 0000000..c36c38d
--- /dev/null
+++ b/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/FlowFileLineage.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.lineage;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+public class FlowFileLineage implements Lineage {
+
+    private final List<LineageNode> nodes;
+    private final List<LineageEdge> edges;
+
+    public FlowFileLineage(final Collection<LineageNode> nodes, final Collection<LineageEdge> edges) {
+        this.nodes = new ArrayList<>(requireNonNull(nodes));
+        this.edges = new ArrayList<>(requireNonNull(edges));
+    }
+
+    @Override
+    public List<LineageNode> getNodes() {
+        return nodes;
+    }
+
+    @Override
+    public List<LineageEdge> getEdges() {
+        return edges;
+    }
+
+    @Override
+    public int hashCode() {
+        int sum = 923;
+        for (final LineageNode node : nodes) {
+            sum += node.hashCode();
+        }
+
+        for (final LineageEdge edge : edges) {
+            sum += edge.hashCode();
+        }
+
+        return sum;
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (obj == null) {
+            return false;
+        }
+
+        if (obj == this) {
+            return true;
+        }
+
+        if (!(obj instanceof FlowFileLineage)) {
+            return false;
+        }
+
+        final FlowFileLineage other = (FlowFileLineage) obj;
+        return nodes.equals(other.nodes) && edges.equals(other.edges);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/FlowFileNode.java
----------------------------------------------------------------------
diff --git a/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/FlowFileNode.java b/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/FlowFileNode.java
new file mode 100644
index 0000000..fdc7470
--- /dev/null
+++ b/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/FlowFileNode.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.lineage;
+
+import static java.util.Objects.requireNonNull;
+
+public class FlowFileNode implements LineageNode {
+
+    private final String flowFileUuid;
+    private final long creationTime;
+    private String clusterNodeIdentifier;
+
+    public FlowFileNode(final String flowFileUuid, final long flowFileCreationTime) {
+        this.flowFileUuid = requireNonNull(flowFileUuid);
+        this.creationTime = flowFileCreationTime;
+    }
+
+    @Override
+    public String getIdentifier() {
+        return flowFileUuid;
+    }
+
+    @Override
+    public long getTimestamp() {
+        return creationTime;
+    }
+
+    @Override
+    public String getClusterNodeIdentifier() {
+        return clusterNodeIdentifier;
+    }
+
+    @Override
+    public LineageNodeType getNodeType() {
+        return LineageNodeType.FLOWFILE_NODE;
+    }
+
+    @Override
+    public String getFlowFileUuid() {
+        return flowFileUuid;
+    }
+
+    @Override
+    public int hashCode() {
+        return 23498723 + flowFileUuid.hashCode();
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (obj == null) {
+            return false;
+        }
+        if (obj == this) {
+            return true;
+        }
+
+        if (!(obj instanceof FlowFileNode)) {
+            return false;
+        }
+
+        final FlowFileNode other = (FlowFileNode) obj;
+        return flowFileUuid.equals(other.flowFileUuid);
+    }
+
+    @Override
+    public String toString() {
+        return "FlowFile[UUID=" + flowFileUuid + "]";
+    }
+}
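
Taken together, FlowFileNode and FlowFileLineage give the lineage graph simple value semantics: a node is identified solely by its FlowFile UUID, and a lineage compares its node and edge lists. A minimal sketch of that behavior (not part of the commit; it assumes both classes live in org.apache.nifi.provenance.lineage, as the file paths in this commit suggest, and the UUID values are illustrative only):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    import org.apache.nifi.provenance.lineage.FlowFileLineage;
    import org.apache.nifi.provenance.lineage.FlowFileNode;
    import org.apache.nifi.provenance.lineage.LineageEdge;
    import org.apache.nifi.provenance.lineage.LineageNode;

    public class LineageEqualityExample {
        public static void main(final String[] args) {
            // Two nodes with distinct FlowFile UUIDs (hypothetical values)
            final LineageNode a = new FlowFileNode("uuid-1", System.currentTimeMillis());
            final LineageNode b = new FlowFileNode("uuid-2", System.currentTimeMillis());

            final List<LineageNode> nodes = Arrays.asList(a, b);
            final List<LineageEdge> edges = Collections.emptyList();

            // Lineages built from equal node/edge collections compare equal
            System.out.println(new FlowFileLineage(nodes, edges).equals(new FlowFileLineage(nodes, edges))); // true

            // FlowFileNode equality ignores the creation time; only the UUID matters
            System.out.println(a.equals(new FlowFileNode("uuid-1", 0L))); // true
        }
    }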

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/flowfile-packager/pom.xml
----------------------------------------------------------------------
diff --git a/commons/flowfile-packager/pom.xml b/commons/flowfile-packager/pom.xml
new file mode 100644
index 0000000..f4001fe
--- /dev/null
+++ b/commons/flowfile-packager/pom.xml
@@ -0,0 +1,43 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-parent</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>flowfile-packager</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+    <packaging>jar</packaging>
+
+    <name>FlowFile Packager</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-compress</artifactId>
+            <version>1.9</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>3.3.2</version>
+        </dependency>
+    </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackager.java
----------------------------------------------------------------------
diff --git a/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackager.java b/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackager.java
new file mode 100644
index 0000000..ae16f99
--- /dev/null
+++ b/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackager.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
+
+public interface FlowFilePackager {
+
+    void packageFlowFile(InputStream in, OutputStream out, Map<String, String> attributes, long fileSize) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV1.java
----------------------------------------------------------------------
diff --git a/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV1.java b/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV1.java
new file mode 100644
index 0000000..2437279
--- /dev/null
+++ b/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV1.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+import org.apache.commons.lang3.StringEscapeUtils;
+
+public class FlowFilePackagerV1 implements FlowFilePackager {
+
+    public static final String FILENAME_ATTRIBUTES = "flowfile.attributes";
+    public static final String FILENAME_CONTENT = "flowfile.content";
+    public static final int DEFAULT_TAR_PERMISSIONS = 0644;
+
+    private final int tarPermissions;
+
+    public FlowFilePackagerV1() {
+        this(DEFAULT_TAR_PERMISSIONS);
+    }
+
+    public FlowFilePackagerV1(final int tarPermissions) {
+        this.tarPermissions = tarPermissions;
+    }
+
+    @Override
+    public void packageFlowFile(final InputStream in, final OutputStream out, final Map<String, String> attributes, final long fileSize) throws IOException {
+        try (final TarArchiveOutputStream tout = new TarArchiveOutputStream(out)) {
+            writeAttributesEntry(attributes, tout);
+            writeContentEntry(tout, in, fileSize);
+            tout.finish();
+            tout.flush();
+            tout.close();
+        }
+    }
+
+    private void writeAttributesEntry(final Map<String, String> attributes, final TarArchiveOutputStream tout) throws IOException {
+        final StringBuilder sb = new StringBuilder();
+        sb.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?><!DOCTYPE properties\n  SYSTEM \"http://java.sun.com/dtd/properties.dtd\">\n");
+        sb.append("<properties>");
+        for (final Map.Entry<String, String> entry : attributes.entrySet()) {
+            final String escapedKey = StringEscapeUtils.escapeXml(entry.getKey());
+            final String escapedValue = StringEscapeUtils.escapeXml(entry.getValue());
+            sb.append("\n  <entry key=\"").append(escapedKey).append("\">").append(escapedValue).append("</entry>");
+        }
+        sb.append("</properties>");
+
+        final byte[] metaBytes = sb.toString().getBytes(StandardCharsets.UTF_8);
+        final TarArchiveEntry attribEntry = new TarArchiveEntry(FILENAME_ATTRIBUTES);
+        attribEntry.setMode(tarPermissions);
+        attribEntry.setSize(metaBytes.length);
+        tout.putArchiveEntry(attribEntry);
+        tout.write(metaBytes);
+        tout.closeArchiveEntry();
+    }
+
+    private void writeContentEntry(final TarArchiveOutputStream tarOut, final InputStream inStream, final long fileSize) throws IOException {
+        final TarArchiveEntry entry = new TarArchiveEntry(FILENAME_CONTENT);
+        entry.setMode(tarPermissions);
+        entry.setSize(fileSize);
+        tarOut.putArchiveEntry(entry);
+        final byte[] buffer = new byte[512 << 10]; //512KB
+        int bytesRead = 0;
+        while ((bytesRead = inStream.read(buffer)) != -1) { //still more data to read
+            if (bytesRead > 0) {
+                tarOut.write(buffer, 0, bytesRead);
+            }
+        }
+
+        copy(inStream, tarOut); //the loop above has already drained the stream, so this is effectively a no-op safety net
+        tarOut.closeArchiveEntry();
+    }
+
+    public static long copy(final InputStream source, final OutputStream destination) throws IOException {
+        final byte[] buffer = new byte[8192];
+        int len;
+        long totalCount = 0L;
+        while ((len = source.read(buffer)) > 0) {
+            destination.write(buffer, 0, len);
+            totalCount += len;
+        }
+        return totalCount;
+    }
+
+}
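
FlowFilePackagerV1 defines a TAR-based layout: one entry named flowfile.attributes holding a java.util.Properties-style XML document, followed by one entry named flowfile.content holding the payload, both written with the configured TAR entry permissions. A minimal usage sketch (not part of the commit; file names and attribute values are illustrative only):

    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.nifi.util.FlowFilePackager;
    import org.apache.nifi.util.FlowFilePackagerV1;

    public class PackageToTarExample {
        public static void main(final String[] args) throws IOException {
            final Path source = Paths.get("data.bin"); // hypothetical input file
            final Map<String, String> attributes = new HashMap<>();
            attributes.put("filename", "data.bin");

            // 0755 overrides the default 0644 permissions applied to both TAR entries
            final FlowFilePackager packager = new FlowFilePackagerV1(0755);
            try (final FileInputStream in = new FileInputStream(source.toFile());
                 final FileOutputStream out = new FileOutputStream("data.flowfilev1.tar")) {
                // Writes flowfile.attributes (XML properties) followed by flowfile.content
                packager.packageFlowFile(in, out, attributes, Files.size(source));
            }
        }
    }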

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV2.java
----------------------------------------------------------------------
diff --git a/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV2.java b/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV2.java
new file mode 100644
index 0000000..6f9d6b1
--- /dev/null
+++ b/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV2.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
+
+/**
+ * <p>
+ * Packages a FlowFile, including both its content and its attributes into a
+ * single file that is stream-friendly. The encoding scheme is as such:
+ * </p>
+ *
+ * <pre>
+ * Length Field : indicates the number of Flow File Attributes in the stream
+ * 1 to N times (N=number of Flow File Attributes):
+ *      String Field : Flow File Attribute key name
+ *      String Field : Flow File Attribute value
+ * Long : 8 bytes indicating the length of the Flow File content
+ * Content : The next M bytes are the content of the Flow File.
+ * </pre>
+ *
+ * <pre>
+ * Encoding of String Field is as follows:
+ *      Length Field : indicates the length of the String
+ *      1 to N bytes (N=String length, determined by previous field, as described above) : The UTF-8 encoded string value.
+ * </pre>
+ *
+ * <pre>
+ * Encoding of Length Field is as follows:
+ *      First 2 bytes: Indicate length. If both bytes = 255, this is a special value indicating that the length is
+ *                     greater than or equal to 65535 bytes; therefore, the next 4 bytes will indicate the actual length.
+ * </pre>
+ *
+ * <p>
+ * Note: All byte-order encoding is Network Byte Order (Most Significant Byte
+ * first)
+ * </p>
+ *
+ * <p>
+ * The following example shows the bytes expected if we were to encode a
+ * FlowFile containing the following attributes where the content is the text
+ * "Hello World!":
+ *
+ * <br><br>
+ * Attributes:
+ * <pre>
+ * +-------+-------+
+ * | Key   | Value |
+ * +-------+-------+
+ * | A     | a     |
+ * +-------+-------+
+ * | B     | b     |
+ * +-------+-------+
+ * </pre> Content:<br>
+ * Hello World!
+ * <br><br>
+ * Packaged Byte Encoding (In Hexadecimal Form):
+ * <p>
+ *
+ * <pre>
+ * 00 02 00 01 41 00 01 61
+ * 00 01 42 00 01 62 00 00
+ * 00 00 00 00 00 0C 48 65
+ * 6C 6C 6F 20 57 6F 72 6C
+ * 64 21
+ * </pre>
+ */
+public class FlowFilePackagerV2 implements FlowFilePackager {
+
+    private static final int MAX_VALUE_2_BYTES = 65535;
+    private final byte[] writeBuffer = new byte[8];
+
+    @Override
+    public void packageFlowFile(final InputStream in, final OutputStream out, final Map<String, String> attributes, final long fileSize) throws IOException {
+        writeFieldLength(out, attributes.size()); //write out the number of attributes
+        for (final Map.Entry<String, String> entry : attributes.entrySet()) { //write out each attribute key/value pair
+            writeString(entry.getKey(), out);
+            writeString(entry.getValue(), out);
+        }
+        writeLong(out, fileSize);//write out length of data
+        copy(in, out);//write out the actual flow file payload
+    }
+
+    private void copy(final InputStream in, final OutputStream out) throws IOException {
+        final byte[] buffer = new byte[65536];
+        int len;
+        while ((len = in.read(buffer)) > 0) {
+            out.write(buffer, 0, len);
+        }
+    }
+
+    private void writeString(final String val, final OutputStream out) throws IOException {
+        final byte[] bytes = val.getBytes("UTF-8");
+        writeFieldLength(out, bytes.length);
+        out.write(bytes);
+    }
+
+    private void writeFieldLength(final OutputStream out, final int numBytes) throws IOException {
+        // If the value is less than the max value that can be fit into 2 bytes, just use the
+        // actual value. Otherwise, we will set the first 2 bytes to 255/255 and then use the next
+        // 4 bytes to indicate the real length.
+        if (numBytes < MAX_VALUE_2_BYTES) {
+            writeBuffer[0] = (byte) (numBytes >>> 8);
+            writeBuffer[1] = (byte) (numBytes);
+            out.write(writeBuffer, 0, 2);
+        } else {
+            writeBuffer[0] = (byte) 0xff;
+            writeBuffer[1] = (byte) 0xff;
+            writeBuffer[2] = (byte) (numBytes >>> 24);
+            writeBuffer[3] = (byte) (numBytes >>> 16);
+            writeBuffer[4] = (byte) (numBytes >>> 8);
+            writeBuffer[5] = (byte) (numBytes);
+            out.write(writeBuffer, 0, 6);
+        }
+    }
+
+    private void writeLong(final OutputStream out, final long val) throws IOException {
+        writeBuffer[0] = (byte) (val >>> 56);
+        writeBuffer[1] = (byte) (val >>> 48);
+        writeBuffer[2] = (byte) (val >>> 40);
+        writeBuffer[3] = (byte) (val >>> 32);
+        writeBuffer[4] = (byte) (val >>> 24);
+        writeBuffer[5] = (byte) (val >>> 16);
+        writeBuffer[6] = (byte) (val >>> 8);
+        writeBuffer[7] = (byte) (val);
+        out.write(writeBuffer, 0, 8);
+    }
+
+}
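
Because the Javadoc above fully specifies the V2 stream layout, the documented "Hello World!" example can be reproduced directly. A minimal sketch (not part of the commit; only JDK classes are used besides FlowFilePackagerV2) that packages those attributes and prints the result as hex, which should match the byte sequence listed in the Javadoc:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.LinkedHashMap;
    import java.util.Map;

    import org.apache.nifi.util.FlowFilePackagerV2;

    public class PackageV2Example {
        public static void main(final String[] args) throws IOException {
            final Map<String, String> attributes = new LinkedHashMap<>(); // preserve A-then-B order
            attributes.put("A", "a");
            attributes.put("B", "b");

            final byte[] content = "Hello World!".getBytes(StandardCharsets.UTF_8);
            final ByteArrayOutputStream out = new ByteArrayOutputStream();
            new FlowFilePackagerV2().packageFlowFile(
                    new ByteArrayInputStream(content), out, attributes, content.length);

            final StringBuilder hex = new StringBuilder();
            for (final byte b : out.toByteArray()) {
                hex.append(String.format("%02X ", b & 0xFF));
            }
            System.out.println(hex.toString().trim());
            // Expected, per the Javadoc:
            // 00 02 00 01 41 00 01 61 00 01 42 00 01 62 00 00 00 00 00 00 00 0C 48 65 6C 6C 6F 20 57 6F 72 6C 64 21
        }
    }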

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV3.java
----------------------------------------------------------------------
diff --git a/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV3.java b/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV3.java
new file mode 100644
index 0000000..181f3e3
--- /dev/null
+++ b/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV3.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
+
+public class FlowFilePackagerV3 implements FlowFilePackager {
+
+    public static final byte[] MAGIC_HEADER = {'N', 'i', 'F', 'i', 'F', 'F', '3'};
+    private static final int MAX_VALUE_2_BYTES = 65535;
+    private final byte[] writeBuffer = new byte[8];
+
+    @Override
+    public void packageFlowFile(final InputStream in, final OutputStream out, final Map<String, String> attributes, final long fileSize) throws IOException {
+        out.write(MAGIC_HEADER);
+
+        if (attributes == null) {
+            writeFieldLength(out, 0);
+        } else {
+            writeFieldLength(out, attributes.size()); //write out the number of attributes
+            for (final Map.Entry<String, String> entry : attributes.entrySet()) { //write out each attribute key/value pair
+                writeString(entry.getKey(), out);
+                writeString(entry.getValue(), out);
+            }
+        }
+
+        writeLong(out, fileSize);//write out length of data
+        copy(in, out);//write out the actual flow file payload
+    }
+
+    private void copy(final InputStream in, final OutputStream out) throws IOException {
+        final byte[] buffer = new byte[65536];
+        int len;
+        while ((len = in.read(buffer)) > 0) {
+            out.write(buffer, 0, len);
+        }
+    }
+
+    private void writeString(final String val, final OutputStream out) throws IOException {
+        final byte[] bytes = val.getBytes("UTF-8");
+        writeFieldLength(out, bytes.length);
+        out.write(bytes);
+    }
+
+    private void writeFieldLength(final OutputStream out, final int numBytes) throws IOException {
+        // If the value is less than the max value that can be fit into 2 bytes, just use the
+        // actual value. Otherwise, we will set the first 2 bytes to 255/255 and then use the next
+        // 4 bytes to indicate the real length.
+        if (numBytes < MAX_VALUE_2_BYTES) {
+            writeBuffer[0] = (byte) (numBytes >>> 8);
+            writeBuffer[1] = (byte) (numBytes);
+            out.write(writeBuffer, 0, 2);
+        } else {
+            writeBuffer[0] = (byte) 0xff;
+            writeBuffer[1] = (byte) 0xff;
+            writeBuffer[2] = (byte) (numBytes >>> 24);
+            writeBuffer[3] = (byte) (numBytes >>> 16);
+            writeBuffer[4] = (byte) (numBytes >>> 8);
+            writeBuffer[5] = (byte) (numBytes);
+            out.write(writeBuffer, 0, 6);
+        }
+    }
+
+    private void writeLong(final OutputStream out, final long val) throws IOException {
+        writeBuffer[0] = (byte) (val >>> 56);
+        writeBuffer[1] = (byte) (val >>> 48);
+        writeBuffer[2] = (byte) (val >>> 40);
+        writeBuffer[3] = (byte) (val >>> 32);
+        writeBuffer[4] = (byte) (val >>> 24);
+        writeBuffer[5] = (byte) (val >>> 16);
+        writeBuffer[6] = (byte) (val >>> 8);
+        writeBuffer[7] = (byte) (val);
+        out.write(writeBuffer, 0, 8);
+    }
+
+}
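
V3 differs from V2 only in prepending the 7-byte magic header "NiFiFF3" and in accepting a null attribute map, which it writes as an attribute count of zero. A minimal sketch (not part of the commit; the payload text is illustrative) demonstrating both:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    import org.apache.nifi.util.FlowFilePackagerV3;

    public class PackageV3Example {
        public static void main(final String[] args) throws IOException {
            final byte[] content = "payload".getBytes(StandardCharsets.UTF_8);
            final ByteArrayOutputStream out = new ByteArrayOutputStream();

            // Passing null attributes is allowed in V3; it is encoded as a count of 0
            new FlowFilePackagerV3().packageFlowFile(
                    new ByteArrayInputStream(content), out, null, content.length);

            final byte[] packaged = out.toByteArray();
            final byte[] header = Arrays.copyOfRange(packaged, 0, FlowFilePackagerV3.MAGIC_HEADER.length);
            System.out.println(Arrays.equals(header, FlowFilePackagerV3.MAGIC_HEADER)); // true: stream starts with "NiFiFF3"
        }
    }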

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackager.java
----------------------------------------------------------------------
diff --git a/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackager.java b/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackager.java
new file mode 100644
index 0000000..fd9d92d
--- /dev/null
+++ b/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackager.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
+
+public interface FlowFileUnpackager {
+
+    Map<String, String> unpackageFlowFile(InputStream in, OutputStream out) throws IOException;
+
+    boolean hasMoreData() throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackagerV1.java
----------------------------------------------------------------------
diff --git a/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackagerV1.java b/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackagerV1.java
new file mode 100644
index 0000000..f8ef3d1
--- /dev/null
+++ b/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackagerV1.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+
+public class FlowFileUnpackagerV1 implements FlowFileUnpackager {
+
+    private int flowFilesRead = 0;
+
+    @Override
+    public Map<String, String> unpackageFlowFile(final InputStream in, final OutputStream out) throws IOException {
+        flowFilesRead++;
+        final TarArchiveInputStream tarIn = new TarArchiveInputStream(in);
+        final TarArchiveEntry attribEntry = tarIn.getNextTarEntry();
+        if (attribEntry == null) {
+            return null;
+        }
+
+        final Map<String, String> attributes;
+        if (attribEntry.getName().equals(FlowFilePackagerV1.FILENAME_ATTRIBUTES)) {
+            attributes = getAttributes(tarIn);
+        } else {
+            throw new IOException("Expected two tar entries: "
+                    + FlowFilePackagerV1.FILENAME_CONTENT + " and "
+                    + FlowFilePackagerV1.FILENAME_ATTRIBUTES);
+        }
+
+        final TarArchiveEntry contentEntry = tarIn.getNextTarEntry();
+
+        if (contentEntry != null && contentEntry.getName().equals(FlowFilePackagerV1.FILENAME_CONTENT)) {
+            final byte[] buffer = new byte[512 << 10]; //512KB
+            int bytesRead = 0;
+            while ((bytesRead = tarIn.read(buffer)) != -1) { //still more data to read
+                if (bytesRead > 0) {
+                    out.write(buffer, 0, bytesRead);
+                }
+            }
+            out.flush();
+        } else {
+            throw new IOException("Expected two tar entries: "
+                    + FlowFilePackagerV1.FILENAME_CONTENT + " and "
+                    + FlowFilePackagerV1.FILENAME_ATTRIBUTES);
+        }
+
+        return attributes;
+    }
+
+    protected Map<String, String> getAttributes(final TarArchiveInputStream stream) throws IOException {
+
+        final Properties props = new Properties();
+        props.loadFromXML(new NonCloseableInputStream(stream));
+
+        final Map<String, String> result = new HashMap<>();
+        for (final Entry<Object, Object> entry : props.entrySet()) {
+            final Object keyObject = entry.getKey();
+            final Object valueObject = entry.getValue();
+            if (!(keyObject instanceof String)) {
+                throw new IOException("Flow file attributes object contains key of type "
+                        + keyObject.getClass().getCanonicalName()
+                        + " but expected java.lang.String");
+            } else if (!(valueObject instanceof String)) {
+                throw new IOException("Flow file attributes object contains value of type "
+                        + valueObject.getClass().getCanonicalName()
+                        + " but expected java.lang.String");
+            }
+
+            final String key = (String) keyObject;
+            final String value = (String) valueObject;
+            result.put(key, value);
+        }
+
+        return result;
+    }
+
+    @Override
+    public boolean hasMoreData() throws IOException {
+        return flowFilesRead == 0;
+    }
+
+    public static final class NonCloseableInputStream extends InputStream {
+
+        final InputStream stream;
+
+        public NonCloseableInputStream(final InputStream stream) {
+            this.stream = stream;
+        }
+
+        @Override
+        public void close() {
+        }
+
+        @Override
+        public int read() throws IOException {
+            return stream.read();
+        }
+
+        @Override
+        public int available() throws IOException {
+            return stream.available();
+        }
+
+        @Override
+        public synchronized void mark(int readlimit) {
+            stream.mark(readlimit);
+        }
+
+        @Override
+        public synchronized void reset() throws IOException {
+            stream.reset();
+        }
+
+        @Override
+        public boolean markSupported() {
+            return stream.markSupported();
+        }
+
+        @Override
+        public long skip(long n) throws IOException {
+            return stream.skip(n);
+        }
+
+        @Override
+        public int read(byte b[], int off, int len) throws IOException {
+            return stream.read(b, off, len);
+        }
+
+        @Override
+        public int read(byte b[]) throws IOException {
+            return stream.read(b);
+        }
+    }
+}
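
Because FlowFileUnpackagerV1 parses the flowfile.attributes entry with Properties.loadFromXML and then streams the flowfile.content entry back out, it round-trips cleanly with FlowFilePackagerV1. A minimal in-memory sketch (not part of the commit; attribute names and content are illustrative only):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.nifi.util.FlowFilePackagerV1;
    import org.apache.nifi.util.FlowFileUnpackagerV1;

    public class RoundTripV1Example {
        public static void main(final String[] args) throws IOException {
            final Map<String, String> attributes = new HashMap<>();
            attributes.put("filename", "greeting.txt");

            final byte[] content = "Hello NiFi".getBytes(StandardCharsets.UTF_8);

            // Package: writes a TAR with flowfile.attributes followed by flowfile.content
            final ByteArrayOutputStream packaged = new ByteArrayOutputStream();
            new FlowFilePackagerV1().packageFlowFile(
                    new ByteArrayInputStream(content), packaged, attributes, content.length);

            // Unpackage: parses the attributes XML and copies the content to the output stream
            final ByteArrayOutputStream unpackedContent = new ByteArrayOutputStream();
            final Map<String, String> unpackedAttributes = new FlowFileUnpackagerV1()
                    .unpackageFlowFile(new ByteArrayInputStream(packaged.toByteArray()), unpackedContent);

            System.out.println(unpackedAttributes.equals(attributes));                             // true
            System.out.println(new String(unpackedContent.toByteArray(), StandardCharsets.UTF_8)); // Hello NiFi
        }
    }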
