http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardLineageResult.java ---------------------------------------------------------------------- diff --git a/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardLineageResult.java b/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardLineageResult.java new file mode 100644 index 0000000..afb56e8 --- /dev/null +++ b/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardLineageResult.java @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/
package org.apache.nifi.provenance;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.nifi.provenance.lineage.ComputeLineageResult;
import org.apache.nifi.provenance.lineage.EdgeNode;
import org.apache.nifi.provenance.lineage.EventNode;
import org.apache.nifi.provenance.lineage.FlowFileNode;
import org.apache.nifi.provenance.lineage.LineageEdge;
import org.apache.nifi.provenance.lineage.LineageNode;

/**
 * A {@link ComputeLineageResult} that is populated incrementally.
 * <p>
 * The work is split into {@code numSteps} steps. Each step reports either a
 * batch of relevant provenance events via {@link #update(Collection)} or a
 * failure via {@link #setError(String)}. Once every step has reported and no
 * error has been recorded, the lineage graph (nodes and edges) is built from
 * the collected events.
 * <p>
 * Mutable state is guarded by a {@link ReadWriteLock}: accessors take the read
 * lock and mutators take the write lock. Every mutation pushes the expiration
 * date {@link #TTL} milliseconds into the future.
 */
public class StandardLineageResult implements ComputeLineageResult {

    /** Lifetime of a result, in milliseconds, measured from its last update (30 minutes). */
    public static final int TTL = (int) TimeUnit.MILLISECONDS.convert(30, TimeUnit.MINUTES);
    private static final Logger logger = LoggerFactory.getLogger(StandardLineageResult.class);

    /**
     * Orders events chronologically by event time, breaking ties by event ID so
     * that the resulting order is deterministic. The comparator is stateless,
     * so a single shared instance is used.
     */
    private static final Comparator<ProvenanceEventRecord> EVENT_ORDER = new Comparator<ProvenanceEventRecord>() {
        @Override
        public int compare(final ProvenanceEventRecord o1, final ProvenanceEventRecord o2) {
            final int eventTimeComparison = Long.compare(o1.getEventTime(), o2.getEventTime());
            if (eventTimeComparison != 0) {
                return eventTimeComparison;
            }
            return Long.compare(o1.getEventId(), o2.getEventId());
        }
    };

    private final Collection<String> flowFileUuids;
    // The following state is guarded by rwLock.
    private final Collection<ProvenanceEventRecord> relevantRecords = new ArrayList<>();
    private final Set<LineageNode> nodes = new HashSet<>();
    private final Set<LineageEdge> edges = new HashSet<>();
    private final int numSteps;
    private final long creationNanos;
    private long computationNanos;

    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final Lock readLock = rwLock.readLock();
    private final Lock writeLock = rwLock.writeLock();

    private Date expirationDate = null;
    private String error = null;
    private int numCompletedSteps = 0;

    private volatile boolean canceled = false;

    /**
     * Creates a result that is finished after {@code numSteps} calls to
     * {@link #update(Collection)} and/or {@link #setError(String)}.
     *
     * @param numSteps number of steps expected before the result is finished
     * @param flowFileUuids UUIDs of the FlowFiles whose lineage is being
     *            computed. For FORK/JOIN/REPLAY/CLONE events, FlowFile nodes
     *            are only created for child UUIDs contained in this collection.
     */
    public StandardLineageResult(final int numSteps, final Collection<String> flowFileUuids) {
        this.numSteps = numSteps;
        this.creationNanos = System.nanoTime();
        this.flowFileUuids = flowFileUuids;

        updateExpiration();
    }

    @Override
    public List<LineageNode> getNodes() {
        readLock.lock();
        try {
            return new ArrayList<>(nodes);
        } finally {
            readLock.unlock();
        }
    }

    @Override
    public List<LineageEdge> getEdges() {
        readLock.lock();
        try {
            return new ArrayList<>(edges);
        } finally {
            readLock.unlock();
        }
    }

    public int getNumberOfEdges() {
        readLock.lock();
        try {
            return edges.size();
        } finally {
            readLock.unlock();
        }
    }

    public int getNumberOfNodes() {
        readLock.lock();
        try {
            return nodes.size();
        } finally {
            readLock.unlock();
        }
    }

    /**
     * @param timeUnit unit in which to express the result
     * @return time elapsed between construction and completion of the final
     *         step, or 0 if the result has not finished yet
     */
    public long getComputationTime(final TimeUnit timeUnit) {
        readLock.lock();
        try {
            return timeUnit.convert(computationNanos, TimeUnit.NANOSECONDS);
        } finally {
            readLock.unlock();
        }
    }

    @Override
    public Date getExpiration() {
        readLock.lock();
        try {
            return expirationDate;
        } finally {
            readLock.unlock();
        }
    }

    @Override
    public String getError() {
        readLock.lock();
        try {
            return error;
        } finally {
            readLock.unlock();
        }
    }

    /**
     * @return percentage of completed steps, in the range 0-100. Returns 100
     *         when fewer than one step was requested.
     */
    @Override
    public int getPercentComplete() {
        readLock.lock();
        try {
            if (numSteps < 1) {
                return 100;
            }
            final int percent = (int) (((float) numCompletedSteps / (float) numSteps) * 100.0F);
            // Extra update()/setError() calls beyond numSteps must not report more than 100%.
            return Math.min(100, percent);
        } finally {
            readLock.unlock();
        }
    }

    @Override
    public boolean isFinished() {
        readLock.lock();
        try {
            return numCompletedSteps >= numSteps || canceled;
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Records a failed step. The step still counts toward completion.
     *
     * @param error description of the failure; replaces any previous error
     */
    public void setError(final String error) {
        writeLock.lock();
        try {
            this.error = error;
            numCompletedSteps++;

            updateExpiration();

            if (numCompletedSteps >= numSteps) {
                computationNanos = System.nanoTime() - creationNanos;
            }
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Records a successful step that produced the given events. When this
     * completes the final step and no error has been recorded, the lineage
     * graph is (re)computed from all events collected so far.
     *
     * @param records events relevant to the lineage being computed
     */
    public void update(final Collection<ProvenanceEventRecord> records) {
        writeLock.lock();
        try {
            relevantRecords.addAll(records);

            numCompletedSteps++;
            updateExpiration();

            if (numCompletedSteps >= numSteps) {
                if (error == null) {
                    computeLineage();
                }
                // Record the elapsed time even when an earlier step failed.
                // Otherwise the computation time would stay at zero forever.
                computationNanos = System.nanoTime() - creationNanos;
            }
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Computes the lineage from the relevant Provenance Event Records. This
     * method must be called with the write lock held and is only going to be
     * useful after all of the records have been successfully obtained.
     */
    private void computeLineage() {
        final long startNanos = System.nanoTime();

        nodes.clear();
        edges.clear();

        // Maps each FlowFile UUID to the most recent node seen for that FlowFile.
        final Map<String, LineageNode> lastEventMap = new HashMap<>();
        final List<ProvenanceEventRecord> sortedRecords = new ArrayList<>(relevantRecords);
        Collections.sort(sortedRecords, EVENT_ORDER);

        // Convert the provenance event records into lineage nodes (FlowFileNode, EventNodes).
        for (final ProvenanceEventRecord record : sortedRecords) {
            final LineageNode lineageNode = new EventNode(record);
            final boolean added = nodes.add(lineageNode);
            if (!added) {
                logger.debug("Did not add {} because it already exists in the 'nodes' set", lineageNode);
            }

            // Create an edge that connects this node to the previous node for the same FlowFile UUID.
            final LineageNode lastNode = lastEventMap.get(record.getFlowFileUuid());
            if (lastNode != null) {
                // For JOIN, CLONE and REPLAY events the edge is labeled with the previous node's
                // FlowFile UUID. The record's own UUID pertains to only one of (potentially) many
                // UUIDs associated with such an event. All other event types use the record's UUID.
                final String edgeUuid;
                switch (record.getEventType()) {
                    case JOIN:
                    case CLONE:
                    case REPLAY:
                        edgeUuid = lastNode.getFlowFileUuid();
                        break;
                    default:
                        edgeUuid = record.getFlowFileUuid();
                        break;
                }

                edges.add(new EdgeNode(edgeUuid, lastNode, lineageNode));
            }

            lastEventMap.put(record.getFlowFileUuid(), lineageNode);

            switch (record.getEventType()) {
                case FORK:
                case JOIN:
                case REPLAY:
                case CLONE: {
                    // For events that create FlowFile nodes, create the FlowFile Nodes and associated Edges, as appropriate.
                    for (final String childUuid : record.getChildUuids()) {
                        if (flowFileUuids.contains(childUuid)) {
                            final FlowFileNode childNode = new FlowFileNode(childUuid, record.getEventTime());
                            final boolean isNewFlowFile = nodes.add(childNode);
                            if (!isNewFlowFile) {
                                failComputation("Unable to generate Lineage Graph because multiple events were registered claiming to have generated the same FlowFile (UUID = " + childNode.getFlowFileUuid() + ")");
                                return;
                            }

                            edges.add(new EdgeNode(childNode.getFlowFileUuid(), lineageNode, childNode));
                            lastEventMap.put(childUuid, childNode);
                        }
                    }
                    for (final String parentUuid : record.getParentUuids()) {
                        final LineageNode lastNodeForParent = lastEventMap.get(parentUuid);
                        if (lastNodeForParent != null && !lastNodeForParent.equals(lineageNode)) {
                            edges.add(new EdgeNode(parentUuid, lastNodeForParent, lineageNode));
                        }

                        lastEventMap.put(parentUuid, lineageNode);
                    }
                }
                break;
                case RECEIVE:
                case CREATE: {
                    // For a receive/create event, create a FlowFile Node that represents the FlowFile
                    // and an edge from the event to that FlowFile Node.
                    final LineageNode flowFileNode = new FlowFileNode(record.getFlowFileUuid(), record.getEventTime());
                    final boolean isNewFlowFile = nodes.add(flowFileNode);
                    if (!isNewFlowFile) {
                        failComputation("Found cycle in graph. This indicates that multiple events were registered claiming to have generated the same FlowFile (UUID = " + flowFileNode.getFlowFileUuid() + ")");
                        return;
                    }
                    edges.add(new EdgeNode(record.getFlowFileUuid(), lineageNode, flowFileNode));
                    lastEventMap.put(record.getFlowFileUuid(), flowFileNode);
                }
                break;
                default:
                    break;
            }
        }

        final long nanos = System.nanoTime() - startNanos;
        logger.debug("Finished building lineage with {} nodes and {} edges in {} millis", nodes.size(), edges.size(), TimeUnit.NANOSECONDS.toMillis(nanos));
    }

    /**
     * Records an error detected while building the graph. Unlike
     * {@link #setError(String)}, this does NOT count as an extra completed
     * step: the step that triggered the computation was already counted by
     * {@link #update(Collection)}. Must be called with the write lock held.
     *
     * @param msg error message to log and expose via {@link #getError()}
     */
    private void failComputation(final String msg) {
        logger.error(msg);
        this.error = msg;
        updateExpiration();
    }

    /**
     * Marks this result as canceled. {@link #isFinished()} returns true from
     * then on.
     */
    void cancel() {
        this.canceled = true;
    }

    /**
     * Pushes the expiration date {@link #TTL} milliseconds into the future.
     * Must be called with the write lock held (or from the constructor).
     */
    private void updateExpiration() {
        expirationDate = new Date(System.currentTimeMillis() + TTL);
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardProvenanceEventRecord.java ---------------------------------------------------------------------- diff --git a/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardProvenanceEventRecord.java b/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardProvenanceEventRecord.java new file mode 100644 index 0000000..cfbae88 --- /dev/null +++ b/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardProvenanceEventRecord.java @@ -0,0 +1,752 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/
package org.apache.nifi.provenance;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.Relationship;

/**
 * Holder for the provenance information recorded by a single event.
 * <p>
 * Instances are created through {@link Builder}. Apart from the event ID,
 * which is assigned after construction via {@link #setEventId(long)}, all
 * fields are final. The lineage identifiers and the parent/child UUID lists are
 * copied when the record is built, so later changes to the builder do not
 * alter an already-built record. The attribute maps are exposed read-only but
 * are not copied.
 */
public final class StandardProvenanceEventRecord implements ProvenanceEventRecord {

    private final long eventTime;
    private final long entryDate;
    private final ProvenanceEventType eventType;
    private final long lineageStartDate;
    private final Set<String> lineageIdentifiers;
    private final String componentId;
    private final String componentType;
    private final String transitUri;
    private final String sourceSystemFlowFileIdentifier;
    private final String uuid;
    private final List<String> parentUuids;
    private final List<String> childrenUuids;
    private final String alternateIdentifierUri;
    private final String details;
    private final String relationship;
    private final long storageByteOffset;
    private final String storageFilename;
    private final long eventDuration;

    private final String contentClaimSection;
    private final String contentClaimContainer;
    private final String contentClaimIdentifier;
    private final Long contentClaimOffset;
    private final long contentSize;

    private final String previousClaimSection;
    private final String previousClaimContainer;
    private final String previousClaimIdentifier;
    private final Long previousClaimOffset;
    private final Long previousSize;

    private final String sourceQueueIdentifier;

    private final Map<String, String> previousAttributes;
    private final Map<String, String> updatedAttributes;

    private volatile long eventId;

    private StandardProvenanceEventRecord(final Builder builder) {
        this.eventTime = builder.eventTime;
        this.entryDate = builder.entryDate;
        this.eventType = builder.eventType;
        this.componentId = builder.componentId;
        this.componentType = builder.componentType;
        this.transitUri = builder.transitUri;
        this.sourceSystemFlowFileIdentifier = builder.sourceSystemFlowFileIdentifier;
        this.uuid = builder.uuid;
        // Copy so that later builder mutations (addParentUuid, etc.) cannot change this record.
        this.parentUuids = copyOrNull(builder.parentUuids);
        this.childrenUuids = copyOrNull(builder.childrenUuids);
        this.alternateIdentifierUri = builder.alternateIdentifierUri;
        this.details = builder.details;
        this.relationship = builder.relationship;
        this.storageByteOffset = builder.storageByteOffset;
        this.storageFilename = builder.storageFilename;
        this.eventDuration = builder.eventDuration;
        this.lineageStartDate = builder.lineageStartDate;
        this.lineageIdentifiers = builder.lineageIdentifiers == null
                ? Collections.<String>emptySet()
                : Collections.unmodifiableSet(new HashSet<>(builder.lineageIdentifiers));

        previousClaimSection = builder.previousClaimSection;
        previousClaimContainer = builder.previousClaimContainer;
        previousClaimIdentifier = builder.previousClaimIdentifier;
        previousClaimOffset = builder.previousClaimOffset;
        previousSize = builder.previousSize;

        contentClaimSection = builder.contentClaimSection;
        contentClaimContainer = builder.contentClaimContainer;
        contentClaimIdentifier = builder.contentClaimIdentifier;
        contentClaimOffset = builder.contentClaimOffset;
        contentSize = builder.contentSize;

        previousAttributes = builder.previousAttributes == null ? Collections.<String, String>emptyMap() : Collections.unmodifiableMap(builder.previousAttributes);
        updatedAttributes = builder.updatedAttributes == null ? Collections.<String, String>emptyMap() : Collections.unmodifiableMap(builder.updatedAttributes);

        sourceQueueIdentifier = builder.sourceQueueIdentifier;
    }

    /**
     * Returns an unmodifiable copy of {@code list}, or {@code null} if it is
     * {@code null}. Null is preserved because {@link #equals(Object)}
     * distinguishes a {@code null} list from an empty one.
     */
    private static List<String> copyOrNull(final List<String> list) {
        if (list == null) {
            return null;
        }
        return Collections.unmodifiableList(new ArrayList<>(list));
    }

    public String getStorageFilename() {
        return storageFilename;
    }

    public long getStorageByteOffset() {
        return storageByteOffset;
    }

    void setEventId(final long eventId) {
        this.eventId = eventId;
    }

    @Override
    public long getEventId() {
        return eventId;
    }

    @Override
    public long getEventTime() {
        return eventTime;
    }

    @Override
    public Set<String> getLineageIdentifiers() {
        return lineageIdentifiers;
    }

    @Override
    public long getLineageStartDate() {
        return lineageStartDate;
    }

    @Override
    public long getFileSize() {
        return contentSize;
    }

    @Override
    public Long getPreviousFileSize() {
        return previousSize;
    }

    @Override
    public ProvenanceEventType getEventType() {
        return eventType;
    }

    /**
     * @return a new mutable map containing the previous attributes overlaid with
     *         the updated attributes. Updated entries whose value is
     *         {@code null} do not override the previous value.
     */
    @Override
    public Map<String, String> getAttributes() {
        final Map<String, String> allAttrs = new HashMap<>(previousAttributes.size() + updatedAttributes.size());
        allAttrs.putAll(previousAttributes);
        for (final Map.Entry<String, String> entry : updatedAttributes.entrySet()) {
            if (entry.getValue() != null) {
                allAttrs.put(entry.getKey(), entry.getValue());
            }
        }
        return allAttrs;
    }

    @Override
    public String getComponentId() {
        return componentId;
    }

    @Override
    public String getComponentType() {
        return componentType;
    }

    @Override
    public String getTransitUri() {
        return transitUri;
    }

    @Override
    public String getSourceSystemFlowFileIdentifier() {
        return sourceSystemFlowFileIdentifier;
    }

    @Override
    public String getFlowFileUuid() {
        return uuid;
    }

    @Override
    public List<String> getParentUuids() {
        return parentUuids == null ? Collections.<String>emptyList() : parentUuids;
    }

    @Override
    public List<String> getChildUuids() {
        return childrenUuids == null ? Collections.<String>emptyList() : childrenUuids;
    }

    @Override
    public String getAlternateIdentifierUri() {
        return alternateIdentifierUri;
    }

    @Override
    public long getEventDuration() {
        return eventDuration;
    }

    @Override
    public String getDetails() {
        return details;
    }

    @Override
    public String getRelationship() {
        return relationship;
    }

    @Override
    public long getFlowFileEntryDate() {
        return entryDate;
    }

    @Override
    public String getContentClaimSection() {
        return contentClaimSection;
    }

    @Override
    public String getContentClaimContainer() {
        return contentClaimContainer;
    }

    @Override
    public String getContentClaimIdentifier() {
        return contentClaimIdentifier;
    }

    @Override
    public Long getContentClaimOffset() {
        return contentClaimOffset;
    }

    @Override
    public String getSourceQueueIdentifier() {
        return sourceQueueIdentifier;
    }

    @Override
    public Map<String, String> getPreviousAttributes() {
        return previousAttributes;
    }

    @Override
    public String getPreviousContentClaimContainer() {
        return previousClaimContainer;
    }

    @Override
    public String getPreviousContentClaimIdentifier() {
        return previousClaimIdentifier;
    }

    @Override
    public Long getPreviousContentClaimOffset() {
        return previousClaimOffset;
    }

    @Override
    public String getPreviousContentClaimSection() {
        return previousClaimSection;
    }

    @Override
    public Map<String, String> getUpdatedAttributes() {
        return updatedAttributes;
    }

    /**
     * Hashes only fields that {@link #equals(Object)} also compares, so equal
     * records always have equal hash codes.
     */
    @Override
    public int hashCode() {
        final int eventTypeCode;
        if (eventType == ProvenanceEventType.CLONE || eventType == ProvenanceEventType.JOIN || eventType == ProvenanceEventType.FORK) {
            eventTypeCode = 1472;
        } else if (eventType == ProvenanceEventType.REPLAY) {
            eventTypeCode = 21479 + (int) (0x7FFFFFFF & eventTime); // use lower bits of event time.
        } else {
            eventTypeCode = 4812 + eventType.hashCode() + 4 * uuid.hashCode();
        }

        return -37423 + 3 * componentId.hashCode() + (transitUri == null ? 0 : 41 * transitUri.hashCode())
                + (relationship == null ? 0 : 47 * relationship.hashCode()) + 44 * eventTypeCode;
    }

    @Override
    public boolean equals(final Object obj) {
        if (obj == null) {
            return false;
        }
        if (obj == this) {
            return true;
        }
        if (!(obj instanceof StandardProvenanceEventRecord)) {
            return false;
        }

        final StandardProvenanceEventRecord other = (StandardProvenanceEventRecord) obj;
        // If event IDs are populated and not equal, return false. If they have not yet been populated, do not
        // use them in the comparison.
        if (eventId > 0L && other.getEventId() > 0L && eventId != other.getEventId()) {
            return false;
        }
        if (eventType != other.eventType) {
            return false;
        }

        if (!componentId.equals(other.componentId)) {
            return false;
        }

        // Parent/child UUIDs are compared as unordered collections.
        if (different(parentUuids, other.parentUuids)) {
            return false;
        }

        if (different(childrenUuids, other.childrenUuids)) {
            return false;
        }

        if (!uuid.equals(other.uuid)) {
            return false;
        }

        if (different(transitUri, other.transitUri)) {
            return false;
        }

        if (different(relationship, other.relationship)) {
            return false;
        }

        // REPLAY events are additionally distinguished by their event time.
        return !(eventType == ProvenanceEventType.REPLAY && eventTime != other.getEventTime());
    }

    /** Null-safe inequality check. */
    private boolean different(final Object a, final Object b) {
        if (a == null && b == null) {
            return false;
        }
        if (a == null || b == null) {
            return true;
        }

        return !a.equals(b);
    }

    /**
     * Returns true unless both lists are {@code null}, or both are non-null
     * and contain the same elements, ignoring order.
     */
    private boolean different(final List<String> a, final List<String> b) {
        if (a == null || b == null) {
            return a != b;
        }

        if (a.size() != b.size()) {
            return true;
        }

        final List<String> sortedA = new ArrayList<>(a);
        final List<String> sortedB = new ArrayList<>(b);
        Collections.sort(sortedA);
        Collections.sort(sortedB);

        return !sortedA.equals(sortedB);
    }

    @Override
    public String toString() {
        return "ProvenanceEventRecord ["
                + "eventId=" + eventId
                + ", eventType=" + eventType
                + ", eventTime=" + new Date(eventTime)
                + ", uuid=" + uuid
                + ", fileSize=" + contentSize
                + ", componentId=" + componentId
                + ", transitUri=" + transitUri
                + ", sourceSystemFlowFileIdentifier=" + sourceSystemFlowFileIdentifier
                + ", parentUuids=" + parentUuids
                + ", alternateIdentifierUri=" + alternateIdentifierUri + "]";
    }

    /**
     * Mutable builder for {@link StandardProvenanceEventRecord}. The builder
     * always owns its own copies of the lineage identifier set and the
     * parent/child UUID lists, so its add/remove methods never affect a
     * caller's collection or a previously built record.
     */
    public static class Builder implements ProvenanceEventBuilder {

        private long eventTime = System.currentTimeMillis();
        private long entryDate;
        private long lineageStartDate;
        private Set<String> lineageIdentifiers = new HashSet<>();
        private ProvenanceEventType eventType = null;
        private String componentId = null;
        private String componentType = null;
        private String sourceSystemFlowFileIdentifier = null;
        private String transitUri = null;
        private String uuid = null;
        private List<String> parentUuids = null;
        private List<String> childrenUuids = null;
        // NOTE(review): set by setContentType but never copied into the built record.
        private String contentType = null;
        private String alternateIdentifierUri = null;
        private String details = null;
        private String relationship = null;
        private long storageByteOffset = -1L;
        private long eventDuration = -1L;
        private String storageFilename;

        private String contentClaimSection;
        private String contentClaimContainer;
        private String contentClaimIdentifier;
        private Long contentClaimOffset;
        private Long contentSize;

        private String previousClaimSection;
        private String previousClaimContainer;
        private String previousClaimIdentifier;
        private Long previousClaimOffset;
        private Long previousSize;

        private String sourceQueueIdentifier;

        private Map<String, String> previousAttributes;
        private Map<String, String> updatedAttributes;

        /** Returns a mutable copy of {@code list}, or {@code null} if it is {@code null}. */
        private static List<String> mutableCopy(final List<String> list) {
            if (list == null) {
                return null;
            }
            return new ArrayList<>(list);
        }

        /**
         * Copies every field of {@code event} into this builder. The
         * lineage identifiers and the parent/child UUID lists are copied, so
         * subsequent add/remove calls neither fail on the event's read-only
         * collections nor mutate the source event.
         */
        @Override
        public Builder fromEvent(final ProvenanceEventRecord event) {
            eventTime = event.getEventTime();
            entryDate = event.getFlowFileEntryDate();
            lineageStartDate = event.getLineageStartDate();
            setLineageIdentifiers(event.getLineageIdentifiers());
            eventType = event.getEventType();
            componentId = event.getComponentId();
            componentType = event.getComponentType();
            transitUri = event.getTransitUri();
            sourceSystemFlowFileIdentifier = event.getSourceSystemFlowFileIdentifier();
            uuid = event.getFlowFileUuid();
            parentUuids = mutableCopy(event.getParentUuids());
            childrenUuids = mutableCopy(event.getChildUuids());
            alternateIdentifierUri = event.getAlternateIdentifierUri();
            eventDuration = event.getEventDuration();
            previousAttributes = event.getPreviousAttributes();
            updatedAttributes = event.getUpdatedAttributes();
            details = event.getDetails();
            relationship = event.getRelationship();

            contentClaimSection = event.getContentClaimSection();
            contentClaimContainer = event.getContentClaimContainer();
            contentClaimIdentifier = event.getContentClaimIdentifier();
            contentClaimOffset = event.getContentClaimOffset();
            contentSize = event.getFileSize();

            previousClaimSection = event.getPreviousContentClaimSection();
            previousClaimContainer = event.getPreviousContentClaimContainer();
            previousClaimIdentifier = event.getPreviousContentClaimIdentifier();
            previousClaimOffset = event.getPreviousContentClaimOffset();
            previousSize = event.getPreviousFileSize();

            sourceQueueIdentifier = event.getSourceQueueIdentifier();

            if (event instanceof StandardProvenanceEventRecord) {
                final StandardProvenanceEventRecord standardProvEvent = (StandardProvenanceEventRecord) event;
                storageByteOffset = standardProvEvent.storageByteOffset;
                storageFilename = standardProvEvent.storageFilename;
            }

            return this;
        }

        @Override
        public Builder setFlowFileEntryDate(final long entryDate) {
            this.entryDate = entryDate;
            return this;
        }

        /**
         * Replaces the lineage identifiers with a mutable copy of the given
         * set; {@code null} is treated as an empty set.
         */
        @Override
        public Builder setLineageIdentifiers(final Set<String> lineageIdentifiers) {
            if (lineageIdentifiers == null) {
                this.lineageIdentifiers = new HashSet<>();
            } else {
                this.lineageIdentifiers = new HashSet<>(lineageIdentifiers);
            }
            return this;
        }

        @Override
        public Builder setAttributes(final Map<String, String> previousAttributes, final Map<String, String> updatedAttributes) {
            this.previousAttributes = previousAttributes;
            this.updatedAttributes = updatedAttributes;
            return this;
        }

        @Override
        public Builder setFlowFileUUID(final String uuid) {
            this.uuid = uuid;
            return this;
        }

        public Builder setStorageLocation(final String filename, final long offset) {
            this.storageFilename = filename;
            this.storageByteOffset = offset;
            return this;
        }

        @Override
        public Builder setEventTime(long eventTime) {
            this.eventTime = eventTime;
            return this;
        }

        @Override
        public Builder setEventDuration(final long millis) {
            this.eventDuration = millis;
            return this;
        }

        @Override
        public Builder setLineageStartDate(final long startDate) {
            this.lineageStartDate = startDate;
            return this;
        }

        public Builder addLineageIdentifier(final String lineageIdentifier) {
            this.lineageIdentifiers.add(lineageIdentifier);
            return this;
        }

        @Override
        public Builder setEventType(ProvenanceEventType eventType) {
            this.eventType = eventType;
            return this;
        }

        @Override
        public Builder setComponentId(String componentId) {
            this.componentId = componentId;
            return this;
        }

        @Override
        public Builder setComponentType(String componentType) {
            this.componentType = componentType;
            return this;
        }

        @Override
        public Builder setSourceSystemFlowFileIdentifier(String sourceSystemFlowFileIdentifier) {
            this.sourceSystemFlowFileIdentifier = sourceSystemFlowFileIdentifier;
            return this;
        }

        @Override
        public Builder setTransitUri(String transitUri) {
            this.transitUri = transitUri;
            return this;
        }

        @Override
        public Builder addParentFlowFile(final FlowFile parentFlowFile) {
            return addParentUuid(parentFlowFile.getAttribute(CoreAttributes.UUID.key()));
        }

        public Builder addParentUuid(final String uuid) {
            if (this.parentUuids == null) {
                this.parentUuids = new ArrayList<>();
            }
            this.parentUuids.add(uuid);
            return this;
        }

        @Override
        public Builder removeParentFlowFile(final FlowFile parentFlowFile) {
            if (this.parentUuids == null) {
                return this;
            }

            parentUuids.remove(parentFlowFile.getAttribute(CoreAttributes.UUID.key()));
            return this;
        }

        @Override
        public Builder addChildFlowFile(final FlowFile childFlowFile) {
            return addChildUuid(childFlowFile.getAttribute(CoreAttributes.UUID.key()));
        }

        public Builder addChildUuid(final String uuid) {
            if (this.childrenUuids == null) {
                this.childrenUuids = new ArrayList<>();
            }
            this.childrenUuids.add(uuid);
            return this;
        }

        @Override
        public Builder removeChildFlowFile(final FlowFile childFlowFile) {
            if (this.childrenUuids == null) {
                return this;
            }

            childrenUuids.remove(childFlowFile.getAttribute(CoreAttributes.UUID.key()));
            return this;
        }

        public Builder setContentType(String contentType) {
            this.contentType = contentType;
            return this;
        }

        @Override
        public Builder setAlternateIdentifierUri(String alternateIdentifierUri) {
            this.alternateIdentifierUri = alternateIdentifierUri;
            return this;
        }

        @Override
        public Builder setDetails(String details) {
            this.details = details;
            return this;
        }

        /** Sets the relationship name; a {@code null} relationship clears it. */
        @Override
        public Builder setRelationship(Relationship relationship) {
            this.relationship = relationship == null ? null : relationship.getName();
            return this;
        }

        public Builder setRelationship(final String relationship) {
            this.relationship = relationship;
            return this;
        }

        /**
         * Populates the FlowFile-derived fields from {@code flowFile}. Its
         * current attributes become the updated attributes, and the previous
         * attributes are empty.
         */
        @Override
        public ProvenanceEventBuilder fromFlowFile(final FlowFile flowFile) {
            setFlowFileEntryDate(flowFile.getEntryDate());
            setLineageIdentifiers(flowFile.getLineageIdentifiers());
            setLineageStartDate(flowFile.getLineageStartDate());
            setAttributes(Collections.<String, String>emptyMap(), flowFile.getAttributes());
            uuid = flowFile.getAttribute(CoreAttributes.UUID.key());
            this.contentSize = flowFile.getSize();
            return this;
        }

        @Override
        public Builder setPreviousContentClaim(final String container, final String section, final String identifier, final Long offset, final long size) {
            previousClaimSection = section;
            previousClaimContainer = container;
            previousClaimIdentifier = identifier;
            previousClaimOffset = offset;
            previousSize = size;
            return this;
        }

        @Override
        public Builder setCurrentContentClaim(final String container, final String section, final String identifier, final Long offset, final long size) {
            contentClaimSection = section;
            contentClaimContainer = container;
            contentClaimIdentifier = identifier;
            contentClaimOffset = offset;
            contentSize = size;
            return this;
        }

        @Override
        public Builder setSourceQueueIdentifier(final String identifier) {
            sourceQueueIdentifier = identifier;
            return this;
        }

        /**
         * @throws IllegalStateException if {@code value} is {@code null}
         */
        private void assertSet(final Object value, final String name) {
            if (value == null) {
                throw new IllegalStateException("Cannot create Provenance Event Record because " + name + " is not set");
            }
        }

        public ProvenanceEventType getEventType() {
            return eventType;
        }

        /** @return read-only view of the child UUIDs; empty if none have been added */
        public List<String> getChildUuids() {
            return childrenUuids == null ? Collections.<String>emptyList() : Collections.unmodifiableList(childrenUuids);
        }

        /** @return read-only view of the parent UUIDs; empty if none have been added */
        public List<String> getParentUuids() {
            return parentUuids == null ? Collections.<String>emptyList() : Collections.unmodifiableList(parentUuids);
        }

        /**
         * Validates the builder state and creates the record.
         *
         * @throws IllegalStateException if a field required for the event type
         *             is missing
         */
        @Override
        public StandardProvenanceEventRecord build() {
            assertSet(eventType, "Event Type");
            assertSet(componentId, "Component ID");
            assertSet(componentType, "Component Type");
            assertSet(uuid, "FlowFile UUID");
            assertSet(contentSize, "FlowFile Size");

            switch (eventType) {
                case ADDINFO:
                    if (alternateIdentifierUri == null) {
                        throw new IllegalStateException("Cannot create Provenance Event Record of type " + eventType + " because no alternate identifiers have been set");
                    }
                    break;
                case RECEIVE:
                case SEND:
                    assertSet(transitUri, "Transit URI");
                    break;
                case ROUTE:
                    assertSet(relationship, "Relationship");
                    break;
                case CLONE:
                case FORK:
                case JOIN:
                    if ((parentUuids == null || parentUuids.isEmpty()) && (childrenUuids == null || childrenUuids.isEmpty())) {
                        throw new IllegalStateException("Cannot create Provenance Event Record of type " + eventType + " because no Parent UUIDs or Children UUIDs have been set");
                    }
                    break;
                default:
                    break;
            }

            return new StandardProvenanceEventRecord(this);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java ---------------------------------------------------------------------- diff --git a/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java
b/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java new file mode 100644 index 0000000..9a9a27d --- /dev/null +++ b/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.provenance; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.nifi.provenance.search.Query; +import org.apache.nifi.provenance.search.QueryResult; + +public class StandardQueryResult implements QueryResult { + + public static final int TTL = (int) TimeUnit.MILLISECONDS.convert(30, TimeUnit.MINUTES); + private final Query query; + private final long creationNanos; + + private final int numSteps; + private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); + private final Lock readLock = rwLock.readLock(); + + private final Lock writeLock = rwLock.writeLock(); + // guarded by writeLock + private final List<ProvenanceEventRecord> matchingRecords = new ArrayList<>(); + private long totalHitCount; + private int numCompletedSteps = 0; + private Date expirationDate; + private String error; + private long queryTime; + + private volatile boolean canceled = false; + + public StandardQueryResult(final Query query, final int numSteps) { + this.query = query; + this.numSteps = numSteps; + this.creationNanos = System.nanoTime(); + + updateExpiration(); + } + + @Override + public List<ProvenanceEventRecord> getMatchingEvents() { + readLock.lock(); + try { + if (matchingRecords.size() <= query.getMaxResults()) { + return new ArrayList<>(matchingRecords); + } + + final List<ProvenanceEventRecord> copy = new ArrayList<>(query.getMaxResults()); + for (int i = 0; i < query.getMaxResults(); i++) { + copy.add(matchingRecords.get(i)); + } + + return copy; + } finally { + readLock.unlock(); + } + } + + @Override + public long getTotalHitCount() { + readLock.lock(); + try { + return totalHitCount; + } finally { + readLock.unlock(); + } + } + + @Override + public long getQueryTime() { + 
return queryTime; + } + + @Override + public Date getExpiration() { + return expirationDate; + } + + @Override + public String getError() { + return error; + } + + @Override + public int getPercentComplete() { + readLock.lock(); + try { + return (numSteps < 1) ? 100 : (int) (((float) numCompletedSteps / (float) numSteps) * 100.0F); + } finally { + readLock.unlock(); + } + } + + @Override + public boolean isFinished() { + readLock.lock(); + try { + return numCompletedSteps >= numSteps || canceled; + } finally { + readLock.unlock(); + } + } + + void cancel() { + this.canceled = true; + } + + public void setError(final String error) { + writeLock.lock(); + try { + this.error = error; + numCompletedSteps++; + + updateExpiration(); + if (numCompletedSteps >= numSteps) { + final long searchNanos = System.nanoTime() - creationNanos; + queryTime = TimeUnit.MILLISECONDS.convert(searchNanos, TimeUnit.NANOSECONDS); + } + } finally { + writeLock.unlock(); + } + } + + public void update(final Collection<ProvenanceEventRecord> matchingRecords, final long totalHits) { + writeLock.lock(); + try { + this.matchingRecords.addAll(matchingRecords); + this.totalHitCount += totalHits; + + numCompletedSteps++; + updateExpiration(); + + if (numCompletedSteps >= numSteps) { + final long searchNanos = System.nanoTime() - creationNanos; + queryTime = TimeUnit.MILLISECONDS.convert(searchNanos, TimeUnit.NANOSECONDS); + } + } finally { + writeLock.unlock(); + } + } + + /** + * Must be called with write lock! 
+ */ + private void updateExpiration() { + expirationDate = new Date(System.currentTimeMillis() + TTL); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EdgeNode.java ---------------------------------------------------------------------- diff --git a/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EdgeNode.java b/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EdgeNode.java new file mode 100644 index 0000000..0aaf5ef --- /dev/null +++ b/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EdgeNode.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.provenance.lineage; + +import static java.util.Objects.requireNonNull; + +public class EdgeNode implements LineageEdge { + + private final String uuid; + private final LineageNode source; + private final LineageNode destination; + + public EdgeNode(final String uuid, final LineageNode source, final LineageNode destination) { + this.uuid = uuid; + this.source = requireNonNull(source); + this.destination = requireNonNull(destination); + } + + @Override + public String getUuid() { + return uuid; + } + + @Override + public LineageNode getSource() { + return source; + } + + @Override + public LineageNode getDestination() { + return destination; + } + + @Override + public int hashCode() { + return 43298293 + source.hashCode() + destination.hashCode(); + } + + @Override + public boolean equals(final Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + + if (!(obj instanceof EdgeNode)) { + return false; + } + + final EdgeNode other = (EdgeNode) obj; + return (source.equals(other.source) && destination.equals(other.destination)); + } + + @Override + public String toString() { + return "Edge[Source=" + source + ", Destination=" + destination + "]"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EventNode.java ---------------------------------------------------------------------- diff --git a/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EventNode.java b/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EventNode.java new file mode 100644 index 0000000..12d9a4f --- /dev/null +++ b/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EventNode.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance.lineage; + +import java.util.List; + +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; + +public class EventNode implements ProvenanceEventLineageNode { + + private final ProvenanceEventRecord record; + private String clusterNodeIdentifier = null; + + public EventNode(final ProvenanceEventRecord event) { + this.record = event; + } + + @Override + public String getIdentifier() { + return String.valueOf(getEventIdentifier()); + } + + @Override + public String getClusterNodeIdentifier() { + return clusterNodeIdentifier; + } + + public void setClusterNodeIdentifier(final String nodeIdentifier) { + this.clusterNodeIdentifier = nodeIdentifier; + } + + @Override + public LineageNodeType getNodeType() { + return LineageNodeType.PROVENANCE_EVENT_NODE; + } + + @Override + public ProvenanceEventType getEventType() { + return record.getEventType(); + } + + @Override + public long getTimestamp() { + return record.getEventTime(); + } + + @Override + public long getEventIdentifier() { + return record.getEventId(); + } + + @Override + public String getFlowFileUuid() { + return record.getAttributes().get(CoreAttributes.UUID.key()); + } + 
+ @Override + public List<String> getParentUuids() { + return record.getParentUuids(); + } + + @Override + public List<String> getChildUuids() { + return record.getChildUuids(); + } + + @Override + public int hashCode() { + return 2938472 + record.hashCode(); + } + + @Override + public boolean equals(final Object obj) { + if (obj == null) { + return false; + } + if (this == obj) { + return true; + } + + if (!(obj instanceof EventNode)) { + return false; + } + + final EventNode other = (EventNode) obj; + return record.equals(other.record); + } + + @Override + public String toString() { + return "Event[ID=" + record.getEventId() + ", Type=" + record.getEventType() + ", UUID=" + record.getFlowFileUuid() + ", Component=" + record.getComponentId() + "]"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/FlowFileLineage.java ---------------------------------------------------------------------- diff --git a/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/FlowFileLineage.java b/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/FlowFileLineage.java new file mode 100644 index 0000000..c36c38d --- /dev/null +++ b/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/FlowFileLineage.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance.lineage; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import static java.util.Objects.requireNonNull; + +public class FlowFileLineage implements Lineage { + + private final List<LineageNode> nodes; + private final List<LineageEdge> edges; + + public FlowFileLineage(final Collection<LineageNode> nodes, final Collection<LineageEdge> edges) { + this.nodes = new ArrayList<>(requireNonNull(nodes)); + this.edges = new ArrayList<>(requireNonNull(edges)); + } + + @Override + public List<LineageNode> getNodes() { + return nodes; + } + + @Override + public List<LineageEdge> getEdges() { + return edges; + } + + @Override + public int hashCode() { + int sum = 923; + for (final LineageNode node : nodes) { + sum += node.hashCode(); + } + + for (final LineageEdge edge : edges) { + sum += edge.hashCode(); + } + + return sum; + } + + @Override + public boolean equals(final Object obj) { + if (obj == null) { + return false; + } + + if (obj == this) { + return true; + } + + if (!(obj instanceof FlowFileLineage)) { + return false; + } + + final FlowFileLineage other = (FlowFileLineage) obj; + return nodes.equals(other.nodes) && edges.equals(other.edges); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/FlowFileNode.java ---------------------------------------------------------------------- diff --git 
a/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/FlowFileNode.java b/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/FlowFileNode.java new file mode 100644 index 0000000..fdc7470 --- /dev/null +++ b/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/FlowFileNode.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.provenance.lineage; + +import static java.util.Objects.requireNonNull; + +public class FlowFileNode implements LineageNode { + + private final String flowFileUuid; + private final long creationTime; + private String clusterNodeIdentifier; + + public FlowFileNode(final String flowFileUuid, final long flowFileCreationTime) { + this.flowFileUuid = requireNonNull(flowFileUuid); + this.creationTime = flowFileCreationTime; + } + + @Override + public String getIdentifier() { + return flowFileUuid; + } + + @Override + public long getTimestamp() { + return creationTime; + } + + @Override + public String getClusterNodeIdentifier() { + return clusterNodeIdentifier; + } + + @Override + public LineageNodeType getNodeType() { + return LineageNodeType.FLOWFILE_NODE; + } + + @Override + public String getFlowFileUuid() { + return flowFileUuid; + } + + @Override + public int hashCode() { + return 23498723 + flowFileUuid.hashCode(); + } + + @Override + public boolean equals(final Object obj) { + if (obj == null) { + return false; + } + if (obj == this) { + return true; + } + + if (!(obj instanceof FlowFileNode)) { + return false; + } + + final FlowFileNode other = (FlowFileNode) obj; + return flowFileUuid.equals(other.flowFileUuid); + } + + @Override + public String toString() { + return "FlowFile[UUID=" + flowFileUuid + "]"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/flowfile-packager/pom.xml ---------------------------------------------------------------------- diff --git a/commons/flowfile-packager/pom.xml b/commons/flowfile-packager/pom.xml new file mode 100644 index 0000000..f4001fe --- /dev/null +++ b/commons/flowfile-packager/pom.xml @@ -0,0 +1,43 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <!-- + Licensed to the Apache Software Foundation 
(ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-parent</artifactId> + <version>0.0.1-SNAPSHOT</version> + </parent> + + <artifactId>flowfile-packager</artifactId> + <version>0.0.1-SNAPSHOT</version> + <packaging>jar</packaging> + + <name>FlowFile Packager</name> + + <dependencies> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-compress</artifactId> + <version>1.9</version> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + <version>3.3.2</version> + </dependency> + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackager.java ---------------------------------------------------------------------- diff --git a/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackager.java b/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackager.java new file mode 100644 index 0000000..ae16f99 --- /dev/null +++ b/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackager.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
+
+/**
+ * Writes a FlowFile's attributes and content to an output stream in a single
+ * packaged form. This package has a tar-based implementation (V1) and
+ * length-prefixed binary implementations (V2, V3).
+ */
+public interface FlowFilePackager {
+
+    /**
+     * Packages one FlowFile.
+     *
+     * @param in source of the FlowFile content
+     * @param out destination for the packaged bytes
+     * @param attributes the FlowFile's attributes
+     * @param fileSize length of the content; implementations record it in the
+     *        package, so it should match the number of bytes {@code in} yields
+     * @throws IOException if reading {@code in} or writing {@code out} fails
+     */
+    void packageFlowFile(InputStream in, OutputStream out, Map<String, String> attributes, long fileSize) throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV1.java
----------------------------------------------------------------------
diff --git a/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV1.java b/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV1.java
new file mode 100644
index 0000000..2437279
--- /dev/null
+++ b/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV1.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.Map; + +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream; +import org.apache.commons.lang3.StringEscapeUtils; + +public class FlowFilePackagerV1 implements FlowFilePackager { + + public static final String FILENAME_ATTRIBUTES = "flowfile.attributes"; + public static final String FILENAME_CONTENT = "flowfile.content"; + public static final int DEFAULT_TAR_PERMISSIONS = 0644; + + private final int tarPermissions; + + public FlowFilePackagerV1() { + this(DEFAULT_TAR_PERMISSIONS); + } + + public FlowFilePackagerV1(final int tarPermissions) { + this.tarPermissions = tarPermissions; + } + + @Override + public void packageFlowFile(final InputStream in, final OutputStream out, final Map<String, String> attributes, final long fileSize) throws IOException { + try (final TarArchiveOutputStream tout = new TarArchiveOutputStream(out)) { + writeAttributesEntry(attributes, tout); + writeContentEntry(tout, in, fileSize); + tout.finish(); + tout.flush(); + tout.close(); + } + } + + private void writeAttributesEntry(final Map<String, String> attributes, final TarArchiveOutputStream tout) throws IOException { + final StringBuilder sb = 
new StringBuilder(); + sb.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?><!DOCTYPE properties\n SYSTEM \"http://java.sun.com/dtd/properties.dtd\">\n"); + sb.append("<properties>"); + for (final Map.Entry<String, String> entry : attributes.entrySet()) { + final String escapedKey = StringEscapeUtils.escapeXml(entry.getKey()); + final String escapedValue = StringEscapeUtils.escapeXml(entry.getValue()); + sb.append("\n <entry key=\"").append(escapedKey).append("\">").append(escapedValue).append("</entry>"); + } + sb.append("</properties>"); + + final byte[] metaBytes = sb.toString().getBytes(StandardCharsets.UTF_8); + final TarArchiveEntry attribEntry = new TarArchiveEntry(FILENAME_ATTRIBUTES); + attribEntry.setMode(tarPermissions); + attribEntry.setSize(metaBytes.length); + tout.putArchiveEntry(attribEntry); + tout.write(metaBytes); + tout.closeArchiveEntry(); + } + + private void writeContentEntry(final TarArchiveOutputStream tarOut, final InputStream inStream, final long fileSize) throws IOException { + final TarArchiveEntry entry = new TarArchiveEntry(FILENAME_CONTENT); + entry.setMode(tarPermissions); + entry.setSize(fileSize); + tarOut.putArchiveEntry(entry); + final byte[] buffer = new byte[512 << 10];//512KB + int bytesRead = 0; + while ((bytesRead = inStream.read(buffer)) != -1) { //still more data to read + if (bytesRead > 0) { + tarOut.write(buffer, 0, bytesRead); + } + } + + copy(inStream, tarOut); + tarOut.closeArchiveEntry(); + } + + public static long copy(final InputStream source, final OutputStream destination) throws IOException { + final byte[] buffer = new byte[8192]; + int len; + long totalCount = 0L; + while ((len = source.read(buffer)) > 0) { + destination.write(buffer, 0, len); + totalCount += len; + } + return totalCount; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV2.java 
---------------------------------------------------------------------- diff --git a/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV2.java b/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV2.java new file mode 100644 index 0000000..6f9d6b1 --- /dev/null +++ b/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV2.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Map; + +/** + * <p> + * Packages a FlowFile, including both its content and its attributes into a + * single file that is stream-friendly. The encoding scheme is as such: + * </p> + * + * <pre> + * Length Field : indicates the number of Flow File Attributes in the stream + * 1 to N times (N=number of Flow File Attributes): + * String Field : Flow File Attribute key name + * String Field : Flow File Attribute value + * Long : 8 bytes indicating the length of the Flow File content + * Content : The next M bytes are the content of the Flow File. 
+ * </pre> + * + * <pre> + * Encoding of String Field is as follows: + * Length Field : indicates the length of the String + * 1 to N bytes (N=String length, determined by previous field, as described above) : The UTF-8 encoded string value. + * </pre> + * + * <pre> + * Encoding of Length Field is as follows: + * First 2 bytes: Indicate length. If both bytes = 255, this is a special value indicating that the length is + * greater than or equal to 65536 bytes; therefore, the next 4 bytes will indicate the actual length. + * </pre> + * + * <p> + * Note: All byte-order encoding is Network Byte Order (Most Significant Byte + * first) + * </p> + * + * <p> + * The following example shows the bytes expected if we were to encode a + * FlowFile containing the following attributes where the content is the text + * "Hello World!": + * + * <br><br> + * Attributes: + * <pre> + * +-------+-------+ + * | Key + Value | + * + --------------+ + * | A | a | + * + --------------+ + * | B | b | + * + --------------+ + * </pre> Content:<br> + * Hello World! 
+ * <br><br> + * Packaged Byte Encoding (In Hexadecimal Form): + * <p> + * + * <pre> + * 00 02 00 01 41 00 01 61 + * 00 01 42 00 01 62 00 00 + * 00 00 00 00 00 0C 48 65 + * 6C 6C 6F 20 57 6F 72 6C + * 64 21 + * </pre> + */ +public class FlowFilePackagerV2 implements FlowFilePackager { + + private static final int MAX_VALUE_2_BYTES = 65535; + private final byte[] writeBuffer = new byte[8]; + + @Override + public void packageFlowFile(final InputStream in, final OutputStream out, final Map<String, String> attributes, final long fileSize) throws IOException { + writeFieldLength(out, attributes.size()); //write out the number of attributes + for (final Map.Entry<String, String> entry : attributes.entrySet()) { //write out each attribute key/value pair + writeString(entry.getKey(), out); + writeString(entry.getValue(), out); + } + writeLong(out, fileSize);//write out length of data + copy(in, out);//write out the actual flow file payload + } + + private void copy(final InputStream in, final OutputStream out) throws IOException { + final byte[] buffer = new byte[65536]; + int len; + while ((len = in.read(buffer)) > 0) { + out.write(buffer, 0, len); + } + } + + private void writeString(final String val, final OutputStream out) throws IOException { + final byte[] bytes = val.getBytes("UTF-8"); + writeFieldLength(out, bytes.length); + out.write(bytes); + } + + private void writeFieldLength(final OutputStream out, final int numBytes) throws IOException { + // If the value is less than the max value that can be fit into 2 bytes, just use the + // actual value. Otherwise, we will set the first 2 bytes to 255/255 and then use the next + // 4 bytes to indicate the real length. 
+ if (numBytes < MAX_VALUE_2_BYTES) { + writeBuffer[0] = (byte) (numBytes >>> 8); + writeBuffer[1] = (byte) (numBytes); + out.write(writeBuffer, 0, 2); + } else { + writeBuffer[0] = (byte) 0xff; + writeBuffer[1] = (byte) 0xff; + writeBuffer[2] = (byte) (numBytes >>> 24); + writeBuffer[3] = (byte) (numBytes >>> 16); + writeBuffer[4] = (byte) (numBytes >>> 8); + writeBuffer[5] = (byte) (numBytes); + out.write(writeBuffer, 0, 6); + } + } + + private void writeLong(final OutputStream out, final long val) throws IOException { + writeBuffer[0] = (byte) (val >>> 56); + writeBuffer[1] = (byte) (val >>> 48); + writeBuffer[2] = (byte) (val >>> 40); + writeBuffer[3] = (byte) (val >>> 32); + writeBuffer[4] = (byte) (val >>> 24); + writeBuffer[5] = (byte) (val >>> 16); + writeBuffer[6] = (byte) (val >>> 8); + writeBuffer[7] = (byte) (val); + out.write(writeBuffer, 0, 8); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV3.java ---------------------------------------------------------------------- diff --git a/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV3.java b/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV3.java new file mode 100644 index 0000000..181f3e3 --- /dev/null +++ b/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV3.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Map; + +public class FlowFilePackagerV3 implements FlowFilePackager { + + public static final byte[] MAGIC_HEADER = {'N', 'i', 'F', 'i', 'F', 'F', '3'}; + private static final int MAX_VALUE_2_BYTES = 65535; + private final byte[] writeBuffer = new byte[8]; + + @Override + public void packageFlowFile(final InputStream in, final OutputStream out, final Map<String, String> attributes, final long fileSize) throws IOException { + out.write(MAGIC_HEADER); + + if (attributes == null) { + writeFieldLength(out, 0); + } else { + writeFieldLength(out, attributes.size()); //write out the number of attributes + for (final Map.Entry<String, String> entry : attributes.entrySet()) { //write out each attribute key/value pair + writeString(entry.getKey(), out); + writeString(entry.getValue(), out); + } + } + + writeLong(out, fileSize);//write out length of data + copy(in, out);//write out the actual flow file payload + } + + private void copy(final InputStream in, final OutputStream out) throws IOException { + final byte[] buffer = new byte[65536]; + int len; + while ((len = in.read(buffer)) > 0) { + out.write(buffer, 0, len); + } + } + + private void writeString(final String val, final OutputStream out) throws IOException { + final byte[] bytes = val.getBytes("UTF-8"); + writeFieldLength(out, bytes.length); + out.write(bytes); + } + + private void writeFieldLength(final OutputStream out, final int numBytes) throws IOException { 
+ // If the value is less than the max value that can be fit into 2 bytes, just use the + // actual value. Otherwise, we will set the first 2 bytes to 255/255 and then use the next + // 4 bytes to indicate the real length. + if (numBytes < MAX_VALUE_2_BYTES) { + writeBuffer[0] = (byte) (numBytes >>> 8); + writeBuffer[1] = (byte) (numBytes); + out.write(writeBuffer, 0, 2); + } else { + writeBuffer[0] = (byte) 0xff; + writeBuffer[1] = (byte) 0xff; + writeBuffer[2] = (byte) (numBytes >>> 24); + writeBuffer[3] = (byte) (numBytes >>> 16); + writeBuffer[4] = (byte) (numBytes >>> 8); + writeBuffer[5] = (byte) (numBytes); + out.write(writeBuffer, 0, 6); + } + } + + private void writeLong(final OutputStream out, final long val) throws IOException { + writeBuffer[0] = (byte) (val >>> 56); + writeBuffer[1] = (byte) (val >>> 48); + writeBuffer[2] = (byte) (val >>> 40); + writeBuffer[3] = (byte) (val >>> 32); + writeBuffer[4] = (byte) (val >>> 24); + writeBuffer[5] = (byte) (val >>> 16); + writeBuffer[6] = (byte) (val >>> 8); + writeBuffer[7] = (byte) (val); + out.write(writeBuffer, 0, 8); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackager.java ---------------------------------------------------------------------- diff --git a/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackager.java b/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackager.java new file mode 100644 index 0000000..fd9d92d --- /dev/null +++ b/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackager.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Map; + +public interface FlowFileUnpackager { + + Map<String, String> unpackageFlowFile(InputStream in, OutputStream out) throws IOException; + + boolean hasMoreData() throws IOException; + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackagerV1.java ---------------------------------------------------------------------- diff --git a/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackagerV1.java b/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackagerV1.java new file mode 100644 index 0000000..f8ef3d1 --- /dev/null +++ b/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackagerV1.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.util;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;

/**
 * Unpacks a FlowFile stored in the version 1 package format.
 *
 * <p>The package is a tar archive with two entries in fixed order:
 * <ol>
 *   <li>{@link FlowFilePackagerV1#FILENAME_ATTRIBUTES}, the attributes as a {@link Properties}
 *       XML document;</li>
 *   <li>{@link FlowFilePackagerV1#FILENAME_CONTENT}, the FlowFile content.</li>
 * </ol>
 *
 * <p>Each instance yields at most one FlowFile: {@link #hasMoreData()} reports {@code true}
 * only until {@link #unpackageFlowFile} has been called once.
 */
public class FlowFileUnpackagerV1 implements FlowFileUnpackager {

    /** Number of unpackageFlowFile calls so far, counted even when a call returns null or throws. */
    private int flowFilesRead = 0;

    /**
     * Reads the attributes entry and then copies the content entry to {@code out}.
     *
     * @param in the tar-formatted package
     * @param out receives the content bytes and is flushed afterwards (not closed)
     * @return the attributes, or {@code null} if the archive contains no entries
     * @throws IOException if the entries are missing or in the wrong order, the attributes are
     *         not valid Properties XML, or an I/O error occurs
     */
    @Override
    public Map<String, String> unpackageFlowFile(final InputStream in, final OutputStream out) throws IOException {
        flowFilesRead++;
        // NOTE(review): tarIn is never closed, presumably so the caller's stream stays open.
        final TarArchiveInputStream tarIn = new TarArchiveInputStream(in);

        final TarArchiveEntry attribEntry = tarIn.getNextTarEntry();
        if (attribEntry == null) {
            return null; // empty archive: nothing to unpack
        }
        if (!attribEntry.getName().equals(FlowFilePackagerV1.FILENAME_ATTRIBUTES)) {
            throw unexpectedEntries();
        }
        final Map<String, String> attributes = getAttributes(tarIn);

        final TarArchiveEntry contentEntry = tarIn.getNextTarEntry();
        if (contentEntry == null || !contentEntry.getName().equals(FlowFilePackagerV1.FILENAME_CONTENT)) {
            throw unexpectedEntries();
        }

        final byte[] buffer = new byte[512 << 10]; // 512 KiB
        int bytesRead;
        while ((bytesRead = tarIn.read(buffer)) != -1) {
            if (bytesRead > 0) {
                out.write(buffer, 0, bytesRead);
            }
        }
        out.flush();

        return attributes;
    }

    /** Builds the error raised when the archive lacks the expected attributes-then-content entries. */
    private static IOException unexpectedEntries() {
        return new IOException("Expected two tar entries: "
                + FlowFilePackagerV1.FILENAME_CONTENT + " and "
                + FlowFilePackagerV1.FILENAME_ATTRIBUTES);
    }

    /**
     * Parses the current tar entry as a {@link Properties} XML document.
     *
     * @param stream tar stream positioned at the attributes entry; left open so the content
     *        entry can be read afterwards
     * @return the attributes as a new mutable map
     * @throws IOException if the XML cannot be parsed, or a key or value is not a String
     */
    protected Map<String, String> getAttributes(final TarArchiveInputStream stream) throws IOException {
        final Properties props = new Properties();
        // Properties.loadFromXML closes the stream it reads. Shield the tar stream from that.
        props.loadFromXML(new NonCloseableInputStream(stream));

        final Map<String, String> result = new HashMap<>();
        for (final Entry<Object, Object> entry : props.entrySet()) {
            final Object keyObject = entry.getKey();
            final Object valueObject = entry.getValue();
            if (!(keyObject instanceof String)) {
                throw new IOException("Flow file attributes object contains key of type "
                        + keyObject.getClass().getCanonicalName()
                        + " but expected java.lang.String");
            } else if (!(valueObject instanceof String)) {
                // Previously re-tested keyObject here, so a non-String value was never detected.
                throw new IOException("Flow file attributes object contains value of type "
                        + valueObject.getClass().getCanonicalName()
                        + " but expected java.lang.String");
            }

            result.put((String) keyObject, (String) valueObject);
        }

        return result;
    }

    /** Returns {@code true} until the first call to {@link #unpackageFlowFile}. */
    @Override
    public boolean hasMoreData() throws IOException {
        return flowFilesRead == 0;
    }

    /**
     * Delegating {@link InputStream} whose {@link #close()} does nothing, so consumers that close
     * their input cannot close the wrapped stream.
     */
    public static final class NonCloseableInputStream extends InputStream {

        final InputStream stream;

        public NonCloseableInputStream(final InputStream stream) {
            this.stream = stream;
        }

        /** Intentionally a no-op: the wrapped stream stays open. */
        @Override
        public void close() {
        }

        @Override
        public int read() throws IOException {
            return stream.read();
        }

        @Override
        public int available() throws IOException {
            return stream.available();
        }

        @Override
        public synchronized void mark(int readlimit) {
            stream.mark(readlimit);
        }

        @Override
        public synchronized void reset() throws IOException {
            stream.reset();
        }

        @Override
        public boolean markSupported() {
            return stream.markSupported();
        }

        @Override
        public long skip(long n) throws IOException {
            return stream.skip(n);
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            return stream.read(b, off, len);
        }

        @Override
        public int read(byte[] b) throws IOException {
            return stream.read(b);
        }
    }
}