http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardLineageResult.java ---------------------------------------------------------------------- diff --git a/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardLineageResult.java b/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardLineageResult.java deleted file mode 100644 index afb56e8..0000000 --- a/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardLineageResult.java +++ /dev/null @@ -1,324 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.provenance; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.nifi.provenance.lineage.ComputeLineageResult; -import org.apache.nifi.provenance.lineage.EdgeNode; -import org.apache.nifi.provenance.lineage.EventNode; -import org.apache.nifi.provenance.lineage.FlowFileNode; -import org.apache.nifi.provenance.lineage.LineageEdge; -import org.apache.nifi.provenance.lineage.LineageNode; - -/** - * - */ -public class StandardLineageResult implements ComputeLineageResult { - - public static final int TTL = (int) TimeUnit.MILLISECONDS.convert(30, TimeUnit.MINUTES); - private static final Logger logger = LoggerFactory.getLogger(StandardLineageResult.class); - - private final Collection<String> flowFileUuids; - private final Collection<ProvenanceEventRecord> relevantRecords = new ArrayList<>(); - private final Set<LineageNode> nodes = new HashSet<>(); - private final Set<LineageEdge> edges = new HashSet<>(); - private final int numSteps; - private final long creationNanos; - private long computationNanos; - - private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); - private final Lock readLock = rwLock.readLock(); - private final Lock writeLock = rwLock.writeLock(); - - private Date expirationDate = null; - private String error = null; - private int numCompletedSteps = 0; - - private volatile boolean canceled = false; - - public StandardLineageResult(final int numSteps, final Collection<String> flowFileUuids) { - this.numSteps = numSteps; - this.creationNanos = System.nanoTime(); - this.flowFileUuids = flowFileUuids; - - updateExpiration(); - } - - @Override - public List<LineageNode> getNodes() { - readLock.lock(); - try { - return new ArrayList<>(nodes); - } finally { - readLock.unlock(); - } - } - - @Override - public List<LineageEdge> getEdges() { - readLock.lock(); - try { - return new ArrayList<>(edges); - } finally { - readLock.unlock(); - } - } - - public int getNumberOfEdges() { - readLock.lock(); - try { - return edges.size(); - } finally { - readLock.unlock(); - } - } - - public int getNumberOfNodes() { - readLock.lock(); - try { - return nodes.size(); - } finally { - readLock.unlock(); - } - } - - public long getComputationTime(final TimeUnit timeUnit) { - readLock.lock(); - try { - return timeUnit.convert(computationNanos, TimeUnit.NANOSECONDS); - } finally { - readLock.unlock(); - } - } - - @Override - public Date getExpiration() { - readLock.lock(); - try { - return expirationDate; - } finally { - readLock.unlock(); - } - } - - @Override - public String getError() { - readLock.lock(); - try { - return error; - } finally { - readLock.unlock(); - } - } - - @Override - public int getPercentComplete() { - readLock.lock(); - try { - return (numSteps < 1) ? 100 : (int) (((float) numCompletedSteps / (float) numSteps) * 100.0F); - } finally { - readLock.unlock(); - } - } - - @Override - public boolean isFinished() { - readLock.lock(); - try { - return numCompletedSteps >= numSteps || canceled; - } finally { - readLock.unlock(); - } - } - - public void setError(final String error) { - writeLock.lock(); - try { - this.error = error; - numCompletedSteps++; - - updateExpiration(); - - if (numCompletedSteps >= numSteps) { - computationNanos = System.nanoTime() - creationNanos; - } - } finally { - writeLock.unlock(); - } - } - - public void update(final Collection<ProvenanceEventRecord> records) { - writeLock.lock(); - try { - relevantRecords.addAll(records); - - numCompletedSteps++; - updateExpiration(); - - if (numCompletedSteps >= numSteps && error == null) { - computeLineage(); - computationNanos = System.nanoTime() - creationNanos; - } - } finally { - writeLock.unlock(); - } - } - - /** - * Computes the lineage from the relevant Provenance Event Records. This - * method must be called with the write lock held and is only going to be - * useful after all of the records have been successfully obtained - */ - private void computeLineage() { - final long startNanos = System.nanoTime(); - - nodes.clear(); - edges.clear(); - - Map<String, LineageNode> lastEventMap = new HashMap<>(); // maps FlowFile UUID to last event for that FlowFile - final List<ProvenanceEventRecord> sortedRecords = new ArrayList<>(relevantRecords); - Collections.sort(sortedRecords, new Comparator<ProvenanceEventRecord>() { - @Override - public int compare(final ProvenanceEventRecord o1, final ProvenanceEventRecord o2) { - // Sort on Event Time, then Event ID. - final int eventTimeComparison = Long.compare(o1.getEventTime(), o2.getEventTime()); - if (eventTimeComparison == 0) { - return Long.compare(o1.getEventId(), o2.getEventId()); - } else { - return eventTimeComparison; - } - } - }); - - // convert the StandardProvenanceRecord objects into Lineage nodes (FlowFileNode, EventNodes). - for (final ProvenanceEventRecord record : sortedRecords) { - final LineageNode lineageNode = new EventNode(record); - final boolean added = nodes.add(lineageNode); - if (!added) { - logger.debug("Did not add {} because it already exists in the 'nodes' set", lineageNode); - } - - // Create an edge that connects this node to the previous node for the same FlowFile UUID. - final LineageNode lastNode = lastEventMap.get(record.getFlowFileUuid()); - if (lastNode != null) { - // We calculate the Edge UUID based on whether or not this event is a SPAWN. - // If this event is a SPAWN, then we want to use the previous node's UUID because a - // SPAWN Event's UUID is not necessarily what we want, since a SPAWN Event's UUID pertains to - // only one of (potentially) many UUIDs associated with the event. Otherwise, we know that - // the UUID of this record is appropriate, so we just use it. - final String edgeUuid; - - switch (record.getEventType()) { - case JOIN: - case CLONE: - case REPLAY: - edgeUuid = lastNode.getFlowFileUuid(); - break; - default: - edgeUuid = record.getFlowFileUuid(); - break; - } - - edges.add(new EdgeNode(edgeUuid, lastNode, lineageNode)); - } - - lastEventMap.put(record.getFlowFileUuid(), lineageNode); - - switch (record.getEventType()) { - case FORK: - case JOIN: - case REPLAY: - case CLONE: { - // For events that create FlowFile nodes, we need to create the FlowFile Nodes and associated Edges, as appropriate - for (final String childUuid : record.getChildUuids()) { - if (flowFileUuids.contains(childUuid)) { - final FlowFileNode childNode = new FlowFileNode(childUuid, record.getEventTime()); - final boolean isNewFlowFile = nodes.add(childNode); - if (!isNewFlowFile) { - final String msg = "Unable to generate Lineage Graph because multiple events were registered claiming to have generated the same FlowFile (UUID = " + childNode.getFlowFileUuid() + ")"; - logger.error(msg); - setError(msg); - return; - } - - edges.add(new EdgeNode(childNode.getFlowFileUuid(), lineageNode, childNode)); - lastEventMap.put(childUuid, childNode); - } - } - for (final String parentUuid : record.getParentUuids()) { - LineageNode lastNodeForParent = lastEventMap.get(parentUuid); - if (lastNodeForParent != null && !lastNodeForParent.equals(lineageNode)) { - edges.add(new EdgeNode(parentUuid, lastNodeForParent, lineageNode)); - } - - lastEventMap.put(parentUuid, lineageNode); - } - } - break; - case RECEIVE: - case CREATE: { - // for a receive event, we want to create a FlowFile Node that represents the FlowFile received - // and create an edge from the Receive Event to the FlowFile Node - final LineageNode flowFileNode = new FlowFileNode(record.getFlowFileUuid(), record.getEventTime()); - final boolean isNewFlowFile = nodes.add(flowFileNode); - if (!isNewFlowFile) { - final String msg = "Found cycle in graph. This indicates that multiple events were registered claiming to have generated the same FlowFile (UUID = " + flowFileNode.getFlowFileUuid() + ")"; - setError(msg); - logger.error(msg); - return; - } - edges.add(new EdgeNode(record.getFlowFileUuid(), lineageNode, flowFileNode)); - lastEventMap.put(record.getFlowFileUuid(), flowFileNode); - } - break; - default: - break; - } - } - - final long nanos = System.nanoTime() - startNanos; - logger.debug("Finished building lineage with {} nodes and {} edges in {} millis", nodes.size(), edges.size(), TimeUnit.NANOSECONDS.toMillis(nanos)); - } - - void cancel() { - this.canceled = true; - } - - /** - * Must be called with write lock! - */ - private void updateExpiration() { - expirationDate = new Date(System.currentTimeMillis() + TTL); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardProvenanceEventRecord.java ---------------------------------------------------------------------- diff --git a/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardProvenanceEventRecord.java b/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardProvenanceEventRecord.java deleted file mode 100644 index cfbae88..0000000 --- a/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardProvenanceEventRecord.java +++ /dev/null @@ -1,752 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.provenance; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.processor.Relationship; - -/** - * Holder for provenance relevant information - * <p/> - * @author none - */ -public final class StandardProvenanceEventRecord implements ProvenanceEventRecord { - - private final long eventTime; - private final long entryDate; - private final ProvenanceEventType eventType; - private final long lineageStartDate; - private final Set<String> lineageIdentifiers; - private final String componentId; - private final String componentType; - private final String transitUri; - private final String sourceSystemFlowFileIdentifier; - private final String uuid; - private final List<String> parentUuids; - private final List<String> childrenUuids; - private final String alternateIdentifierUri; - private final String details; - private final String relationship; - private final long storageByteOffset; - private final String storageFilename; - private final long eventDuration; - - private final String contentClaimSection; - private final String contentClaimContainer; - private final String contentClaimIdentifier; - private final Long contentClaimOffset; - private final long contentSize; - - private final String previousClaimSection; - private final String previousClaimContainer; - private final String previousClaimIdentifier; - private final Long previousClaimOffset; - private final Long previousSize; - - private final String sourceQueueIdentifier; - - private final Map<String, String> previousAttributes; - private final Map<String, String> updatedAttributes; - - private volatile long eventId; - - private StandardProvenanceEventRecord(final Builder builder) { - this.eventTime = builder.eventTime; - this.entryDate = builder.entryDate; - this.eventType = builder.eventType; - this.componentId = builder.componentId; - this.componentType = builder.componentType; - this.transitUri = builder.transitUri; - this.sourceSystemFlowFileIdentifier = builder.sourceSystemFlowFileIdentifier; - this.uuid = builder.uuid; - this.parentUuids = builder.parentUuids; - this.childrenUuids = builder.childrenUuids; - this.alternateIdentifierUri = builder.alternateIdentifierUri; - this.details = builder.details; - this.relationship = builder.relationship; - this.storageByteOffset = builder.storageByteOffset; - this.storageFilename = builder.storageFilename; - this.eventDuration = builder.eventDuration; - this.lineageStartDate = builder.lineageStartDate; - this.lineageIdentifiers = Collections.unmodifiableSet(builder.lineageIdentifiers); - - previousClaimSection = builder.previousClaimSection; - previousClaimContainer = builder.previousClaimContainer; - previousClaimIdentifier = builder.previousClaimIdentifier; - previousClaimOffset = builder.previousClaimOffset; - previousSize = builder.previousSize; - - contentClaimSection = builder.contentClaimSection; - contentClaimContainer = builder.contentClaimContainer; - contentClaimIdentifier = builder.contentClaimIdentifier; - contentClaimOffset = builder.contentClaimOffset; - contentSize = builder.contentSize; - - previousAttributes = builder.previousAttributes == null ? Collections.<String, String>emptyMap() : Collections.unmodifiableMap(builder.previousAttributes); - updatedAttributes = builder.updatedAttributes == null ? Collections.<String, String>emptyMap() : Collections.unmodifiableMap(builder.updatedAttributes); - - sourceQueueIdentifier = builder.sourceQueueIdentifier; - - } - - public String getStorageFilename() { - return storageFilename; - } - - public long getStorageByteOffset() { - return storageByteOffset; - } - - void setEventId(final long eventId) { - this.eventId = eventId; - } - - @Override - public long getEventId() { - return eventId; - } - - @Override - public long getEventTime() { - return eventTime; - } - - @Override - public Set<String> getLineageIdentifiers() { - return lineageIdentifiers; - } - - @Override - public long getLineageStartDate() { - return lineageStartDate; - } - - @Override - public long getFileSize() { - return contentSize; - } - - @Override - public Long getPreviousFileSize() { - return previousSize; - } - - @Override - public ProvenanceEventType getEventType() { - return eventType; - } - - @Override - public Map<String, String> getAttributes() { - final Map<String, String> allAttrs = new HashMap<>(previousAttributes.size() + updatedAttributes.size()); - allAttrs.putAll(previousAttributes); - for (final Map.Entry<String, String> entry : updatedAttributes.entrySet()) { - if (entry.getValue() != null) { - allAttrs.put(entry.getKey(), entry.getValue()); - } - } - return allAttrs; - } - - @Override - public String getComponentId() { - return componentId; - } - - @Override - public String getComponentType() { - return componentType; - } - - @Override - public String getTransitUri() { - return transitUri; - } - - @Override - public String getSourceSystemFlowFileIdentifier() { - return sourceSystemFlowFileIdentifier; - } - - @Override - public String getFlowFileUuid() { - return uuid; - } - - @Override - public List<String> getParentUuids() { - return parentUuids == null ? Collections.<String>emptyList() : parentUuids; - } - - @Override - public List<String> getChildUuids() { - return childrenUuids == null ? Collections.<String>emptyList() : childrenUuids; - } - - @Override - public String getAlternateIdentifierUri() { - return alternateIdentifierUri; - } - - @Override - public long getEventDuration() { - return eventDuration; - } - - @Override - public String getDetails() { - return details; - } - - @Override - public String getRelationship() { - return relationship; - } - - @Override - public long getFlowFileEntryDate() { - return entryDate; - } - - @Override - public String getContentClaimSection() { - return contentClaimSection; - } - - @Override - public String getContentClaimContainer() { - return contentClaimContainer; - } - - @Override - public String getContentClaimIdentifier() { - return contentClaimIdentifier; - } - - @Override - public Long getContentClaimOffset() { - return contentClaimOffset; - } - - @Override - public String getSourceQueueIdentifier() { - return sourceQueueIdentifier; - } - - @Override - public Map<String, String> getPreviousAttributes() { - return previousAttributes; - } - - @Override - public String getPreviousContentClaimContainer() { - return previousClaimContainer; - } - - @Override - public String getPreviousContentClaimIdentifier() { - return previousClaimIdentifier; - } - - @Override - public Long getPreviousContentClaimOffset() { - return previousClaimOffset; - } - - @Override - public String getPreviousContentClaimSection() { - return previousClaimSection; - } - - @Override - public Map<String, String> getUpdatedAttributes() { - return updatedAttributes; - } - - @Override - public int hashCode() { - final int eventTypeCode; - if (eventType == ProvenanceEventType.CLONE || eventType == ProvenanceEventType.JOIN || eventType == ProvenanceEventType.FORK) { - eventTypeCode = 1472; - } else if (eventType == ProvenanceEventType.REPLAY) { - eventTypeCode = 21479 + (int) (0x7FFFFFFF & eventTime); // use lower bits of event time. - } else { - eventTypeCode = 4812 + eventType.hashCode() + 4 * uuid.hashCode(); - } - - return -37423 + 3 * componentId.hashCode() + (transitUri == null ? 0 : 41 * transitUri.hashCode()) - + (relationship == null ? 0 : 47 * relationship.hashCode()) + 44 * eventTypeCode; - } - - @Override - public boolean equals(final Object obj) { - if (obj == null) { - return false; - } - if (obj == this) { - return true; - } - if (!(obj instanceof StandardProvenanceEventRecord)) { - return false; - } - - final StandardProvenanceEventRecord other = (StandardProvenanceEventRecord) obj; - // If event ID's are populated and not equal, return false. If they have not yet been populated, do not - // use them in the comparison. - if (eventId > 0L && other.getEventId() > 0L && eventId != other.getEventId()) { - return false; - } - if (eventType != other.eventType) { - return false; - } - - if (!componentId.equals(other.componentId)) { - return false; - } - - if (different(parentUuids, other.parentUuids)) { - return false; - } - - if (different(childrenUuids, other.childrenUuids)) { - return false; - } - - // SPAWN had issues indicating which should be the event's FlowFileUUID in the case that there is 1 parent and 1 child. - if (!uuid.equals(other.uuid)) { - return false; - } - - if (different(transitUri, other.transitUri)) { - return false; - } - - if (different(relationship, other.relationship)) { - return false; - } - - return !(eventType == ProvenanceEventType.REPLAY && eventTime != other.getEventTime()); - } - - private boolean different(final Object a, final Object b) { - if (a == null && b == null) { - return false; - } - if (a == null || b == null) { - return true; - } - - return !a.equals(b); - } - - private boolean different(final List<String> a, final List<String> b) { - if (a == null && b == null) { - return false; - } - - if (a == null && b != null) { - return true; - } - - if (a != null && b == null) { - return true; - } - - if (a.size() != b.size()) { - return true; - } - - final List<String> sortedA = new ArrayList<>(a); - final List<String> sortedB = new ArrayList<>(b); - - Collections.sort(sortedA); - Collections.sort(sortedB); - - for (int i = 0; i < sortedA.size(); i++) { - if (!sortedA.get(i).equals(sortedB.get(i))) { - return true; - } - } - - return false; - } - - @Override - public String toString() { - return "ProvenanceEventRecord [" - + "eventId=" + eventId - + ", eventType=" + eventType - + ", eventTime=" + new Date(eventTime) - + ", uuid=" + uuid - + ", fileSize=" + contentSize - + ", componentId=" + componentId - + ", transitUri=" + transitUri - + ", sourceSystemFlowFileIdentifier=" + sourceSystemFlowFileIdentifier - + ", parentUuids=" + parentUuids - + ", alternateIdentifierUri=" + alternateIdentifierUri + "]"; - } - - public static class Builder implements ProvenanceEventBuilder { - - private long eventTime = System.currentTimeMillis(); - private long entryDate; - private long lineageStartDate; - private Set<String> lineageIdentifiers = new HashSet<>(); - private ProvenanceEventType eventType = null; - private String componentId = null; - private String componentType = null; - private String sourceSystemFlowFileIdentifier = null; - private String transitUri = null; - private String uuid = null; - private List<String> parentUuids = null; - private List<String> childrenUuids = null; - private String contentType = null; - private String alternateIdentifierUri = null; - private String details = null; - private String relationship = null; - private long storageByteOffset = -1L; - private long eventDuration = -1L; - private String storageFilename; - - private String contentClaimSection; - private String contentClaimContainer; - private String contentClaimIdentifier; - private Long contentClaimOffset; - private Long contentSize; - - private String previousClaimSection; - private String previousClaimContainer; - private String previousClaimIdentifier; - private Long previousClaimOffset; - private Long previousSize; - - private String sourceQueueIdentifier; - - private Map<String, String> previousAttributes; - private Map<String, String> updatedAttributes; - - @Override - public Builder fromEvent(final ProvenanceEventRecord event) { - eventTime = event.getEventTime(); - entryDate = event.getFlowFileEntryDate(); - lineageStartDate = event.getLineageStartDate(); - lineageIdentifiers = event.getLineageIdentifiers(); - eventType = event.getEventType(); - componentId = event.getComponentId(); - componentType = event.getComponentType(); - transitUri = event.getTransitUri(); - sourceSystemFlowFileIdentifier = event.getSourceSystemFlowFileIdentifier(); - uuid = event.getFlowFileUuid(); - parentUuids = event.getParentUuids(); - childrenUuids = event.getChildUuids(); - alternateIdentifierUri = event.getAlternateIdentifierUri(); - eventDuration = event.getEventDuration(); - previousAttributes = event.getPreviousAttributes(); - updatedAttributes = event.getUpdatedAttributes(); - details = event.getDetails(); - relationship = event.getRelationship(); - - contentClaimSection = event.getContentClaimSection(); - contentClaimContainer = event.getContentClaimContainer(); - contentClaimIdentifier = event.getContentClaimIdentifier(); - contentClaimOffset = event.getContentClaimOffset(); - contentSize = event.getFileSize(); - - previousClaimSection = event.getPreviousContentClaimSection(); - previousClaimContainer = event.getPreviousContentClaimContainer(); - previousClaimIdentifier = event.getPreviousContentClaimIdentifier(); - previousClaimOffset = event.getPreviousContentClaimOffset(); - previousSize = event.getPreviousFileSize(); - - sourceQueueIdentifier = event.getSourceQueueIdentifier(); - - if (event instanceof StandardProvenanceEventRecord) { - final StandardProvenanceEventRecord standardProvEvent = (StandardProvenanceEventRecord) event; - storageByteOffset = standardProvEvent.storageByteOffset; - storageFilename = standardProvEvent.storageFilename; - } - - return this; - } - - @Override - public Builder setFlowFileEntryDate(final long entryDate) { - this.entryDate = entryDate; - return this; - } - - @Override - public Builder setLineageIdentifiers(final Set<String> lineageIdentifiers) { - this.lineageIdentifiers = lineageIdentifiers; - return this; - } - - @Override - public Builder setAttributes(final Map<String, String> previousAttributes, final Map<String, String> updatedAttributes) { - this.previousAttributes = previousAttributes; - this.updatedAttributes = updatedAttributes; - return this; - } - - @Override - public Builder setFlowFileUUID(final String uuid) { - this.uuid = uuid; - return this; - } - - public Builder setStorageLocation(final String filename, final long offset) { - this.storageFilename = filename; - this.storageByteOffset = offset; - return this; - } - - @Override - public Builder setEventTime(long eventTime) { - this.eventTime = eventTime; - return this; - } - - @Override - public Builder setEventDuration(final long millis) { - this.eventDuration = millis; - return this; - } - - @Override - public Builder setLineageStartDate(final long startDate) { - this.lineageStartDate = startDate; - return this; - } - - public Builder addLineageIdentifier(final String lineageIdentifier) { - this.lineageIdentifiers.add(lineageIdentifier); - return this; - } - - @Override - public Builder setEventType(ProvenanceEventType eventType) { - this.eventType = eventType; - return this; - } - - @Override - public Builder setComponentId(String componentId) { - this.componentId = componentId; - return this; - } - - @Override - public Builder setComponentType(String componentType) { - this.componentType = componentType; - return this; - } - - @Override - public Builder setSourceSystemFlowFileIdentifier(String sourceSystemFlowFileIdentifier) { - this.sourceSystemFlowFileIdentifier = sourceSystemFlowFileIdentifier; - return this; - } - - @Override - public Builder setTransitUri(String transitUri) { - this.transitUri = transitUri; - return this; - } - - @Override - public Builder addParentFlowFile(final FlowFile parentFlowFile) { - if (this.parentUuids == null) { - this.parentUuids = new ArrayList<>(); - } - this.parentUuids.add(parentFlowFile.getAttribute(CoreAttributes.UUID.key())); - return this; - } - - public Builder addParentUuid(final String uuid) { - if (this.parentUuids == null) { - this.parentUuids = new ArrayList<>(); - } - this.parentUuids.add(uuid); - return this; - } - - @Override - public Builder removeParentFlowFile(final FlowFile parentFlowFile) { - if (this.parentUuids == null) { - return this; - } - - parentUuids.remove(parentFlowFile.getAttribute(CoreAttributes.UUID.key())); - return this; - } - - @Override - public Builder addChildFlowFile(final FlowFile childFlowFile) { - if (this.childrenUuids == null) { - this.childrenUuids = new ArrayList<>(); - } - this.childrenUuids.add(childFlowFile.getAttribute(CoreAttributes.UUID.key())); - return this; - } - - public Builder addChildUuid(final String uuid) { - if (this.childrenUuids == null) { - this.childrenUuids = new ArrayList<>(); - } - this.childrenUuids.add(uuid); - return this; - } - - @Override - public Builder removeChildFlowFile(final FlowFile childFlowFile) { - if (this.childrenUuids == null) { - return this; - } - - childrenUuids.remove(childFlowFile.getAttribute(CoreAttributes.UUID.key())); - return this; - } - - public Builder setContentType(String contentType) { - this.contentType = contentType; - return this; - } - - @Override - public Builder setAlternateIdentifierUri(String alternateIdentifierUri) { - this.alternateIdentifierUri = alternateIdentifierUri; - return this; - } - - @Override - public Builder setDetails(String details) { - this.details = details; - return this; - } - - @Override - public Builder setRelationship(Relationship relationship) { - this.relationship = relationship.getName(); - return this; - } - - public Builder setRelationship(final String relationship) { - this.relationship = relationship; - return this; - } - - @Override - public ProvenanceEventBuilder fromFlowFile(final FlowFile flowFile) { - setFlowFileEntryDate(flowFile.getEntryDate()); - setLineageIdentifiers(flowFile.getLineageIdentifiers()); - setLineageStartDate(flowFile.getLineageStartDate()); - setAttributes(Collections.<String, String>emptyMap(), flowFile.getAttributes()); - uuid = flowFile.getAttribute(CoreAttributes.UUID.key()); - this.contentSize = flowFile.getSize(); - return this; - } - - @Override - public Builder setPreviousContentClaim(final String container, final String section, final String identifier, final Long offset, final long size) { - previousClaimSection = section; - previousClaimContainer = container; - previousClaimIdentifier = identifier; - previousClaimOffset = offset; - previousSize = size; - return this; - } - - @Override - public Builder setCurrentContentClaim(final String container, final String section, final String identifier, final Long offset, final long size) { - contentClaimSection = section; - contentClaimContainer = container; - contentClaimIdentifier = identifier; - contentClaimOffset = offset; - contentSize = size; - return this; - } - - @Override - public Builder setSourceQueueIdentifier(final String identifier) { - sourceQueueIdentifier = identifier; - return this; - } - - private void assertSet(final Object value, final String name) { - if (value == null) { - throw new IllegalStateException("Cannot create Provenance Event Record because " + name + " is not set"); - } - } - - public ProvenanceEventType getEventType() { - return eventType; - } - - public List<String> getChildUuids() { - return Collections.unmodifiableList(childrenUuids); - } - - public List<String> getParentUuids() { - return Collections.unmodifiableList(parentUuids); - } - - @Override - public StandardProvenanceEventRecord build() { - assertSet(eventType, "Event Type"); - assertSet(componentId, "Component ID"); - assertSet(componentType, "Component Type"); - assertSet(uuid, "FlowFile UUID"); - assertSet(contentSize, "FlowFile Size"); - - switch (eventType) { - case ADDINFO: - if (alternateIdentifierUri == null) { - throw new IllegalStateException("Cannot create Provenance Event Record of type " + eventType + " because no alternate identifiers have been set"); - } - break; - case RECEIVE: - case SEND: - assertSet(transitUri, "Transit URI"); - break; - case ROUTE: - assertSet(relationship, "Relationship"); - break; - case CLONE: - case FORK: - case JOIN: - if ((parentUuids == null || parentUuids.isEmpty()) && (childrenUuids == null || childrenUuids.isEmpty())) { - throw new IllegalStateException("Cannot create Provenance Event Record of type " + eventType + " because no Parent UUIDs or Children UUIDs have been set"); - } - break; - default: - break; - } - - return new StandardProvenanceEventRecord(this); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java ---------------------------------------------------------------------- diff --git a/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java b/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java deleted file mode 100644 index 9a9a27d..0000000 --- a/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.provenance; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Date; -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import org.apache.nifi.provenance.search.Query; -import org.apache.nifi.provenance.search.QueryResult; - -public class StandardQueryResult implements QueryResult { - - public static final int TTL = (int) TimeUnit.MILLISECONDS.convert(30, TimeUnit.MINUTES); - private final Query query; - private final long creationNanos; - - private final int numSteps; - private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); - private final Lock readLock = rwLock.readLock(); - - private final Lock writeLock = rwLock.writeLock(); - // guarded by writeLock - private final List<ProvenanceEventRecord> matchingRecords = new ArrayList<>(); - private long totalHitCount; - private int numCompletedSteps = 0; - private Date expirationDate; - private String error; - private long queryTime; - - private volatile boolean canceled = false; - - public StandardQueryResult(final Query query, final int numSteps) { - this.query = query; - this.numSteps = numSteps; - this.creationNanos = System.nanoTime(); - - updateExpiration(); - } - - @Override - public List<ProvenanceEventRecord> getMatchingEvents() { - readLock.lock(); - try { - if (matchingRecords.size() <= query.getMaxResults()) { - return new ArrayList<>(matchingRecords); - } - - final List<ProvenanceEventRecord> copy = new ArrayList<>(query.getMaxResults()); - for (int i = 0; i < query.getMaxResults(); i++) { - copy.add(matchingRecords.get(i)); - } - - return copy; - } finally { - readLock.unlock(); - } - } - - @Override - public long getTotalHitCount() { - readLock.lock(); - try { - return totalHitCount; - } finally { - readLock.unlock(); - } - } - - @Override - public long getQueryTime() { - return queryTime; - } - - @Override - public Date getExpiration() { - return expirationDate; - } - - @Override - public String getError() { - return error; - } - - @Override - public int getPercentComplete() { - readLock.lock(); - try { - return (numSteps < 1) ? 100 : (int) (((float) numCompletedSteps / (float) numSteps) * 100.0F); - } finally { - readLock.unlock(); - } - } - - @Override - public boolean isFinished() { - readLock.lock(); - try { - return numCompletedSteps >= numSteps || canceled; - } finally { - readLock.unlock(); - } - } - - void cancel() { - this.canceled = true; - } - - public void setError(final String error) { - writeLock.lock(); - try { - this.error = error; - numCompletedSteps++; - - updateExpiration(); - if (numCompletedSteps >= numSteps) { - final long searchNanos = System.nanoTime() - creationNanos; - queryTime = TimeUnit.MILLISECONDS.convert(searchNanos, TimeUnit.NANOSECONDS); - } - } finally { - writeLock.unlock(); - } - } - - public void update(final Collection<ProvenanceEventRecord> matchingRecords, final long totalHits) { - writeLock.lock(); - try { - this.matchingRecords.addAll(matchingRecords); - this.totalHitCount += totalHits; - - numCompletedSteps++; - updateExpiration(); - - if (numCompletedSteps >= numSteps) { - final long searchNanos = System.nanoTime() - creationNanos; - queryTime = TimeUnit.MILLISECONDS.convert(searchNanos, TimeUnit.NANOSECONDS); - } - } finally { - writeLock.unlock(); - } - } - - /** - * Must be called with write lock! - */ - private void updateExpiration() { - expirationDate = new Date(System.currentTimeMillis() + TTL); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EdgeNode.java ---------------------------------------------------------------------- diff --git a/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EdgeNode.java b/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EdgeNode.java deleted file mode 100644 index 0aaf5ef..0000000 --- a/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EdgeNode.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.provenance.lineage; - -import static java.util.Objects.requireNonNull; - -public class EdgeNode implements LineageEdge { - - private final String uuid; - private final LineageNode source; - private final LineageNode destination; - - public EdgeNode(final String uuid, final LineageNode source, final LineageNode destination) { - this.uuid = uuid; - this.source = requireNonNull(source); - this.destination = requireNonNull(destination); - } - - @Override - public String getUuid() { - return uuid; - } - - @Override - public LineageNode getSource() { - return source; - } - - @Override - public LineageNode getDestination() { - return destination; - } - - @Override - public int hashCode() { - return 43298293 + source.hashCode() + destination.hashCode(); - } - - @Override - public boolean equals(final Object obj) { - if (this == obj) { - return true; - } - if (obj == null) { - return false; - } - - if (!(obj instanceof EdgeNode)) { - return false; - } - - final EdgeNode other = (EdgeNode) obj; - return (source.equals(other.source) && destination.equals(other.destination)); - } - - @Override - public String toString() { - return "Edge[Source=" + source + ", Destination=" + destination + "]"; - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EventNode.java ---------------------------------------------------------------------- diff --git a/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EventNode.java b/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EventNode.java deleted file mode 100644 index 12d9a4f..0000000 --- a/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EventNode.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.provenance.lineage; - -import java.util.List; - -import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.provenance.ProvenanceEventRecord; -import org.apache.nifi.provenance.ProvenanceEventType; - -public class EventNode implements ProvenanceEventLineageNode { - - private final ProvenanceEventRecord record; - private String clusterNodeIdentifier = null; - - public EventNode(final ProvenanceEventRecord event) { - this.record = event; - } - - @Override - public String getIdentifier() { - return String.valueOf(getEventIdentifier()); - } - - @Override - public String getClusterNodeIdentifier() { - return clusterNodeIdentifier; - } - - public void setClusterNodeIdentifier(final String nodeIdentifier) { - this.clusterNodeIdentifier = nodeIdentifier; - } - - @Override - public LineageNodeType getNodeType() { - return LineageNodeType.PROVENANCE_EVENT_NODE; - } - - @Override - public ProvenanceEventType getEventType() { - return record.getEventType(); - } - - @Override - public long getTimestamp() { - return record.getEventTime(); - } - - @Override - public long getEventIdentifier() { - return record.getEventId(); - } - - @Override - public String getFlowFileUuid() { - return record.getAttributes().get(CoreAttributes.UUID.key()); - } - - @Override - public List<String> getParentUuids() { - return record.getParentUuids(); - } - - @Override - public List<String> getChildUuids() { - return record.getChildUuids(); - } - - @Override - public int hashCode() { - return 2938472 + record.hashCode(); - } - - @Override - public boolean equals(final Object obj) { - if (obj == null) { - return false; - } - if (this == obj) { - return true; - } - - if (!(obj instanceof EventNode)) { - return false; - } - - final EventNode other = (EventNode) obj; - return record.equals(other.record); - } - - @Override - public String toString() { - return "Event[ID=" + record.getEventId() + ", Type=" + record.getEventType() + ", UUID=" + record.getFlowFileUuid() + ", Component=" + record.getComponentId() + "]"; - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/FlowFileLineage.java ---------------------------------------------------------------------- diff --git a/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/FlowFileLineage.java b/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/FlowFileLineage.java deleted file mode 100644 index c36c38d..0000000 --- a/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/FlowFileLineage.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.provenance.lineage; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - -import static java.util.Objects.requireNonNull; - -public class FlowFileLineage implements Lineage { - - private final List<LineageNode> nodes; - private final List<LineageEdge> edges; - - public FlowFileLineage(final Collection<LineageNode> nodes, final Collection<LineageEdge> edges) { - this.nodes = new ArrayList<>(requireNonNull(nodes)); - this.edges = new ArrayList<>(requireNonNull(edges)); - } - - @Override - public List<LineageNode> getNodes() { - return nodes; - } - - @Override - public List<LineageEdge> getEdges() { - return edges; - } - - @Override - public int hashCode() { - int sum = 923; - for (final LineageNode node : nodes) { - sum += node.hashCode(); - } - - for (final LineageEdge edge : edges) { - sum += edge.hashCode(); - } - - return sum; - } - - @Override - public boolean equals(final Object obj) { - if (obj == null) { - return false; - } - - if (obj == this) { - return true; - } - - if (!(obj instanceof FlowFileLineage)) { - return false; - } - - final FlowFileLineage other = (FlowFileLineage) obj; - return nodes.equals(other.nodes) && edges.equals(other.edges); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/FlowFileNode.java ---------------------------------------------------------------------- diff --git a/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/FlowFileNode.java b/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/FlowFileNode.java deleted file mode 100644 index fdc7470..0000000 --- a/nifi/commons/data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/FlowFileNode.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.provenance.lineage; - -import static java.util.Objects.requireNonNull; - -public class FlowFileNode implements LineageNode { - - private final String flowFileUuid; - private final long creationTime; - private String clusterNodeIdentifier; - - public FlowFileNode(final String flowFileUuid, final long flowFileCreationTime) { - this.flowFileUuid = requireNonNull(flowFileUuid); - this.creationTime = flowFileCreationTime; - } - - @Override - public String getIdentifier() { - return flowFileUuid; - } - - @Override - public long getTimestamp() { - return creationTime; - } - - @Override - public String getClusterNodeIdentifier() { - return clusterNodeIdentifier; - } - - @Override - public LineageNodeType getNodeType() { - return LineageNodeType.FLOWFILE_NODE; - } - - @Override - public String getFlowFileUuid() { - return flowFileUuid; - } - - @Override - public int hashCode() { - return 23498723 + flowFileUuid.hashCode(); - } - - @Override - public boolean equals(final Object obj) { - if (obj == null) { - return false; - } - if (obj == this) { - return true; - } - - if (!(obj instanceof FlowFileNode)) { - return false; - } - - final FlowFileNode other = (FlowFileNode) obj; - return flowFileUuid.equals(other.flowFileUuid); - } - - @Override - public String toString() { - return "FlowFile[UUID=" + flowFileUuid + "]"; - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/flowfile-packager/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/commons/flowfile-packager/pom.xml b/nifi/commons/flowfile-packager/pom.xml deleted file mode 100644 index 6e8d58d..0000000 --- a/nifi/commons/flowfile-packager/pom.xml +++ /dev/null @@ -1,41 +0,0 @@ -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - --> - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-commons-parent</artifactId> - <version>0.0.1-incubating-SNAPSHOT</version> - </parent> - - <artifactId>flowfile-packager</artifactId> - <version>0.0.1-incubating-SNAPSHOT</version> - <packaging>jar</packaging> - - <name>FlowFile Packager</name> - - <dependencies> - <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-compress</artifactId> - </dependency> - <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-lang3</artifactId> - </dependency> - </dependencies> - -</project> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackager.java ---------------------------------------------------------------------- diff --git a/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackager.java b/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackager.java deleted file mode 100644 index ae16f99..0000000 --- a/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackager.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.util; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Map; - -public interface FlowFilePackager { - - void packageFlowFile(InputStream in, OutputStream out, Map<String, String> attributes, long fileSize) throws IOException; - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV1.java ---------------------------------------------------------------------- diff --git a/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV1.java b/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV1.java deleted file mode 100644 index 07baab1..0000000 --- a/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV1.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.util; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.charset.StandardCharsets; -import java.util.Map; - -import org.apache.commons.compress.archivers.tar.TarArchiveEntry; -import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream; -import org.apache.commons.lang3.StringEscapeUtils; - -public class FlowFilePackagerV1 implements FlowFilePackager { - - public static final String FILENAME_ATTRIBUTES = "flowfile.attributes"; - public static final String FILENAME_CONTENT = "flowfile.content"; - public static final int DEFAULT_TAR_PERMISSIONS = 0644; - - private final int tarPermissions; - - public FlowFilePackagerV1() { - this(DEFAULT_TAR_PERMISSIONS); - } - - public FlowFilePackagerV1(final int tarPermissions) { - this.tarPermissions = tarPermissions; - } - - @Override - public void packageFlowFile(final InputStream in, final OutputStream out, final Map<String, String> attributes, final long fileSize) throws IOException { - try (final TarArchiveOutputStream tout = new TarArchiveOutputStream(out)) { - writeAttributesEntry(attributes, tout); - writeContentEntry(tout, in, fileSize); - tout.finish(); - tout.flush(); - tout.close(); - } - } - - private void writeAttributesEntry(final Map<String, String> attributes, final TarArchiveOutputStream tout) throws IOException { - final StringBuilder sb = new StringBuilder(); - sb.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?><!DOCTYPE properties\n SYSTEM \"http://java.sun.com/dtd/properties.dtd\">\n"); - sb.append("<properties>"); - for (final Map.Entry<String, String> entry : attributes.entrySet()) { - final String escapedKey = StringEscapeUtils.escapeXml11(entry.getKey()); - final String escapedValue = StringEscapeUtils.escapeXml11(entry.getValue()); - sb.append("\n <entry key=\"").append(escapedKey).append("\">").append(escapedValue).append("</entry>"); - } - sb.append("</properties>"); - - final byte[] metaBytes = sb.toString().getBytes(StandardCharsets.UTF_8); - final TarArchiveEntry attribEntry = new TarArchiveEntry(FILENAME_ATTRIBUTES); - attribEntry.setMode(tarPermissions); - attribEntry.setSize(metaBytes.length); - tout.putArchiveEntry(attribEntry); - tout.write(metaBytes); - tout.closeArchiveEntry(); - } - - private void writeContentEntry(final TarArchiveOutputStream tarOut, final InputStream inStream, final long fileSize) throws IOException { - final TarArchiveEntry entry = new TarArchiveEntry(FILENAME_CONTENT); - entry.setMode(tarPermissions); - entry.setSize(fileSize); - tarOut.putArchiveEntry(entry); - final byte[] buffer = new byte[512 << 10];//512KB - int bytesRead = 0; - while ((bytesRead = inStream.read(buffer)) != -1) { //still more data to read - if (bytesRead > 0) { - tarOut.write(buffer, 0, bytesRead); - } - } - - copy(inStream, tarOut); - tarOut.closeArchiveEntry(); - } - - public static long copy(final InputStream source, final OutputStream destination) throws IOException { - final byte[] buffer = new byte[8192]; - int len; - long totalCount = 0L; - while ((len = source.read(buffer)) > 0) { - destination.write(buffer, 0, len); - totalCount += len; - } - return totalCount; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV2.java ---------------------------------------------------------------------- diff --git a/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV2.java b/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV2.java deleted file mode 100644 index 6f9d6b1..0000000 --- a/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV2.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.util; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Map; - -/** - * <p> - * Packages a FlowFile, including both its content and its attributes into a - * single file that is stream-friendly. The encoding scheme is as such: - * </p> - * - * <pre> - * Length Field : indicates the number of Flow File Attributes in the stream - * 1 to N times (N=number of Flow File Attributes): - * String Field : Flow File Attribute key name - * String Field : Flow File Attribute value - * Long : 8 bytes indicating the length of the Flow File content - * Content : The next M bytes are the content of the Flow File. - * </pre> - * - * <pre> - * Encoding of String Field is as follows: - * Length Field : indicates the length of the String - * 1 to N bytes (N=String length, determined by previous field, as described above) : The UTF-8 encoded string value. - * </pre> - * - * <pre> - * Encoding of Length Field is as follows: - * First 2 bytes: Indicate length. If both bytes = 255, this is a special value indicating that the length is - * greater than or equal to 65536 bytes; therefore, the next 4 bytes will indicate the actual length. - * </pre> - * - * <p> - * Note: All byte-order encoding is Network Byte Order (Most Significant Byte - * first) - * </p> - * - * <p> - * The following example shows the bytes expected if we were to encode a - * FlowFile containing the following attributes where the content is the text - * "Hello World!": - * - * <br><br> - * Attributes: - * <pre> - * +-------+-------+ - * | Key + Value | - * + --------------+ - * | A | a | - * + --------------+ - * | B | b | - * + --------------+ - * </pre> Content:<br> - * Hello World! - * <br><br> - * Packaged Byte Encoding (In Hexadecimal Form): - * <p> - * - * <pre> - * 00 02 00 01 41 00 01 61 - * 00 01 42 00 01 62 00 00 - * 00 00 00 00 00 0C 48 65 - * 6C 6C 6F 20 57 6F 72 6C - * 64 21 - * </pre> - */ -public class FlowFilePackagerV2 implements FlowFilePackager { - - private static final int MAX_VALUE_2_BYTES = 65535; - private final byte[] writeBuffer = new byte[8]; - - @Override - public void packageFlowFile(final InputStream in, final OutputStream out, final Map<String, String> attributes, final long fileSize) throws IOException { - writeFieldLength(out, attributes.size()); //write out the number of attributes - for (final Map.Entry<String, String> entry : attributes.entrySet()) { //write out each attribute key/value pair - writeString(entry.getKey(), out); - writeString(entry.getValue(), out); - } - writeLong(out, fileSize);//write out length of data - copy(in, out);//write out the actual flow file payload - } - - private void copy(final InputStream in, final OutputStream out) throws IOException { - final byte[] buffer = new byte[65536]; - int len; - while ((len = in.read(buffer)) > 0) { - out.write(buffer, 0, len); - } - } - - private void writeString(final String val, final OutputStream out) throws IOException { - final byte[] bytes = val.getBytes("UTF-8"); - writeFieldLength(out, bytes.length); - out.write(bytes); - } - - private void writeFieldLength(final OutputStream out, final int numBytes) throws IOException { - // If the value is less than the max value that can be fit into 2 bytes, just use the - // actual value. Otherwise, we will set the first 2 bytes to 255/255 and then use the next - // 4 bytes to indicate the real length. - if (numBytes < MAX_VALUE_2_BYTES) { - writeBuffer[0] = (byte) (numBytes >>> 8); - writeBuffer[1] = (byte) (numBytes); - out.write(writeBuffer, 0, 2); - } else { - writeBuffer[0] = (byte) 0xff; - writeBuffer[1] = (byte) 0xff; - writeBuffer[2] = (byte) (numBytes >>> 24); - writeBuffer[3] = (byte) (numBytes >>> 16); - writeBuffer[4] = (byte) (numBytes >>> 8); - writeBuffer[5] = (byte) (numBytes); - out.write(writeBuffer, 0, 6); - } - } - - private void writeLong(final OutputStream out, final long val) throws IOException { - writeBuffer[0] = (byte) (val >>> 56); - writeBuffer[1] = (byte) (val >>> 48); - writeBuffer[2] = (byte) (val >>> 40); - writeBuffer[3] = (byte) (val >>> 32); - writeBuffer[4] = (byte) (val >>> 24); - writeBuffer[5] = (byte) (val >>> 16); - writeBuffer[6] = (byte) (val >>> 8); - writeBuffer[7] = (byte) (val); - out.write(writeBuffer, 0, 8); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV3.java ---------------------------------------------------------------------- diff --git a/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV3.java b/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV3.java deleted file mode 100644 index 181f3e3..0000000 --- a/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFilePackagerV3.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.util; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Map; - -public class FlowFilePackagerV3 implements FlowFilePackager { - - public static final byte[] MAGIC_HEADER = {'N', 'i', 'F', 'i', 'F', 'F', '3'}; - private static final int MAX_VALUE_2_BYTES = 65535; - private final byte[] writeBuffer = new byte[8]; - - @Override - public void packageFlowFile(final InputStream in, final OutputStream out, final Map<String, String> attributes, final long fileSize) throws IOException { - out.write(MAGIC_HEADER); - - if (attributes == null) { - writeFieldLength(out, 0); - } else { - writeFieldLength(out, attributes.size()); //write out the number of attributes - for (final Map.Entry<String, String> entry : attributes.entrySet()) { //write out each attribute key/value pair - writeString(entry.getKey(), out); - writeString(entry.getValue(), out); - } - } - - writeLong(out, fileSize);//write out length of data - copy(in, out);//write out the actual flow file payload - } - - private void copy(final InputStream in, final OutputStream out) throws IOException { - final byte[] buffer = new byte[65536]; - int len; - while ((len = in.read(buffer)) > 0) { - out.write(buffer, 0, len); - } - } - - private void writeString(final String val, final OutputStream out) throws IOException { - final byte[] bytes = val.getBytes("UTF-8"); - writeFieldLength(out, bytes.length); - out.write(bytes); - } - - private void writeFieldLength(final OutputStream out, final int numBytes) throws IOException { - // If the value is less than the max value that can be fit into 2 bytes, just use the - // actual value. Otherwise, we will set the first 2 bytes to 255/255 and then use the next - // 4 bytes to indicate the real length. - if (numBytes < MAX_VALUE_2_BYTES) { - writeBuffer[0] = (byte) (numBytes >>> 8); - writeBuffer[1] = (byte) (numBytes); - out.write(writeBuffer, 0, 2); - } else { - writeBuffer[0] = (byte) 0xff; - writeBuffer[1] = (byte) 0xff; - writeBuffer[2] = (byte) (numBytes >>> 24); - writeBuffer[3] = (byte) (numBytes >>> 16); - writeBuffer[4] = (byte) (numBytes >>> 8); - writeBuffer[5] = (byte) (numBytes); - out.write(writeBuffer, 0, 6); - } - } - - private void writeLong(final OutputStream out, final long val) throws IOException { - writeBuffer[0] = (byte) (val >>> 56); - writeBuffer[1] = (byte) (val >>> 48); - writeBuffer[2] = (byte) (val >>> 40); - writeBuffer[3] = (byte) (val >>> 32); - writeBuffer[4] = (byte) (val >>> 24); - writeBuffer[5] = (byte) (val >>> 16); - writeBuffer[6] = (byte) (val >>> 8); - writeBuffer[7] = (byte) (val); - out.write(writeBuffer, 0, 8); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackager.java ---------------------------------------------------------------------- diff --git a/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackager.java b/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackager.java deleted file mode 100644 index fd9d92d..0000000 --- a/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackager.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.util; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Map; - -public interface FlowFileUnpackager { - - Map<String, String> unpackageFlowFile(InputStream in, OutputStream out) throws IOException; - - boolean hasMoreData() throws IOException; - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackagerV1.java ---------------------------------------------------------------------- diff --git a/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackagerV1.java b/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackagerV1.java deleted file mode 100644 index f8ef3d1..0000000 --- a/nifi/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackagerV1.java +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.util; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Properties; -import org.apache.commons.compress.archivers.tar.TarArchiveEntry; -import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; - -public class FlowFileUnpackagerV1 implements FlowFileUnpackager { - - private int flowFilesRead = 0; - - @Override - public Map<String, String> unpackageFlowFile(final InputStream in, final OutputStream out) throws IOException { - flowFilesRead++; - final TarArchiveInputStream tarIn = new TarArchiveInputStream(in); - final TarArchiveEntry attribEntry = tarIn.getNextTarEntry(); - if (attribEntry == null) { - return null; - } - - final Map<String, String> attributes; - if (attribEntry.getName().equals(FlowFilePackagerV1.FILENAME_ATTRIBUTES)) { - attributes = getAttributes(tarIn); - } else { - throw new IOException("Expected two tar entries: " - + FlowFilePackagerV1.FILENAME_CONTENT + " and " - + FlowFilePackagerV1.FILENAME_ATTRIBUTES); - } - - final TarArchiveEntry contentEntry = tarIn.getNextTarEntry(); - - if (contentEntry != null && contentEntry.getName().equals(FlowFilePackagerV1.FILENAME_CONTENT)) { - final byte[] buffer = new byte[512 << 10];//512KB - int bytesRead = 0; - while ((bytesRead = tarIn.read(buffer)) != -1) { //still more data to read - if (bytesRead > 0) { - out.write(buffer, 0, bytesRead); - } - } - out.flush(); - } else { - throw new IOException("Expected two tar entries: " - + FlowFilePackagerV1.FILENAME_CONTENT + " and " - + FlowFilePackagerV1.FILENAME_ATTRIBUTES); - } - - return attributes; - } - - protected Map<String, String> getAttributes(final TarArchiveInputStream stream) throws IOException { - - final Properties props = new Properties(); - props.loadFromXML(new NonCloseableInputStream(stream)); - - final Map<String, String> result = new HashMap<>(); - for (final Entry<Object, Object> entry : props.entrySet()) { - final Object keyObject = entry.getKey(); - final Object valueObject = entry.getValue(); - if (!(keyObject instanceof String)) { - throw new IOException("Flow file attributes object contains key of type " - + keyObject.getClass().getCanonicalName() - + " but expected java.lang.String"); - } else if (!(keyObject instanceof String)) { - throw new IOException("Flow file attributes object contains value of type " - + keyObject.getClass().getCanonicalName() - + " but expected java.lang.String"); - } - - final String key = (String) keyObject; - final String value = (String) valueObject; - result.put(key, value); - } - - return result; - } - - @Override - public boolean hasMoreData() throws IOException { - return flowFilesRead == 0; - } - - public static final class NonCloseableInputStream extends InputStream { - - final InputStream stream; - - public NonCloseableInputStream(final InputStream stream) { - this.stream = stream; - } - - @Override - public void close() { - } - - @Override - public int read() throws IOException { - return stream.read(); - } - - @Override - public int available() throws IOException { - return stream.available(); - } - - @Override - public synchronized void mark(int readlimit) { - stream.mark(readlimit); - } - - @Override - public synchronized void reset() throws IOException { - stream.reset(); - } - - @Override - public boolean markSupported() { - return stream.markSupported(); - } - - @Override - public long skip(long n) throws IOException { - return stream.skip(n); - } - - @Override - public int read(byte b[], int off, int len) throws IOException { - return stream.read(b, off, len); - } - - @Override - public int read(byte b[]) throws IOException { - return stream.read(b); - } - } -}