http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/73384b23/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --cc nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 0000000,3d3e854..dcb461c
mode 000000,100644..100644
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@@ -1,0 -1,2689 +1,2684 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements. See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License. You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.controller.repository;
+
+ import java.io.ByteArrayInputStream;
+ import java.io.EOFException;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.io.OutputStream;
+ import java.nio.file.Files;
+ import java.nio.file.Path;
+ import java.util.ArrayList;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.Iterator;
+ import java.util.LinkedHashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.NoSuchElementException;
+ import java.util.Set;
+ import java.util.UUID;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicLong;
+ import java.util.regex.Pattern;
+
+ import org.apache.nifi.connectable.Connectable;
+ import org.apache.nifi.connectable.Connection;
+ import org.apache.nifi.controller.FlowFileQueue;
+ import org.apache.nifi.controller.ProcessorNode;
+ import org.apache.nifi.controller.repository.claim.ContentClaim;
+ import org.apache.nifi.controller.repository.io.ByteCountingInputStream;
+ import org.apache.nifi.controller.repository.io.ByteCountingOutputStream;
+ import org.apache.nifi.controller.repository.io.DisableOnCloseOutputStream;
+ import org.apache.nifi.controller.repository.io.FlowFileAccessInputStream;
+ import org.apache.nifi.controller.repository.io.FlowFileAccessOutputStream;
+ import org.apache.nifi.controller.repository.io.LimitedInputStream;
+ import org.apache.nifi.controller.repository.io.LongHolder;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.flowfile.attributes.CoreAttributes;
+ import org.apache.nifi.stream.io.BufferedOutputStream;
+ import org.apache.nifi.stream.io.NonCloseableInputStream;
+ import org.apache.nifi.stream.io.StreamUtils;
+ import org.apache.nifi.processor.DataUnit;
+ import org.apache.nifi.processor.FlowFileFilter;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.QueueSize;
+ import org.apache.nifi.processor.Relationship;
+ import org.apache.nifi.processor.exception.FlowFileAccessException;
+ import org.apache.nifi.processor.exception.FlowFileHandlingException;
+ import org.apache.nifi.processor.exception.MissingFlowFileException;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processor.io.InputStreamCallback;
+ import org.apache.nifi.processor.io.OutputStreamCallback;
+ import org.apache.nifi.processor.io.StreamCallback;
+ import org.apache.nifi.provenance.ProvenanceEventBuilder;
+ import org.apache.nifi.provenance.ProvenanceEventRecord;
+ import org.apache.nifi.provenance.ProvenanceEventRepository;
+ import org.apache.nifi.provenance.ProvenanceEventType;
+ import org.apache.nifi.provenance.ProvenanceReporter;
+ import org.apache.nifi.provenance.StandardProvenanceEventRecord;
+ import org.apache.nifi.util.NiFiProperties;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ /**
+  * <p>
+  * Provides a ProcessSession that ensures all accesses, changes and transfers
+  * occur in an atomic manner for all FlowFiles, including their contents and
+  * attributes.</p>
+  * <p>
+  * NOT THREAD SAFE</p>
+  *
+  * @author none
+  */
+ public final class StandardProcessSession implements ProcessSession, ProvenanceEventEnricher {
+
+     private static final AtomicLong idGenerator = new AtomicLong(0L);
+
+     // determines how many things must be transferred, removed, or modified in order to avoid logging the FlowFile IDs on commit/rollback
+     public static final int VERBOSE_LOG_THRESHOLD = 10;
+     private static final long MAX_APPENDABLE_CLAIM_SIZE = DataUnit.parseDataSize(
+             NiFiProperties.getInstance().getMaxAppendableClaimSize(), DataUnit.B).longValue();
+     private static final int MAX_FLOWFILES_PER_CLAIM = NiFiProperties.getInstance().getMaxFlowFilesPerClaim();
+
+     public static final String DEFAULT_FLOWFILE_PATH = "./";
+
+     private static final Logger LOG = LoggerFactory.getLogger(StandardProcessSession.class);
+     private static final Logger claimLog = LoggerFactory.getLogger(StandardProcessSession.class.getSimpleName() + ".claims");
+
+     private final Map<FlowFileRecord, StandardRepositoryRecord> records = new HashMap<>();
+     private final Map<Connection, StandardFlowFileEvent> connectionCounts = new HashMap<>();
+     private final Map<Connection, Set<FlowFileRecord>> unacknowledgedFlowFiles = new HashMap<>();
+     private final Map<String, Long> localCounters = new HashMap<>();
+     private final Map<String, Long> globalCounters = new HashMap<>();
+     private final Map<ContentClaim, ByteCountingOutputStream> appendableStreams = new HashMap<>();
+     private final ProcessContext context;
+     private final Set<FlowFile> recursionSet = new HashSet<>(); // set used to track what is currently being operated on, to prevent logic failures if recursive calls occur
+     private final Set<Path> deleteOnCommit = new HashSet<>();
+     private final long sessionId;
+     private final String connectableDescription;
+
+     private final Set<String> removedFlowFiles = new HashSet<>();
+     private final Set<String> createdFlowFiles = new HashSet<>();
+
+     private final StandardProvenanceReporter provenanceReporter;
+
+     private int removedCount = 0; // number of flowfiles removed in this session
+     private long removedBytes = 0L; // size of all flowfiles removed in this session
+     private LongHolder bytesRead = new LongHolder(0L);
+     private LongHolder bytesWritten = new LongHolder(0L);
+     private int flowFilesIn = 0, flowFilesOut = 0;
+     private
long contentSizeIn = 0L, contentSizeOut = 0L; + private int writeRecursionLevel = 0; + + private ContentClaim currentWriteClaim = null; + private OutputStream currentWriteClaimStream = null; + private long currentWriteClaimSize = 0L; + private int currentWriteClaimFlowFileCount = 0; + + private ContentClaim currentReadClaim = null; + private ByteCountingInputStream currentReadClaimStream = null; + private long processingStartTime; + + // maps a FlowFile to all Provenance Events that were generated for that FlowFile. + // we do this so that if we generate a Fork event, for example, and then remove the event in the same + // Session, we will not send that event to the Provenance Repository + private Map<FlowFile, List<ProvenanceEventRecord>> generatedProvenanceEvents = new HashMap<>(); + + // when Forks are generated for a single parent, we add the Fork event to this map, with the Key being the parent + // so that we are able to aggregate many into a single Fork Event. + private Map<FlowFile, ProvenanceEventBuilder> forkEventBuilders = new HashMap<>(); + + private Checkpoint checkpoint = new Checkpoint(); + + public StandardProcessSession(final ProcessContext context) { + this.context = context; + + final Connectable connectable = context.getConnectable(); + final String componentType; + + String description = connectable.toString(); + switch (connectable.getConnectableType()) { + case PROCESSOR: + final ProcessorNode procNode = (ProcessorNode) connectable; + componentType = procNode.getProcessor().getClass().getSimpleName(); + description = procNode.getProcessor().toString(); + break; + case INPUT_PORT: + componentType = "Input Port"; + break; + case OUTPUT_PORT: + componentType = "Output Port"; + break; + case REMOTE_INPUT_PORT: + componentType = "Remote Input Port"; + break; + case REMOTE_OUTPUT_PORT: + componentType = "Remote Output Port"; + break; + case FUNNEL: + componentType = "Funnel"; + break; + default: + throw new AssertionError("Connectable type is " + connectable.getConnectableType()); + } + + this.provenanceReporter = new StandardProvenanceReporter(connectable.getIdentifier(), componentType, context.getProvenanceRepository(), this); + this.sessionId = idGenerator.getAndIncrement(); + this.connectableDescription = description; + + LOG.trace("Session {} created for {}", this, connectableDescription); + processingStartTime = System.nanoTime(); + } + + public void checkpoint() { + if (!recursionSet.isEmpty()) { + throw new IllegalStateException(); + } + + if (this.checkpoint == null) { + this.checkpoint = new Checkpoint(); + } + + if (records.isEmpty()) { + LOG.trace("{} checkpointed, but no events were performed by this ProcessSession", this); + return; + } + + // any drop event that is the result of an auto-terminate should happen at the very end, so we keep the + // records in a separate List so that they can be persisted to the Provenance Repo after all of the + // Processor-reported events. 
+ List<ProvenanceEventRecord> autoTerminatedEvents = null; + + //validate that all records have a transfer relationship for them and if so determine the destination node and clone as necessary + final Map<FlowFileRecord, StandardRepositoryRecord> toAdd = new HashMap<>(); + for (final StandardRepositoryRecord record : records.values()) { + if (record.isMarkedForDelete()) { + continue; + } + final Relationship relationship = record.getTransferRelationship(); + if (relationship == null) { + rollback(); + throw new FlowFileHandlingException(record.getCurrent() + " transfer relationship not specified"); + } + final List<Connection> destinations = new ArrayList<>(context.getConnections(relationship)); + if (destinations.isEmpty() && !context.getConnectable().isAutoTerminated(relationship)) { + if (relationship != Relationship.SELF) { + rollback(); + throw new FlowFileHandlingException(relationship + " does not have any destinations for " + context.getConnectable()); + } + } + + if (destinations.isEmpty() && relationship == Relationship.SELF) { + record.setDestination(record.getOriginalQueue()); + } else if (destinations.isEmpty()) { + record.markForDelete(); + + if (autoTerminatedEvents == null) { + autoTerminatedEvents = new ArrayList<>(); + } + + final ProvenanceEventRecord dropEvent; + try { + dropEvent = provenanceReporter.generateDropEvent(record.getCurrent(), "Auto-Terminated by " + relationship.getName() + " Relationship"); + autoTerminatedEvents.add(dropEvent); + } catch (final Exception e) { + LOG.warn("Unable to generate Provenance Event for {} on behalf of {} due to {}", record.getCurrent(), connectableDescription, e); + if (LOG.isDebugEnabled()) { + LOG.warn("", e); + } + } + } else { + final Connection finalDestination = destinations.remove(destinations.size() - 1); //remove last element + record.setDestination(finalDestination.getFlowFileQueue()); + incrementConnectionInputCounts(finalDestination, record); + + for (final Connection destination : destinations) { //iterate over remaining destinations and "clone" as needed + incrementConnectionInputCounts(destination, record); + final FlowFileRecord currRec = record.getCurrent(); + final StandardFlowFileRecord.Builder builder = new StandardFlowFileRecord.Builder().fromFlowFile(currRec); + builder.id(context.getNextFlowFileSequence()); + + final String newUuid = UUID.randomUUID().toString(); + builder.addAttribute(CoreAttributes.UUID.key(), newUuid); + + final FlowFileRecord clone = builder.build(); + final StandardRepositoryRecord newRecord = new StandardRepositoryRecord(destination.getFlowFileQueue()); + getProvenanceReporter().clone(currRec, clone); + + final ContentClaim claim = clone.getContentClaim(); + if (claim != null) { + context.getContentRepository().incrementClaimaintCount(claim); + } + newRecord.setWorking(clone, CoreAttributes.UUID.key(), newUuid); + + newRecord.setDestination(destination.getFlowFileQueue()); + newRecord.setTransferRelationship(record.getTransferRelationship()); + // put the mapping into toAdd because adding to records now will cause a ConcurrentModificationException + toAdd.put(clone, newRecord); + } + } + } + + records.putAll(toAdd); + toAdd.clear(); + + checkpoint.checkpoint(this, autoTerminatedEvents); + resetState(); + } + + @Override + public void commit() { + checkpoint(); + commit(this.checkpoint); + this.checkpoint = null; + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + private void commit(final Checkpoint checkpoint) { + final long commitStartNanos = System.nanoTime(); + + 
resetWriteClaims(); + resetReadClaim(); + + final long updateProvenanceStart = System.nanoTime(); + updateProvenanceRepo(checkpoint); + + final long claimRemovalStart = System.nanoTime(); + final long updateProvenanceNanos = claimRemovalStart - updateProvenanceStart; + + // Figure out which content claims can be released. + // At this point, we will decrement the Claimant Count for the claims via the Content Repository. + // We do not actually destroy the content because otherwise, we could remove the + // Original Claim and crash/restart before the FlowFileRepository is updated. This will result in the FlowFile being restored such that + // the content claim points to the Original Claim -- which has already been removed! + for (final Map.Entry<FlowFileRecord, StandardRepositoryRecord> entry : checkpoint.records.entrySet()) { + final FlowFile flowFile = entry.getKey(); + final StandardRepositoryRecord record = entry.getValue(); + + if (record.isMarkedForDelete()) { + // if the working claim is not the same as the original claim, we can immediately destroy the working claim + // because it was created in this session and is to be deleted. We don't need to wait for the FlowFile Repo to sync. + removeContent(record.getWorkingClaim()); + + if (record.getOriginalClaim() != null && !record.getOriginalClaim().equals(record.getWorkingClaim())) { + // if working & original claim are same, don't remove twice; we only want to remove the original + // if it's different from the working. Otherwise, we remove two claimant counts. This causes + // an issue if we only updated the FlowFile attributes. + removeContent(record.getOriginalClaim()); + } + final long flowFileLife = System.currentTimeMillis() - flowFile.getEntryDate(); + final Connectable connectable = context.getConnectable(); + final Object terminator = (connectable instanceof ProcessorNode) ? ((ProcessorNode) connectable).getProcessor() : connectable; + LOG.info("{} terminated by {}; life of FlowFile = {} ms", new Object[]{flowFile, terminator, flowFileLife}); + } else if (record.isWorking() && record.getWorkingClaim() != record.getOriginalClaim()) { + //records which have been updated - remove original if exists + removeContent(record.getOriginalClaim()); + } + } + + final long claimRemovalFinishNanos = System.nanoTime(); + final long claimRemovalNanos = claimRemovalFinishNanos - claimRemovalStart; + + // Update the FlowFile Repository + try { + final Collection<StandardRepositoryRecord> repoRecords = checkpoint.records.values(); + context.getFlowFileRepository().updateRepository((Collection) repoRecords); + } catch (final IOException ioe) { + rollback(); + throw new ProcessException("FlowFile Repository failed to update", ioe); + } + final long flowFileRepoUpdateFinishNanos = System.nanoTime(); + final long flowFileRepoUpdateNanos = flowFileRepoUpdateFinishNanos - claimRemovalFinishNanos; + + updateEventRepository(checkpoint); + + final long updateEventRepositoryFinishNanos = System.nanoTime(); + final long updateEventRepositoryNanos = updateEventRepositoryFinishNanos - claimRemovalFinishNanos; + + // transfer the flowfiles to the connections' queues. 
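+ // Group FlowFiles by destination queue first, so that each queue receives a single putAll() call below.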
+ final Map<FlowFileQueue, Collection<FlowFileRecord>> recordMap = new HashMap<>(); + for (final StandardRepositoryRecord record : checkpoint.records.values()) { + if (record.isMarkedForAbort() || record.isMarkedForDelete()) { + continue; //these don't need to be transferred + } + // record.getCurrent() will return null if this record was created in this session -- + // in this case, we just ignore it, and it will be cleaned up by clearing the records map. + if (record.getCurrent() != null) { + Collection<FlowFileRecord> collection = recordMap.get(record.getDestination()); + if (collection == null) { + collection = new ArrayList<>(); + recordMap.put(record.getDestination(), collection); + } + collection.add(record.getCurrent()); + } + } + + for (final Map.Entry<FlowFileQueue, Collection<FlowFileRecord>> entry : recordMap.entrySet()) { + entry.getKey().putAll(entry.getValue()); + } + + final long enqueueFlowFileFinishNanos = System.nanoTime(); + final long enqueueFlowFileNanos = enqueueFlowFileFinishNanos - updateEventRepositoryFinishNanos; + + // Delete any files from disk that need to be removed. + for (final Path path : checkpoint.deleteOnCommit) { + try { + Files.deleteIfExists(path); + } catch (final IOException e) { + throw new FlowFileAccessException("Unable to delete " + path.toFile().getAbsolutePath(), e); + } + } + checkpoint.deleteOnCommit.clear(); + + if (LOG.isInfoEnabled()) { + final String sessionSummary = summarizeEvents(checkpoint); + if (!sessionSummary.isEmpty()) { + LOG.info("{} for {}, committed the following events: {}", new Object[]{this, connectableDescription, sessionSummary}); + } + } + + for (final Map.Entry<String, Long> entry : checkpoint.localCounters.entrySet()) { + adjustCounter(entry.getKey(), entry.getValue(), true); + } + + for (final Map.Entry<String, Long> entry : checkpoint.globalCounters.entrySet()) { + adjustCounter(entry.getKey(), entry.getValue(), true); + } + + acknowledgeRecords(); + resetState(); + + if (LOG.isDebugEnabled()) { + final StringBuilder timingInfo = new StringBuilder(); + timingInfo.append("Session commit for ").append(this).append(" [").append(connectableDescription).append("]").append(" took "); + + final long commitNanos = System.nanoTime() - commitStartNanos; + formatNanos(commitNanos, timingInfo); + timingInfo.append("; FlowFile Repository Update took "); + formatNanos(flowFileRepoUpdateNanos, timingInfo); + timingInfo.append("; Claim Removal took "); + formatNanos(claimRemovalNanos, timingInfo); + timingInfo.append("; FlowFile Event Update took "); + formatNanos(updateEventRepositoryNanos, timingInfo); + timingInfo.append("; Enqueuing FlowFiles took "); + formatNanos(enqueueFlowFileNanos, timingInfo); + timingInfo.append("; Updating Provenance Event Repository took "); + formatNanos(updateProvenanceNanos, timingInfo); + + LOG.debug(timingInfo.toString()); + } + } + + private void updateEventRepository(final Checkpoint checkpoint) { + int flowFilesReceived = 0; + int flowFilesSent = 0; + long bytesReceived = 0L; + long bytesSent = 0L; + + for (final ProvenanceEventRecord event : checkpoint.reportedEvents) { + if (isSpuriousForkEvent(event, checkpoint.removedFlowFiles)) { + continue; + } + + switch (event.getEventType()) { + case SEND: + flowFilesSent++; + bytesSent += event.getFileSize(); + break; + case RECEIVE: + flowFilesReceived++; + bytesReceived += event.getFileSize(); + break; + default: + break; + } + } + + try { + // update event repository + final Connectable connectable = context.getConnectable(); + final 
StandardFlowFileEvent flowFileEvent = new StandardFlowFileEvent(connectable.getIdentifier());
+             flowFileEvent.setBytesRead(checkpoint.bytesRead);
+             flowFileEvent.setBytesWritten(checkpoint.bytesWritten);
+             flowFileEvent.setContentSizeIn(checkpoint.contentSizeIn);
+             flowFileEvent.setContentSizeOut(checkpoint.contentSizeOut);
+             flowFileEvent.setContentSizeRemoved(checkpoint.removedBytes);
+             flowFileEvent.setFlowFilesIn(checkpoint.flowFilesIn);
+             flowFileEvent.setFlowFilesOut(checkpoint.flowFilesOut);
+             flowFileEvent.setFlowFilesRemoved(checkpoint.removedCount);
+             flowFileEvent.setFlowFilesReceived(flowFilesReceived);
+             flowFileEvent.setBytesReceived(bytesReceived);
+             flowFileEvent.setFlowFilesSent(flowFilesSent);
+             flowFileEvent.setBytesSent(bytesSent);
+
+             long lineageMillis = 0L;
+             for (final Map.Entry<FlowFileRecord, StandardRepositoryRecord> entry : checkpoint.records.entrySet()) {
+                 final FlowFile flowFile = entry.getKey();
+                 final long lineageDuration = System.currentTimeMillis() - flowFile.getLineageStartDate();
+                 lineageMillis += lineageDuration;
+             }
+             flowFileEvent.setAggregateLineageMillis(lineageMillis);
+
+             context.getFlowFileEventRepository().updateRepository(flowFileEvent);
+
+             for (final FlowFileEvent connectionEvent : checkpoint.connectionCounts.values()) {
+                 context.getFlowFileEventRepository().updateRepository(connectionEvent);
+             }
+         } catch (final IOException ioe) {
+             LOG.error("FlowFile Event Repository failed to update", ioe);
+         }
+     }
+
+     private void addEventType(final Map<String, Set<ProvenanceEventType>> map, final String id, final ProvenanceEventType eventType) {
+         Set<ProvenanceEventType> eventTypes = map.get(id);
+         if ( eventTypes == null ) {
+             eventTypes = new HashSet<>();
+             map.put(id, eventTypes);
+         }
+
+         eventTypes.add(eventType);
+     }
+
+     private void updateProvenanceRepo(final Checkpoint checkpoint) {
+         // Update Provenance Repository
+         final ProvenanceEventRepository provenanceRepo = context.getProvenanceRepository();
+
+         // We need to de-dupe the events that we've created and those reported to the provenance reporter,
+         // in case the Processor developer submitted the same events to the reporter. So we use a LinkedHashSet
+         // for this, so that we are able to ensure that the events are submitted in the proper order.
+         final Set<ProvenanceEventRecord> recordsToSubmit = new LinkedHashSet<>();
+         final Map<String, Set<ProvenanceEventType>> eventTypesPerFlowFileId = new HashMap<>();
+
+         final Set<ProvenanceEventRecord> processorGenerated = checkpoint.reportedEvents;
+
+         // We first want to submit FORK events because if the Processor is going to create events against
+         // a FlowFile, that FlowFile needs to be shown to be created first.
+         // However, if the Processor has generated a FORK event, we don't want to use the Framework-created one --
+         // we prefer to use the event generated by the Processor. We can determine this by checking if the Set of events generated
+         // by the Processor contains any of the FORK events that we generated
+         for (final Map.Entry<FlowFile, ProvenanceEventBuilder> entry : checkpoint.forkEventBuilders.entrySet()) {
+             final ProvenanceEventBuilder builder = entry.getValue();
+             final FlowFile flowFile = entry.getKey();
+
+             updateEventContentClaims(builder, flowFile, checkpoint.records.get(flowFile));
+             final ProvenanceEventRecord event = builder.build();
+
+             if (!event.getChildUuids().isEmpty() && !isSpuriousForkEvent(event, checkpoint.removedFlowFiles) && !processorGenerated.contains(event)) {
+                 recordsToSubmit.add(event);
+
+                 for ( final String childUuid : event.getChildUuids() ) {
+                     addEventType(eventTypesPerFlowFileId, childUuid, event.getEventType());
+                 }
+                 for ( final String parentUuid : event.getParentUuids() ) {
+                     addEventType(eventTypesPerFlowFileId, parentUuid, event.getEventType());
+                 }
+             }
+         }
+
+         // Now add any Processor-reported events.
+         for (final ProvenanceEventRecord event : processorGenerated) {
+             if (isSpuriousForkEvent(event, checkpoint.removedFlowFiles)) {
+                 continue;
+             }
+
+             // Check if the event indicates that the FlowFile was routed to the same
+             // connection from which it was pulled (and only this connection). If so, discard the event.
+             if ( isSpuriousRouteEvent(event, checkpoint.records) ) {
+                 continue;
+             }
+
+             recordsToSubmit.add(event);
+             addEventType(eventTypesPerFlowFileId, event.getFlowFileUuid(), event.getEventType());
+         }
+
+         // Finally, add any other events that we may have generated.
+         for (final List<ProvenanceEventRecord> eventList : checkpoint.generatedProvenanceEvents.values()) {
+             for (final ProvenanceEventRecord event : eventList) {
+                 if (isSpuriousForkEvent(event, checkpoint.removedFlowFiles)) {
+                     continue;
+                 }
+
+                 recordsToSubmit.add(event);
+                 addEventType(eventTypesPerFlowFileId, event.getFlowFileUuid(), event.getEventType());
+             }
+         }
+
+         // Check if content or attributes changed. If so, register the appropriate events.
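+         // (CONTENT_MODIFIED when an existing FlowFile's claim changed, CREATE when a created FlowFile
+         // has no creation-style event yet, and ATTRIBUTES_MODIFIED only when no other event was recorded.)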
+         for (final StandardRepositoryRecord repoRecord : checkpoint.records.values()) {
+             final ContentClaim original = repoRecord.getOriginalClaim();
+             final ContentClaim current = repoRecord.getCurrentClaim();
+
+             boolean contentChanged = false;
+             if ( original == null && current != null ) {
+                 contentChanged = true;
+             }
+             if ( original != null && current == null ) {
+                 contentChanged = true;
+             }
+             if ( original != null && current != null && !original.equals(current) ) {
+                 contentChanged = true;
+             }
+
+             final FlowFileRecord curFlowFile = repoRecord.getCurrent();
+             final String flowFileId = curFlowFile.getAttribute(CoreAttributes.UUID.key());
+             boolean eventAdded = false;
+
+             if (checkpoint.removedFlowFiles.contains(flowFileId)) {
+                 continue;
+             }
+
+             final boolean newFlowFile = repoRecord.getOriginal() == null;
+             if ( contentChanged && !newFlowFile ) {
+                 recordsToSubmit.add(provenanceReporter.build(curFlowFile, ProvenanceEventType.CONTENT_MODIFIED).build());
+                 addEventType(eventTypesPerFlowFileId, flowFileId, ProvenanceEventType.CONTENT_MODIFIED);
+                 eventAdded = true;
+             }
+
+             if ( checkpoint.createdFlowFiles.contains(flowFileId) ) {
+                 final Set<ProvenanceEventType> registeredTypes = eventTypesPerFlowFileId.get(flowFileId);
+                 boolean creationEventRegistered = false;
+                 if ( registeredTypes != null ) {
+                     if ( registeredTypes.contains(ProvenanceEventType.CREATE) ||
+                             registeredTypes.contains(ProvenanceEventType.FORK) ||
+                             registeredTypes.contains(ProvenanceEventType.JOIN) ||
+                             registeredTypes.contains(ProvenanceEventType.RECEIVE) ) {
+                         creationEventRegistered = true;
+                     }
+                 }
+
+                 if ( !creationEventRegistered ) {
+                     recordsToSubmit.add(provenanceReporter.build(curFlowFile, ProvenanceEventType.CREATE).build());
+                     eventAdded = true;
+                 }
+             }
+
+             if ( !eventAdded && !repoRecord.getUpdatedAttributes().isEmpty() ) {
+                 // We generate an ATTRIBUTES_MODIFIED event only if no other event has been
+                 // created for the FlowFile. We do this because all events contain both the
+                 // newest and the original attributes, so generating an ATTRIBUTES_MODIFIED
+                 // event is redundant if another already exists.
+                 if ( !eventTypesPerFlowFileId.containsKey(flowFileId) ) {
+                     recordsToSubmit.add(provenanceReporter.build(curFlowFile, ProvenanceEventType.ATTRIBUTES_MODIFIED).build());
+                     addEventType(eventTypesPerFlowFileId, flowFileId, ProvenanceEventType.ATTRIBUTES_MODIFIED);
+                 }
+             }
+         }
+
+         // We want to submit the 'recordsToSubmit' collection, followed by the auto-terminated events, to the Provenance Repository.
+         // We want to do this with a single call to ProvenanceEventRepository#registerEvents because it may be much more efficient
+         // to do so.
+         // However, we want to modify the events in 'recordsToSubmit' to obtain the data from the most recent version of the FlowFiles
+         // (except for SEND events); see note below as to why this is.
+         // Therefore, we create an Iterable that can iterate over each of these events, modifying them as needed, and returning them
+         // in the appropriate order. This prevents an unnecessary step of creating an intermediate List and adding all of those values
+         // to the List.
+         // This is done in a similar vein to how Java 8's streams work, iterating over the events and returning a processed version
+         // one-at-a-time as opposed to iterating over the entire Collection and putting the results in another Collection.
However, + // we don't want to change the Framework to require Java 8 at this time, because it's not yet as prevalent as we would desire + final Map<String, FlowFileRecord> flowFileRecordMap = new HashMap<>(); + for (final StandardRepositoryRecord repoRecord : checkpoint.records.values()) { + final FlowFileRecord flowFile = repoRecord.getCurrent(); + flowFileRecordMap.put(flowFile.getAttribute(CoreAttributes.UUID.key()), flowFile); + } + + final List<ProvenanceEventRecord> autoTermEvents = checkpoint.autoTerminatedEvents; + final Iterable<ProvenanceEventRecord> iterable = new Iterable<ProvenanceEventRecord>() { + final Iterator<ProvenanceEventRecord> recordsToSubmitIterator = recordsToSubmit.iterator(); + final Iterator<ProvenanceEventRecord> autoTermIterator = autoTermEvents == null ? null : autoTermEvents.iterator(); + + @Override + public Iterator<ProvenanceEventRecord> iterator() { + return new Iterator<ProvenanceEventRecord>() { + @Override + public boolean hasNext() { + return recordsToSubmitIterator.hasNext() || (autoTermIterator != null && autoTermIterator.hasNext()); + } + + @Override + public ProvenanceEventRecord next() { + if (recordsToSubmitIterator.hasNext()) { + final ProvenanceEventRecord rawEvent = recordsToSubmitIterator.next(); + + // Update the Provenance Event Record with all of the info that we know about the event. + // For SEND events, we do not want to update the FlowFile info on the Event, because the event should + // reflect the FlowFile as it was sent to the remote system. However, for other events, we want to use + // the representation of the FlowFile as it is committed, as this is the only way in which it really + // exists in our system -- all other representations are volatile representations that have not been + // exposed. 
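+ // enrich() fills in the committed content claim and source queue, and (except for SEND events) the final attributes.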
+ return enrich(rawEvent, flowFileRecordMap, checkpoint.records, rawEvent.getEventType() != ProvenanceEventType.SEND); + } else if (autoTermIterator != null && autoTermIterator.hasNext()) { + return enrich(autoTermIterator.next(), flowFileRecordMap, checkpoint.records, true); + } + + throw new NoSuchElementException(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + + provenanceRepo.registerEvents(iterable); + } + + private void updateEventContentClaims(final ProvenanceEventBuilder builder, final FlowFile flowFile, final StandardRepositoryRecord repoRecord) { + final ContentClaim originalClaim = repoRecord.getOriginalClaim(); + if (originalClaim == null) { + builder.setCurrentContentClaim(null, null, null, null, 0L); + } else { + builder.setCurrentContentClaim(originalClaim.getContainer(), originalClaim.getSection(), originalClaim.getId(), repoRecord.getOriginal().getContentClaimOffset(), repoRecord.getOriginal().getSize()); + builder.setPreviousContentClaim(originalClaim.getContainer(), originalClaim.getSection(), originalClaim.getId(), repoRecord.getOriginal().getContentClaimOffset(), repoRecord.getOriginal().getSize()); + } + } + + @Override + public StandardProvenanceEventRecord enrich(final ProvenanceEventRecord rawEvent, final FlowFile flowFile) { + final StandardRepositoryRecord repoRecord = records.get(flowFile); + if (repoRecord == null) { + throw new FlowFileHandlingException(flowFile + " is not known in this session (" + toString() + ")"); + } + + final StandardProvenanceEventRecord.Builder recordBuilder = new StandardProvenanceEventRecord.Builder().fromEvent(rawEvent); + if (repoRecord.getCurrent() != null && repoRecord.getCurrentClaim() != null) { + final ContentClaim currentClaim = repoRecord.getCurrentClaim(); + final long currentOffset = repoRecord.getCurrentClaimOffset(); + final long size = flowFile.getSize(); + recordBuilder.setCurrentContentClaim(currentClaim.getContainer(), currentClaim.getSection(), currentClaim.getId(), currentOffset, size); + } + + if (repoRecord.getOriginal() != null && repoRecord.getOriginalClaim() != null) { + final ContentClaim originalClaim = repoRecord.getOriginalClaim(); + final long originalOffset = repoRecord.getOriginal().getContentClaimOffset(); + final long originalSize = repoRecord.getOriginal().getSize(); + recordBuilder.setPreviousContentClaim(originalClaim.getContainer(), originalClaim.getSection(), originalClaim.getId(), originalOffset, originalSize); + } + + final FlowFileQueue originalQueue = repoRecord.getOriginalQueue(); + if (originalQueue != null) { + recordBuilder.setSourceQueueIdentifier(originalQueue.getIdentifier()); + } + + recordBuilder.setAttributes(repoRecord.getOriginalAttributes(), repoRecord.getUpdatedAttributes()); + return recordBuilder.build(); + } + + private StandardProvenanceEventRecord enrich(final ProvenanceEventRecord rawEvent, final Map<String, FlowFileRecord> flowFileRecordMap, final Map<FlowFileRecord, StandardRepositoryRecord> records, final boolean updateAttributes) { + final StandardProvenanceEventRecord.Builder recordBuilder = new StandardProvenanceEventRecord.Builder().fromEvent(rawEvent); + final FlowFileRecord eventFlowFile = flowFileRecordMap.get(rawEvent.getFlowFileUuid()); + if (eventFlowFile != null) { + final StandardRepositoryRecord repoRecord = records.get(eventFlowFile); + + if (repoRecord.getCurrent() != null && repoRecord.getCurrentClaim() != null) { + final ContentClaim currentClaim = repoRecord.getCurrentClaim(); + final long 
currentOffset = repoRecord.getCurrentClaimOffset(); + final long size = eventFlowFile.getSize(); + recordBuilder.setCurrentContentClaim(currentClaim.getContainer(), currentClaim.getSection(), currentClaim.getId(), currentOffset, size); + } + + if (repoRecord.getOriginal() != null && repoRecord.getOriginalClaim() != null) { + final ContentClaim originalClaim = repoRecord.getOriginalClaim(); + final long originalOffset = repoRecord.getOriginal().getContentClaimOffset(); + final long originalSize = repoRecord.getOriginal().getSize(); + recordBuilder.setPreviousContentClaim(originalClaim.getContainer(), originalClaim.getSection(), originalClaim.getId(), originalOffset, originalSize); + } + + final FlowFileQueue originalQueue = repoRecord.getOriginalQueue(); + if (originalQueue != null) { + recordBuilder.setSourceQueueIdentifier(originalQueue.getIdentifier()); + } + } + + if (updateAttributes) { + final FlowFileRecord flowFileRecord = flowFileRecordMap.get(rawEvent.getFlowFileUuid()); + if (flowFileRecord != null) { + final StandardRepositoryRecord record = records.get(flowFileRecord); + if (record != null) { + recordBuilder.setAttributes(record.getOriginalAttributes(), record.getUpdatedAttributes()); + } + } + } + + return recordBuilder.build(); + } + + /** + * Checks if the given event is a spurious FORK, meaning that the FORK has a + * single child and that child was removed in this session. This happens + * when a Processor calls #create(FlowFile) and then removes the created + * FlowFile. + * + * @param event + * @return + */ + private boolean isSpuriousForkEvent(final ProvenanceEventRecord event, final Set<String> removedFlowFiles) { + if (event.getEventType() == ProvenanceEventType.FORK) { + final List<String> childUuids = event.getChildUuids(); + if (childUuids != null && childUuids.size() == 1 && removedFlowFiles.contains(childUuids.get(0))) { + return true; + } + } + + return false; + } + + + /** + * Checks if the given event is a spurious ROUTE, meaning that the ROUTE indicates that a FlowFile + * was routed to a relationship with only 1 connection and that Connection is the Connection from which + * the FlowFile was pulled. I.e., the FlowFile was really routed nowhere. + * + * @param event + * @param records + * @return + */ + private boolean isSpuriousRouteEvent(final ProvenanceEventRecord event, final Map<FlowFileRecord, StandardRepositoryRecord> records) { + if ( event.getEventType() == ProvenanceEventType.ROUTE ) { + final String relationshipName = event.getRelationship(); + final Relationship relationship = new Relationship.Builder().name(relationshipName).build(); + final Collection<Connection> connectionsForRelationship = this.context.getConnections(relationship); + + // If the number of connections for this relationship is not 1, then we can't ignore this ROUTE event, + // as it may be cloning the FlowFile and adding to multiple connections. 
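+ // With exactly one destination, the ROUTE is spurious only if that destination's queue is the queue the FlowFile came from.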
+ if ( connectionsForRelationship.size() == 1 ) { + for ( final Map.Entry<FlowFileRecord, StandardRepositoryRecord> entry : records.entrySet() ) { + final FlowFileRecord flowFileRecord = entry.getKey(); + if ( event.getFlowFileUuid().equals(flowFileRecord.getAttribute(CoreAttributes.UUID.key())) ) { + final StandardRepositoryRecord repoRecord = entry.getValue(); + if ( repoRecord.getOriginalQueue() == null ) { + return false; + } + + final String originalQueueId = repoRecord.getOriginalQueue().getIdentifier(); + final Connection destinationConnection = connectionsForRelationship.iterator().next(); + final String destinationQueueId = destinationConnection.getFlowFileQueue().getIdentifier(); + return originalQueueId.equals(destinationQueueId); + } + } + } + } + + return false; + } + + @Override + public void rollback() { + rollback(false); + } + + @Override + public void rollback(final boolean penalize) { + deleteOnCommit.clear(); + if (records.isEmpty()) { + LOG.trace("{} was rolled back, but no events were performed by this ProcessSession", this); + acknowledgeRecords(); + return; + } + + resetWriteClaims(); + resetReadClaim(); + + for (final StandardRepositoryRecord record : records.values()) { + // remove the working claim if it's different than the original. + removeTemporaryClaim(record); + } + + final Set<RepositoryRecord> abortedRecords = new HashSet<>(); + final Set<StandardRepositoryRecord> transferRecords = new HashSet<>(); + for (final StandardRepositoryRecord record : records.values()) { + if (record.isMarkedForAbort()) { + removeContent(record.getWorkingClaim()); + if (record.getCurrentClaim() != null && !record.getCurrentClaim().equals(record.getWorkingClaim())) { + // if working & original claim are same, don't remove twice; we only want to remove the original + // if it's different from the working. Otherwise, we remove two claimant counts. This causes + // an issue if we only updated the flowfile attributes. 
+ removeContent(record.getCurrentClaim()); + } + abortedRecords.add(record); + } else { + transferRecords.add(record); + } + } + + // Put the FlowFiles that are not marked for abort back to their original queues + for (final StandardRepositoryRecord record : transferRecords) { + if (record.getOriginal() != null) { + final FlowFileQueue originalQueue = record.getOriginalQueue(); + if (originalQueue != null) { + if (penalize) { + final long expirationEpochMillis = System.currentTimeMillis() + context.getConnectable().getPenalizationPeriod(TimeUnit.MILLISECONDS); + final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getOriginal()).penaltyExpirationTime(expirationEpochMillis).build(); + originalQueue.put(newFile); + } else { + originalQueue.put(record.getOriginal()); + } + } + } + } + + if (!abortedRecords.isEmpty()) { + try { + context.getFlowFileRepository().updateRepository(abortedRecords); + } catch (final IOException ioe) { + LOG.error("Unable to update FlowFile repository for aborted records due to {}", ioe.toString()); + if (LOG.isDebugEnabled()) { + LOG.error("", ioe); + } + } + } + + final Connectable connectable = context.getConnectable(); + final StandardFlowFileEvent flowFileEvent = new StandardFlowFileEvent(connectable.getIdentifier()); + flowFileEvent.setBytesRead(bytesRead.getValue()); + flowFileEvent.setBytesWritten(bytesWritten.getValue()); + + // update event repository + try { + context.getFlowFileEventRepository().updateRepository(flowFileEvent); + } catch (final Exception e) { + LOG.error("Failed to update FlowFileEvent Repository due to " + e); + if (LOG.isDebugEnabled()) { + LOG.error("", e); + } + } + + acknowledgeRecords(); + resetState(); + } + + private void removeContent(final ContentClaim claim) { + if (claim == null) { + return; + } + + context.getContentRepository().decrementClaimantCount(claim); + } + + /** + * Destroys a ContentClaim that was being written to but is no longer needed + * + * @param claim + */ + private void destroyContent(final ContentClaim claim) { + if (claim == null) { + return; + } + + final int decrementedClaimCount = context.getContentRepository().decrementClaimantCount(claim); + if (decrementedClaimCount <= 0) { + resetWriteClaims(); // Have to ensure that we are not currently writing to the claim before we can destroy it. 
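+ // No claimants remain, so the backing content can now be physically removed from the Content Repository.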
+ context.getContentRepository().remove(claim); + } + } + + private void resetState() { + records.clear(); + recursionSet.clear(); + contentSizeIn = 0L; + contentSizeOut = 0L; + flowFilesIn = 0; + flowFilesOut = 0; + removedCount = 0; + removedBytes = 0L; + bytesRead.setValue(0L); + bytesWritten.setValue(0L); + connectionCounts.clear(); + createdFlowFiles.clear(); + removedFlowFiles.clear(); + globalCounters.clear(); + localCounters.clear(); + + generatedProvenanceEvents.clear(); + forkEventBuilders.clear(); + provenanceReporter.clear(); + + processingStartTime = System.nanoTime(); + } + + private void acknowledgeRecords() { + for (final Map.Entry<Connection, Set<FlowFileRecord>> entry : unacknowledgedFlowFiles.entrySet()) { + entry.getKey().getFlowFileQueue().acknowledge(entry.getValue()); + } + unacknowledgedFlowFiles.clear(); + } + + private String summarizeEvents(final Checkpoint checkpoint) { + final Map<Relationship, Set<String>> transferMap = new HashMap<>(); // relationship to flowfile ID's + final Set<String> modifiedFlowFileIds = new HashSet<>(); + int largestTransferSetSize = 0; + + for (final Map.Entry<FlowFileRecord, StandardRepositoryRecord> entry : checkpoint.records.entrySet()) { + final FlowFile flowFile = entry.getKey(); + final StandardRepositoryRecord record = entry.getValue(); + + final Relationship relationship = record.getTransferRelationship(); + if (Relationship.SELF.equals(relationship)) { + continue; + } + + Set<String> transferIds = transferMap.get(relationship); + if (transferIds == null) { + transferIds = new HashSet<>(); + transferMap.put(relationship, transferIds); + } + transferIds.add(flowFile.getAttribute(CoreAttributes.UUID.key())); + largestTransferSetSize = Math.max(largestTransferSetSize, transferIds.size()); + + final ContentClaim workingClaim = record.getWorkingClaim(); + if (workingClaim != null && workingClaim != record.getOriginalClaim() && record.getTransferRelationship() != null) { + modifiedFlowFileIds.add(flowFile.getAttribute(CoreAttributes.UUID.key())); + } + } + + final int numRemoved = checkpoint.removedFlowFiles.size(); + final int numModified = modifiedFlowFileIds.size(); + final int numCreated = checkpoint.createdFlowFiles.size(); + + final StringBuilder sb = new StringBuilder(512); + if (!LOG.isDebugEnabled() && (largestTransferSetSize > VERBOSE_LOG_THRESHOLD || numModified > VERBOSE_LOG_THRESHOLD || numCreated > VERBOSE_LOG_THRESHOLD || numRemoved > VERBOSE_LOG_THRESHOLD)) { + if (numCreated > 0) { + sb.append("created ").append(numCreated).append(" FlowFiles, "); + } + if (numModified > 0) { + sb.append("modified ").append(modifiedFlowFileIds.size()).append(" FlowFiles, "); + } + if (numRemoved > 0) { + sb.append("removed ").append(numRemoved).append(" FlowFiles, "); + } + for (final Map.Entry<Relationship, Set<String>> entry : transferMap.entrySet()) { + if (entry.getKey() != null) { + sb.append("Transferred ").append(entry.getValue().size()).append(" FlowFiles"); + + final Relationship relationship = entry.getKey(); + if (relationship != Relationship.ANONYMOUS) { + sb.append(" to '").append(relationship.getName()).append("', "); + } + } + } + } else { + if (numCreated > 0) { + sb.append("created FlowFiles ").append(checkpoint.createdFlowFiles).append(", "); + } + if (numModified > 0) { + sb.append("modified FlowFiles ").append(modifiedFlowFileIds).append(", "); + } + if (numRemoved > 0) { + sb.append("removed FlowFiles ").append(checkpoint.removedFlowFiles).append(", "); + } + for (final Map.Entry<Relationship, Set<String>> entry 
: transferMap.entrySet()) { + if (entry.getKey() != null) { + sb.append("Transferred FlowFiles ").append(entry.getValue()); + + final Relationship relationship = entry.getKey(); + if (relationship != Relationship.ANONYMOUS) { + sb.append(" to '").append(relationship.getName()).append("', "); + } + } + } + } + + if (sb.length() > 2 && sb.subSequence(sb.length() - 2, sb.length()).equals(", ")) { + sb.delete(sb.length() - 2, sb.length()); + } + + // don't add processing time if we did nothing, because we don't log the summary anyway + if (sb.length() > 0) { + final long processingNanos = checkpoint.processingTime; + sb.append(", Processing Time = "); + formatNanos(processingNanos, sb); + } + + return sb.toString(); + } + + private void formatNanos(final long nanos, final StringBuilder sb) { + final long seconds = (nanos > 1000000000L) ? (nanos / 1000000000L) : 0L; + long millis = (nanos > 1000000L) ? (nanos / 1000000L) : 0L;; + final long nanosLeft = nanos % 1000000L; + + if (seconds > 0) { + sb.append(seconds).append(" seconds"); + } + if (millis > 0) { + if (seconds > 0) { + sb.append(", "); + millis -= seconds * 1000L; + } + + sb.append(millis).append(" millis"); + } + if (seconds == 0 && millis == 0) { + sb.append(nanosLeft).append(" nanos"); + } + + sb.append(" (").append(nanos).append(" nanos)"); + } + + private void incrementConnectionInputCounts(final Connection connection, final RepositoryRecord record) { + StandardFlowFileEvent connectionEvent = connectionCounts.get(connection); + if (connectionEvent == null) { + connectionEvent = new StandardFlowFileEvent(connection.getIdentifier()); + connectionCounts.put(connection, connectionEvent); + } + connectionEvent.setContentSizeIn(connectionEvent.getContentSizeIn() + record.getCurrent().getSize()); + connectionEvent.setFlowFilesIn(connectionEvent.getFlowFilesIn() + 1); + } + + private void incrementConnectionOutputCounts(final Connection connection, final FlowFileRecord record) { + StandardFlowFileEvent connectionEvent = connectionCounts.get(connection); + if (connectionEvent == null) { + connectionEvent = new StandardFlowFileEvent(connection.getIdentifier()); + connectionCounts.put(connection, connectionEvent); + } + connectionEvent.setContentSizeOut(connectionEvent.getContentSizeOut() + record.getSize()); + connectionEvent.setFlowFilesOut(connectionEvent.getFlowFilesOut() + 1); + } + + private void registerDequeuedRecord(final FlowFileRecord flowFile, final Connection connection) { + final StandardRepositoryRecord record = new StandardRepositoryRecord(connection.getFlowFileQueue(), flowFile); + records.put(flowFile, record); + flowFilesIn++; + contentSizeIn += flowFile.getSize(); + + Set<FlowFileRecord> set = unacknowledgedFlowFiles.get(connection); + if (set == null) { + set = new HashSet<>(); + unacknowledgedFlowFiles.put(connection, set); + } + set.add(flowFile); + + incrementConnectionOutputCounts(connection, flowFile); + } + + @Override + public void adjustCounter(final String name, final long delta, final boolean immediate) { + if (immediate) { + context.adjustCounter(name, delta); + return; + } + + adjustCounter(name, delta, localCounters); + adjustCounter(name, delta, globalCounters); + } + + private void adjustCounter(final String name, final long delta, final Map<String, Long> map) { + Long curVal = map.get(name); + if (curVal == null) { + curVal = Long.valueOf(0L); + } + + final long newValue = curVal.longValue() + delta; + map.put(name, Long.valueOf(newValue)); + } + + @Override + public FlowFile get() { + final 
List<Connection> connections = context.getPollableConnections(); + final int numConnections = connections.size(); + for (int numAttempts = 0; numAttempts < numConnections; numAttempts++) { + final Connection conn = connections.get(context.getNextIncomingConnectionIndex() % connections.size()); + final Set<FlowFileRecord> expired = new HashSet<>(); + final FlowFileRecord flowFile = conn.getFlowFileQueue().poll(expired); + removeExpired(expired, conn); + + if (flowFile != null) { + registerDequeuedRecord(flowFile, conn); + return flowFile; + } + } + + return null; + } + + @Override + public List<FlowFile> get(final int maxResults) { + if (maxResults < 0) { + throw new IllegalArgumentException(); + } + if (maxResults == 0) { + return Collections.emptyList(); + } + + return get(new QueuePoller() { + @Override + public List<FlowFileRecord> poll(final FlowFileQueue queue, final Set<FlowFileRecord> expiredRecords) { + return queue.poll(new FlowFileFilter() { + int polled = 0; + + @Override + public FlowFileFilterResult filter(final FlowFile flowFile) { + if (++polled <= maxResults) { + return FlowFileFilterResult.ACCEPT_AND_CONTINUE; + } else { + return FlowFileFilterResult.ACCEPT_AND_TERMINATE; + } + } + }, expiredRecords); + } + }, false); + } + + @Override + public List<FlowFile> get(final FlowFileFilter filter) { + return get(new QueuePoller() { + @Override + public List<FlowFileRecord> poll(final FlowFileQueue queue, final Set<FlowFileRecord> expiredRecords) { + return queue.poll(filter, expiredRecords); + } + }, true); + } + + private List<FlowFile> get(final QueuePoller poller, final boolean lockAllQueues) { + final List<Connection> connections = context.getPollableConnections(); + if (lockAllQueues) { + for (final Connection connection : connections) { + connection.lock(); + } + } + + try { + for (final Connection conn : connections) { + final Set<FlowFileRecord> expired = new HashSet<>(); + final List<FlowFileRecord> newlySelected = poller.poll(conn.getFlowFileQueue(), expired); + removeExpired(expired, conn); + + if (newlySelected.isEmpty() && expired.isEmpty()) { + continue; + } + + for (final FlowFileRecord flowFile : newlySelected) { + registerDequeuedRecord(flowFile, conn); + } + + return new ArrayList<FlowFile>(newlySelected); + } + + return new ArrayList<>(); + } finally { + if (lockAllQueues) { + for (final Connection connection : connections) { + connection.unlock(); + } + } + } + } + + @Override + public QueueSize getQueueSize() { + int flowFileCount = 0; + long byteCount = 0L; + for (final Connection conn : context.getPollableConnections()) { + final QueueSize queueSize = conn.getFlowFileQueue().getActiveQueueSize(); + flowFileCount += queueSize.getObjectCount(); + byteCount += queueSize.getByteCount(); + } + return new QueueSize(flowFileCount, byteCount); + } + + @Override - public Set<Relationship> getAvailableRelationships() { - return context.getAvailableRelationships(); - } - - @Override + public FlowFile create() { + final Map<String, String> attrs = new HashMap<>(); + attrs.put(CoreAttributes.FILENAME.key(), String.valueOf(System.nanoTime())); + attrs.put(CoreAttributes.PATH.key(), DEFAULT_FLOWFILE_PATH); + attrs.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString()); + + final FlowFileRecord fFile = new StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence()) + .addAttributes(attrs) + .build(); + final StandardRepositoryRecord record = new StandardRepositoryRecord(null); + record.setWorking(fFile, attrs); + records.put(fFile, record); + 
createdFlowFiles.add(fFile.getAttribute(CoreAttributes.UUID.key())); + return fFile; + } + + @Override + public FlowFile clone(final FlowFile example) { + return clone(example, 0L, example.getSize()); + } + + @Override + public FlowFile clone(final FlowFile example, final long offset, final long size) { + validateRecordState(example); + final StandardRepositoryRecord exampleRepoRecord = records.get(example); + final FlowFileRecord currRec = exampleRepoRecord.getCurrent(); + final ContentClaim claim = exampleRepoRecord.getCurrentClaim(); + if (offset + size > example.getSize()) { + throw new FlowFileHandlingException("Specified offset of " + offset + " and size " + size + " exceeds size of " + example.toString()); + } + + final StandardFlowFileRecord.Builder builder = new StandardFlowFileRecord.Builder().fromFlowFile(currRec); + builder.id(context.getNextFlowFileSequence()); + builder.contentClaimOffset(currRec.getContentClaimOffset() + offset); + builder.size(size); + + final String newUuid = UUID.randomUUID().toString(); + builder.addAttribute(CoreAttributes.UUID.key(), newUuid); + + final FlowFileRecord clone = builder.build(); + if (claim != null) { + context.getContentRepository().incrementClaimaintCount(claim); + } + final StandardRepositoryRecord record = new StandardRepositoryRecord(null); + record.setWorking(clone, CoreAttributes.UUID.key(), newUuid); + records.put(clone, record); + + if (offset == 0L && size == example.getSize()) { + provenanceReporter.clone(example, clone); + } else { + registerForkEvent(example, clone); + } + + return clone; + } + + private void registerForkEvent(final FlowFile parent, final FlowFile child) { + ProvenanceEventBuilder eventBuilder = forkEventBuilders.get(parent); + if (eventBuilder == null) { + eventBuilder = context.getProvenanceRepository().eventBuilder(); + eventBuilder.setEventType(ProvenanceEventType.FORK); + + eventBuilder.setFlowFileEntryDate(parent.getEntryDate()); + eventBuilder.setLineageIdentifiers(parent.getLineageIdentifiers()); + eventBuilder.setLineageStartDate(parent.getLineageStartDate()); + eventBuilder.setFlowFileUUID(parent.getAttribute(CoreAttributes.UUID.key())); + + eventBuilder.setComponentId(context.getConnectable().getIdentifier()); + + final Connectable connectable = context.getConnectable(); + final String processorType; + if (connectable instanceof ProcessorNode) { + processorType = ((ProcessorNode) connectable).getProcessor().getClass().getSimpleName(); + } else { + processorType = connectable.getClass().getSimpleName(); + } + eventBuilder.setComponentType(processorType); + eventBuilder.addParentFlowFile(parent); + + updateEventContentClaims(eventBuilder, parent, records.get(parent)); + forkEventBuilders.put(parent, eventBuilder); + } + + eventBuilder.addChildFlowFile(child); + } + + private void registerJoinEvent(final FlowFile child, final Collection<FlowFile> parents) { + final ProvenanceEventRecord eventRecord = provenanceReporter.generateJoinEvent(parents, child); + List<ProvenanceEventRecord> existingRecords = generatedProvenanceEvents.get(child); + if (existingRecords == null) { + existingRecords = new ArrayList<>(); + generatedProvenanceEvents.put(child, existingRecords); + } + existingRecords.add(eventRecord); + } + + @Override + public FlowFile penalize(final FlowFile flowFile) { + validateRecordState(flowFile); + final StandardRepositoryRecord record = records.get(flowFile); + final long expirationEpochMillis = System.currentTimeMillis() + 
context.getConnectable().getPenalizationPeriod(TimeUnit.MILLISECONDS); + final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).penaltyExpirationTime(expirationEpochMillis).build(); + record.setWorking(newFile); + return newFile; + } + + @Override + public FlowFile putAttribute(final FlowFile flowFile, final String key, final String value) { + validateRecordState(flowFile); + + if (CoreAttributes.UUID.key().equals(key)) { + return flowFile; + } + + final StandardRepositoryRecord record = records.get(flowFile); + final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).addAttribute(key, value).build(); + record.setWorking(newFile, key, value); + + return newFile; + } + + @Override + public FlowFile putAllAttributes(final FlowFile flowFile, final Map<String, String> attributes) { + validateRecordState(flowFile); + final StandardRepositoryRecord record = records.get(flowFile); + + final String originalUuid = flowFile.getAttribute(CoreAttributes.UUID.key()); + + final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).addAttributes(attributes); + // Ignore the uuid attribute, if passed in + ffBuilder.addAttribute(CoreAttributes.UUID.key(), originalUuid); + final FlowFileRecord newFile = ffBuilder.build(); + + record.setWorking(newFile, attributes); + return newFile; + } + + @Override + public FlowFile removeAttribute(final FlowFile flowFile, final String key) { + validateRecordState(flowFile); + + if (CoreAttributes.UUID.key().equals(key)) { + return flowFile; + } + + final StandardRepositoryRecord record = records.get(flowFile); + final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).removeAttributes(key).build(); + record.setWorking(newFile, key, null); + return newFile; + } + + @Override + public FlowFile removeAllAttributes(final FlowFile flowFile, final Set<String> keys) { + validateRecordState(flowFile); + + if (keys == null) { + return flowFile; + } + + final Set<String> keysToRemove; + if (keys.contains(CoreAttributes.UUID.key())) { + keysToRemove = new HashSet<>(keys); + keysToRemove.remove(CoreAttributes.UUID.key()); + } else { + keysToRemove = keys; + } + + final StandardRepositoryRecord record = records.get(flowFile); + final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).removeAttributes(keys).build(); + + final Map<String, String> updatedAttrs = new HashMap<>(); + for (final String key : keys) { + updatedAttrs.put(key, null); + } + + record.setWorking(newFile, updatedAttrs); + return newFile; + } + + @Override + public FlowFile removeAllAttributes(final FlowFile flowFile, final Pattern keyPattern) { + validateRecordState(flowFile); + final StandardRepositoryRecord record = records.get(flowFile); + final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).removeAttributes(keyPattern).build(); + + if (keyPattern == null) { + record.setWorking(newFile); + } else { + final Map<String, String> curAttrs = record.getCurrent().getAttributes(); + + final Map<String, String> removed = new HashMap<>(); + for (final String key : curAttrs.keySet()) { + if (CoreAttributes.UUID.key().equals(key)) { + continue; + } + + if (keyPattern.matcher(key).matches()) { + removed.put(key, null); + } + } + + record.setWorking(newFile, removed); + } + + return newFile; + } + + @Override + public void transfer(final 
+ 
+     @Override
+     public void transfer(final FlowFile flowFile, final Relationship relationship) {
+         validateRecordState(flowFile);
+         final StandardRepositoryRecord record = records.get(flowFile);
+         record.setTransferRelationship(relationship);
+         final int numDestinations = context.getConnections(relationship).size();
+         final int multiplier = Math.max(1, numDestinations);
+ 
+         boolean autoTerminated = false;
+         boolean selfRelationship = false;
+         if (numDestinations == 0 && context.getConnectable().isAutoTerminated(relationship)) {
+             // auto terminated.
+             autoTerminated = true;
+         } else if (numDestinations == 0 && relationship == Relationship.SELF) {
+             selfRelationship = true;
+         }
+ 
+         if (autoTerminated) {
+             removedCount += multiplier;
+             removedBytes += flowFile.getSize();
+         } else if (!selfRelationship) {
+             flowFilesOut += multiplier;
+             contentSizeOut += flowFile.getSize() * multiplier;
+         }
+     }
+ 
+     @Override
+     public void transfer(final FlowFile flowFile) {
+         validateRecordState(flowFile);
+         final StandardRepositoryRecord record = records.get(flowFile);
+         if (record.getOriginalQueue() == null) {
+             throw new IllegalArgumentException("Cannot transfer FlowFiles that are created in this Session back to self");
+         }
+         record.setTransferRelationship(Relationship.SELF);
+     }
+ 
+     @Override
+     public void transfer(final Collection<FlowFile> flowFiles) {
+         for (final FlowFile flowFile : flowFiles) {
+             transfer(flowFile);
+         }
+     }
+ 
+     @Override
+     public void transfer(final Collection<FlowFile> flowFiles, final Relationship relationship) {
+         validateRecordState(flowFiles);
+ 
+         boolean autoTerminated = false;
+         boolean selfRelationship = false;
+         final int numDestinations = context.getConnections(relationship).size();
+         if (numDestinations == 0 && context.getConnectable().isAutoTerminated(relationship)) {
+             // auto terminated.
+             autoTerminated = true;
+         } else if (numDestinations == 0 && relationship == Relationship.SELF) {
+             selfRelationship = true;
+         }
+ 
+         final int multiplier = Math.max(1, numDestinations);
+ 
+         // Sum the raw content size once; the fan-out multiplier is applied below so that
+         // it is not double-counted.
+         long contentSize = 0L;
+         for (final FlowFile flowFile : flowFiles) {
+             final StandardRepositoryRecord record = records.get(flowFile);
+             record.setTransferRelationship(relationship);
+             contentSize += flowFile.getSize();
+         }
+ 
+         if (autoTerminated) {
+             removedCount += multiplier * flowFiles.size();
+             removedBytes += contentSize;
+         } else if (!selfRelationship) {
+             flowFilesOut += multiplier * flowFiles.size();
+             contentSizeOut += multiplier * contentSize;
+         }
+     }
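[Editor's note: in processor code the transfer variants above look like this. Routing to a relationship with no outgoing connections is counted as removal only when the relationship is auto-terminated; transfer(flowFile) with no relationship returns the FlowFile to its source queue. REL_SUCCESS is hypothetical:

    // Sketch only: route to success, or penalize and retry from the incoming queue.
    if (processedSuccessfully) {
        session.transfer(flowFile, REL_SUCCESS);   // REL_SUCCESS is hypothetical
    } else {
        flowFile = session.penalize(flowFile);     // delay before the next attempt
        session.transfer(flowFile);                // Relationship.SELF: back to the source queue
    }
]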
+ 
+     @Override
+     public void remove(final FlowFile flowFile) {
+         validateRecordState(flowFile);
+         final StandardRepositoryRecord record = records.get(flowFile);
+         record.markForDelete();
+         removedFlowFiles.add(flowFile.getAttribute(CoreAttributes.UUID.key()));
+ 
+         // if the original connection is null, the FlowFile was created in this session, so we
+         // do not want to count it toward the removed count.
+         if (record.getOriginalQueue() == null) {
+             // if we've generated any Fork events, remove them because the FlowFile was created
+             // and then removed in this session.
+             generatedProvenanceEvents.remove(flowFile);
+             removeForkEvents(flowFile);
+         } else {
+             removedCount++;
+             removedBytes += flowFile.getSize();
+             provenanceReporter.drop(flowFile, flowFile.getAttribute(CoreAttributes.DISCARD_REASON.key()));
+         }
+     }
+ 
+     @Override
+     public void remove(final Collection<FlowFile> flowFiles) {
+         validateRecordState(flowFiles);
+         for (final FlowFile flowFile : flowFiles) {
+             final StandardRepositoryRecord record = records.get(flowFile);
+             record.markForDelete();
+             removedFlowFiles.add(flowFile.getAttribute(CoreAttributes.UUID.key()));
+ 
+             // if the original connection is null, the FlowFile was created in this session, so we
+             // do not want to count it toward the removed count.
+             if (record.getOriginalQueue() == null) {
+                 generatedProvenanceEvents.remove(flowFile);
+                 removeForkEvents(flowFile);
+             } else {
+                 removedCount++;
+                 removedBytes += flowFile.getSize();
+                 provenanceReporter.drop(flowFile, flowFile.getAttribute(CoreAttributes.DISCARD_REASON.key()));
+             }
+         }
+     }
+ 
+     private void removeForkEvents(final FlowFile flowFile) {
+         for (final ProvenanceEventBuilder builder : forkEventBuilders.values()) {
+             final ProvenanceEventRecord event = builder.build();
+ 
+             if (event.getEventType() == ProvenanceEventType.FORK) {
+                 builder.removeChildFlowFile(flowFile);
+             }
+         }
+     }
+ 
+     public void expireFlowFiles() {
+         final Set<FlowFileRecord> expired = new HashSet<>();
+         // A filter that accepts nothing: polling with it drains only the FlowFiles that
+         // the queue itself has already marked as expired into the 'expired' set.
+         final FlowFileFilter filter = new FlowFileFilter() {
+             @Override
+             public FlowFileFilterResult filter(final FlowFile flowFile) {
+                 return FlowFileFilterResult.REJECT_AND_CONTINUE;
+             }
+         };
+ 
+         for (final Connection conn : context.getConnectable().getIncomingConnections()) {
+             do {
+                 expired.clear();
+                 conn.getFlowFileQueue().poll(filter, expired);
+                 removeExpired(expired, conn);
+             } while (!expired.isEmpty());
+         }
+     }
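[Editor's note: a caller-side sketch of remove(...). The DROP provenance event above picks up an optional reason from the discard.reason core attribute, so a processor can record why content was dropped; the reason text here is hypothetical:

    // Sketch only: discard a FlowFile with an explanation for provenance.
    flowFile = session.putAttribute(flowFile,
            CoreAttributes.DISCARD_REASON.key(), "duplicate of already-processed record");
    session.remove(flowFile);
]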
+ 
+     private void removeExpired(final Set<FlowFileRecord> flowFiles, final Connection connection) {
+         if (flowFiles.isEmpty()) {
+             return;
+         }
+ 
+         LOG.info("{} {} FlowFiles have expired and will be removed", new Object[]{this, flowFiles.size()});
+         final List<RepositoryRecord> expiredRecords = new ArrayList<>(flowFiles.size());
+ 
+         final String processorType;
+         final Connectable connectable = context.getConnectable();
+         if (connectable instanceof ProcessorNode) {
+             final ProcessorNode procNode = (ProcessorNode) connectable;
+             processorType = procNode.getProcessor().getClass().getSimpleName();
+         } else {
+             processorType = connectable.getClass().getSimpleName();
+         }
+ 
+         final StandardProvenanceReporter expiredReporter = new StandardProvenanceReporter(connectable.getIdentifier(),
+                 processorType, context.getProvenanceRepository(), this);
+ 
+         final Map<String, FlowFileRecord> recordIdMap = new HashMap<>();
+         for (final FlowFileRecord flowFile : flowFiles) {
+             recordIdMap.put(flowFile.getAttribute(CoreAttributes.UUID.key()), flowFile);
+ 
+             final StandardRepositoryRecord record = new StandardRepositoryRecord(connection.getFlowFileQueue(), flowFile);
+             record.markForDelete();
+             expiredRecords.add(record);
+             expiredReporter.expire(flowFile, "Expiration Threshold = " + connection.getFlowFileQueue().getFlowFileExpiration());
+             removeContent(flowFile.getContentClaim());
+ 
+             final long flowFileLife = System.currentTimeMillis() - flowFile.getEntryDate();
+             final Object terminator = (connectable instanceof ProcessorNode) ? ((ProcessorNode) connectable).getProcessor() : connectable;
+             LOG.info("{} terminated by {} due to FlowFile expiration; life of FlowFile = {} ms", new Object[]{flowFile, terminator, flowFileLife});
+         }
+ 
+         try {
+             // Enrich each expiration event with its content claim and attributes lazily, as
+             // the Provenance Repository iterates, rather than materializing an enriched copy.
+             final Iterable<ProvenanceEventRecord> iterable = new Iterable<ProvenanceEventRecord>() {
+                 @Override
+                 public Iterator<ProvenanceEventRecord> iterator() {
+                     final Iterator<ProvenanceEventRecord> expiredEventIterator = expiredReporter.getEvents().iterator();
+                     final Iterator<ProvenanceEventRecord> enrichingIterator = new Iterator<ProvenanceEventRecord>() {
+                         @Override
+                         public boolean hasNext() {
+                             return expiredEventIterator.hasNext();
+                         }
+ 
+                         @Override
+                         public ProvenanceEventRecord next() {
+                             final ProvenanceEventRecord event = expiredEventIterator.next();
+                             final StandardProvenanceEventRecord.Builder enriched = new StandardProvenanceEventRecord.Builder().fromEvent(event);
+                             final FlowFileRecord record = recordIdMap.get(event.getFlowFileUuid());
+                             if (record == null) {
+                                 return null;
+                             }
+ 
+                             final ContentClaim claim = record.getContentClaim();
+                             if (claim != null) {
+                                 enriched.setCurrentContentClaim(claim.getContainer(), claim.getSection(), claim.getId(), record.getContentClaimOffset(), record.getSize());
+                                 enriched.setPreviousContentClaim(claim.getContainer(), claim.getSection(), claim.getId(), record.getContentClaimOffset(), record.getSize());
+                             }
+ 
+                             enriched.setAttributes(record.getAttributes(), Collections.<String, String>emptyMap());
+                             return enriched.build();
+                         }
+ 
+                         @Override
+                         public void remove() {
+                             throw new UnsupportedOperationException();
+                         }
+                     };
+ 
+                     return enrichingIterator;
+                 }
+             };
+ 
+             context.getProvenanceRepository().registerEvents(iterable);
+             context.getFlowFileRepository().updateRepository(expiredRecords);
+         } catch (final IOException e) {
+             LOG.error("Failed to update FlowFile Repository to record expired records due to {}", e);
+         }
+     }
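[Editor's note: the same wrap-and-transform Iterator pattern in isolation, as a minimal generic sketch; the Transform interface is hypothetical and stands in for the enrichment step above:

    // Sketch only: transform elements lazily while a consumer iterates.
    interface Transform<T, R> {
        R apply(T input);
    }

    static <T, R> Iterator<R> transforming(final Iterator<T> source, final Transform<T, R> fn) {
        return new Iterator<R>() {
            @Override
            public boolean hasNext() {
                return source.hasNext();
            }

            @Override
            public R next() {
                return fn.apply(source.next());  // enrich on demand, no intermediate list
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }
]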
+ 
+     private InputStream getInputStream(final FlowFile flowFile, final ContentClaim claim, final long offset) throws ContentNotFoundException {
+         // If there's no content, don't bother going to the Content Repository because it is generally expensive and we know
+         // that there is no actual content.
+         if (flowFile.getSize() == 0L) {
+             return new ByteArrayInputStream(new byte[0]);
+         }
+ 
+         try {
+             // If the recursion set is empty, we can reuse the input stream that we already have open. However, if
+             // the recursion set is NOT empty, we can't do this because we may be reading the input of FlowFile 1 while in the
+             // callback for reading FlowFile 1, and if we used the same stream we'd destroy the ability to read from FlowFile 1.
+             if (recursionSet.isEmpty()) {
+                 if (currentReadClaim == claim) {
+                     if (currentReadClaimStream != null && currentReadClaimStream.getStreamLocation() <= offset) {
+                         final long bytesToSkip = offset - currentReadClaimStream.getStreamLocation();
+                         if (bytesToSkip > 0) {
+                             StreamUtils.skip(currentReadClaimStream, bytesToSkip);
+                         }
+ 
+                         return new NonCloseableInputStream(currentReadClaimStream);
+                     }
+                 }
+ 
+                 final InputStream rawInStream = context.getContentRepository().read(claim);
+ 
+                 if (currentReadClaimStream != null) {
+                     currentReadClaimStream.close();
+                 }
+ 
+                 currentReadClaim = claim;
+                 currentReadClaimStream = new ByteCountingInputStream(rawInStream, new LongHolder(0L));
+                 StreamUtils.skip(currentReadClaimStream, offset);
+ 
+                 // Use a non-closeable stream because we want to keep it open after the callback has finished so that we can
+                 // reuse the same InputStream for the next FlowFile
+                 return new NonCloseableInputStream(currentReadClaimStream);
+             } else {
+                 final InputStream rawInStream = context.getContentRepository().read(claim);
+                 StreamUtils.skip(rawInStream, offset);
+                 return rawInStream;
+             }
+         } catch (final ContentNotFoundException cnfe) {
+             throw cnfe;
+         } catch (final EOFException eof) {
+             throw new ContentNotFoundException(claim, eof);
+         } catch (final IOException ioe) {
+             throw new FlowFileAccessException("Failed to read content of " + flowFile, ioe);
+         }
+     }
+ 
+     @Override
+     public void read(final FlowFile source, final InputStreamCallback reader) {
+         validateRecordState(source);
+         final StandardRepositoryRecord record = records.get(source);
+ 
+         try {
+             ensureNotAppending(record.getCurrentClaim());
+         } catch (final IOException e) {
+             throw new FlowFileAccessException("Failed to access ContentClaim for " + source.toString(), e);
+         }
+ 
+         try (final InputStream rawIn = getInputStream(source, record.getCurrentClaim(), record.getCurrentClaimOffset());
+                 final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize());
+                 final ByteCountingInputStream countingStream = new ByteCountingInputStream(limitedIn, this.bytesRead)) {
+ 
+             // We want to differentiate between IOExceptions thrown by the repository and IOExceptions thrown from
+             // Processor code. As a result, we have the FlowFileAccessInputStream, which catches IOException from the repository
+             // and translates it into either FlowFileAccessException or ContentNotFoundException. We keep track of any
+             // ContentNotFoundException because the Processor code may catch it and do something else with it, but in
+             // reality, if it is thrown, we want to know about it and handle it ourselves, even if the Processor code catches it.
+             final FlowFileAccessInputStream ffais = new FlowFileAccessInputStream(countingStream, source, record.getCurrentClaim());
+             boolean cnfeThrown = false;
+ 
+             try {
+                 recursionSet.add(source);
+                 reader.process(ffais);
+             } catch (final ContentNotFoundException cnfe) {
+                 cnfeThrown = true;
+                 throw cnfe;
+             } finally {
+                 recursionSet.remove(source);
+ 
+                 // if cnfeThrown is true, we don't need to re-throw the Exception; it will propagate.
+                 if (!cnfeThrown && ffais.getContentNotFoundException() != null) {
+                     throw ffais.getContentNotFoundException();
+                 }
+             }
+         } catch (final ContentNotFoundException nfe) {
+             handleContentNotFound(nfe, record);
+         } catch (final IOException ex) {
+             throw new ProcessException("IOException thrown from " + connectableDescription + ": " + ex.toString(), ex);
+         }
+     }
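[Editor's note: from a processor's perspective, read(...) above looks like this; the callback receives a stream limited to the FlowFile's own region of its content claim. Nested read(...) calls from inside a callback are exactly what forces the recursion-set bookkeeping in getInputStream(...):

    // Sketch only: count the bytes of a FlowFile's content via the callback API.
    final AtomicLong byteCount = new AtomicLong(0L);
    session.read(flowFile, new InputStreamCallback() {
        @Override
        public void process(final InputStream in) throws IOException {
            final byte[] buffer = new byte[8192];
            int len;
            while ((len = in.read(buffer)) > -1) {
                byteCount.addAndGet(len);
            }
        }
    });
]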
+ 
+     @Override
+     public FlowFile merge(final Collection<FlowFile> sources, final FlowFile destination) {
+         return merge(sources, destination, null, null, null);
+     }
+ 
+     @Override
+     public FlowFile merge(final Collection<FlowFile> sources, final FlowFile destination, final byte[] header, final byte[] footer, final byte[] demarcator) {
+         validateRecordState(sources);
+         validateRecordState(destination);
+         if (sources.contains(destination)) {
+             throw new IllegalArgumentException("Destination cannot be within sources");
+         }
+ 
+         final Collection<StandardRepositoryRecord> sourceRecords = new ArrayList<>();
+         for (final FlowFile source : sources) {
+             final StandardRepositoryRecord record = records.get(source);
+             sourceRecords.add(record);
+ 
+             try {
+                 ensureNotAppending(record.getCurrentClaim());
+             } catch (final IOException e) {
+                 throw new FlowFileAccessException("Unable to read from source " + source + " due to " + e.toString(), e);
+             }
+         }
+ 
+         final StandardRepositoryRecord destinationRecord = records.get(destination);
+         final ContentRepository contentRepo = context.getContentRepository();
+         final ContentClaim newClaim;
+         try {
+             newClaim = contentRepo.create(context.getConnectable().isLossTolerant());
+             claimLog.debug("Creating ContentClaim {} for 'merge' for {}", newClaim, desti
<TRUNCATED>
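[Editor's note: the body of merge(...) is cut off above by the archive truncation, but its public contract is visible: concatenate the sources' content into the destination, optionally framed by header, footer, and demarcator bytes. A caller-side sketch, with hypothetical framing bytes:

    // Sketch only: join several FlowFiles into one, comma-delimited.
    final List<FlowFile> sources = session.get(10);
    FlowFile merged = session.create(sources);      // child FlowFile with all sources as parents
    merged = session.merge(sources, merged,
            "[".getBytes(StandardCharsets.UTF_8),   // header
            "]".getBytes(StandardCharsets.UTF_8),   // footer
            ",".getBytes(StandardCharsets.UTF_8));  // demarcator between sources
]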