Repository: nifi
Updated Branches:
  refs/heads/master c87d79193 -> c425bd288


http://git-wip-us.apache.org/repos/asf/nifi/blob/c425bd28/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 3496795..4354dc4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -68,6 +68,7 @@ import java.io.OutputStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -98,6 +99,11 @@ import java.util.stream.Collectors;
  * <p/>
  */
 public final class StandardProcessSession implements ProcessSession, ProvenanceEventEnricher {
+    private static final int SOURCE_EVENT_BIT_INDEXES = (1 << ProvenanceEventType.CREATE.ordinal())
+        | (1 << ProvenanceEventType.FORK.ordinal())
+        | (1 << ProvenanceEventType.JOIN.ordinal())
+        | (1 << ProvenanceEventType.RECEIVE.ordinal())
+        | (1 << ProvenanceEventType.FETCH.ordinal());
 
     private static final AtomicLong idGenerator = new AtomicLong(0L);
     private static final AtomicLong enqueuedIndex = new AtomicLong(0L);
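
The SOURCE_EVENT_BIT_INDEXES constant introduced above packs the ordinals of the five "source" provenance event types into one int bitmask. A minimal, self-contained sketch of the same idiom, using a hypothetical EventType enum rather than NiFi's ProvenanceEventType (the pattern relies on every ordinal being below 32):

    public class BitMaskDemo {
        enum EventType { CREATE, FORK, JOIN, RECEIVE, FETCH, ROUTE, DROP }

        // Pack the "source" ordinals into one int, mirroring SOURCE_EVENT_BIT_INDEXES.
        static final int SOURCE_EVENTS = (1 << EventType.CREATE.ordinal())
            | (1 << EventType.FORK.ordinal())
            | (1 << EventType.JOIN.ordinal())
            | (1 << EventType.RECEIVE.ordinal())
            | (1 << EventType.FETCH.ordinal());

        // Membership is a single AND against the mask instead of a set lookup.
        static boolean isSourceEvent(final EventType type) {
            return (SOURCE_EVENTS & (1 << type.ordinal())) != 0;
        }

        public static void main(String[] args) {
            System.out.println(isSourceEvent(EventType.FORK));  // true
            System.out.println(isSourceEvent(EventType.ROUTE)); // false
        }
    }
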
@@ -110,7 +116,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
     private static final Logger claimLog = LoggerFactory.getLogger(StandardProcessSession.class.getSimpleName() + ".claims");
     private static final int MAX_ROLLBACK_FLOWFILES_TO_LOG = 5;
 
-    private final Map<FlowFileRecord, StandardRepositoryRecord> records = new ConcurrentHashMap<>();
+    private final Map<Long, StandardRepositoryRecord> records = new ConcurrentHashMap<>();
     private final Map<String, StandardFlowFileEvent> connectionCounts = new ConcurrentHashMap<>();
     private final Map<FlowFileQueue, Set<FlowFileRecord>> unacknowledgedFlowFiles = new ConcurrentHashMap<>();
     private final Map<ContentClaim, ByteCountingOutputStream> appendableStreams = new ConcurrentHashMap<>();
@@ -253,7 +259,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         List<ProvenanceEventRecord> autoTerminatedEvents = null;
 
         // validate that all records have a transfer relationship for them and if so determine the destination node and clone as necessary
-        final Map<FlowFileRecord, StandardRepositoryRecord> toAdd = new HashMap<>();
+        final Map<Long, StandardRepositoryRecord> toAdd = new HashMap<>();
         for (final StandardRepositoryRecord record : records.values()) {
             if (record.isMarkedForDelete()) {
                 continue;
@@ -317,7 +323,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
                     newRecord.setDestination(destination.getFlowFileQueue());
                     newRecord.setTransferRelationship(record.getTransferRelationship());
                     // put the mapping into toAdd because adding to records now will cause a ConcurrentModificationException
-                    toAdd.put(clone, newRecord);
+                    toAdd.put(clone.getId(), newRecord);
                 }
             }
         }
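
The comment in the hunk above gives the reason for the toAdd buffer: putting clones straight into records while the loop is iterating records.values() would throw a ConcurrentModificationException. A minimal sketch of that buffer-then-merge pattern over plain maps (hypothetical keys and values, not the NiFi types):

    import java.util.HashMap;
    import java.util.Map;

    public class BufferedAddDemo {
        public static void main(String[] args) {
            final Map<Long, String> records = new HashMap<>();
            records.put(1L, "original");

            // Adding to 'records' while iterating it would throw
            // ConcurrentModificationException, so buffer new entries instead.
            final Map<Long, String> toAdd = new HashMap<>();
            for (final Map.Entry<Long, String> entry : records.entrySet()) {
                toAdd.put(entry.getKey() + 100, entry.getValue() + "-clone");
            }

            // Merge once iteration is finished.
            records.putAll(toAdd);
            System.out.println(records); // {1=original, 101=original-clone}
        }
    }
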
@@ -365,10 +371,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
              * points to the Original Claim -- which has already been removed!
              *
              */
-            for (final Map.Entry<FlowFileRecord, StandardRepositoryRecord> entry : checkpoint.records.entrySet()) {
-                final FlowFile flowFile = entry.getKey();
-                final StandardRepositoryRecord record = entry.getValue();
-
+            for (final StandardRepositoryRecord record : checkpoint.records.values()) {
                 if (record.isMarkedForDelete()) {
                     // if the working claim is not the same as the original claim, we can immediately destroy the working claim
                     // because it was created in this session and is to be deleted. We don't need to wait for the FlowFile Repo to sync.
@@ -380,10 +383,14 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
                         // an issue if we only updated the FlowFile attributes.
                         decrementClaimCount(record.getOriginalClaim());
                     }
-                    final long flowFileLife = System.currentTimeMillis() - flowFile.getEntryDate();
-                    final Connectable connectable = context.getConnectable();
-                    final Object terminator = connectable instanceof ProcessorNode ? ((ProcessorNode) connectable).getProcessor() : connectable;
-                    LOG.info("{} terminated by {}; life of FlowFile = {} ms", new Object[] {flowFile, terminator, flowFileLife});
+
+                    if (LOG.isInfoEnabled()) {
+                        final FlowFileRecord flowFile = record.getCurrent();
+                        final long flowFileLife = System.currentTimeMillis() - flowFile.getEntryDate();
+                        final Connectable connectable = context.getConnectable();
+                        final Object terminator = connectable instanceof ProcessorNode ? ((ProcessorNode) connectable).getProcessor() : connectable;
+                        LOG.info("{} terminated by {}; life of FlowFile = {} ms", new Object[]{flowFile, terminator, flowFileLife});
+                    }
                 } else if (record.isWorking() && record.getWorkingClaim() != record.getOriginalClaim()) {
                     // records which have been updated - remove original if exists
                     decrementClaimCount(record.getOriginalClaim());
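
Wrapping the termination log in LOG.isInfoEnabled() means the getCurrent() call, the timestamp arithmetic, and the varargs array are skipped entirely when INFO is disabled. A minimal sketch of the guard with SLF4J, the logging API this class already uses:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class GuardedLoggingDemo {
        private static final Logger LOG = LoggerFactory.getLogger(GuardedLoggingDemo.class);

        public static void main(String[] args) {
            final long entryDate = System.currentTimeMillis() - 1234;

            // Guard so the timestamp math and the argument array are skipped
            // when INFO is disabled, rather than computed and thrown away.
            if (LOG.isInfoEnabled()) {
                final long life = System.currentTimeMillis() - entryDate;
                LOG.info("FlowFile terminated; life of FlowFile = {} ms", life);
            }
        }
    }
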
@@ -544,10 +551,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             flowFileEvent.setFlowFilesSent(flowFilesSent);
             flowFileEvent.setBytesSent(bytesSent);
 
+            final long now = System.currentTimeMillis();
             long lineageMillis = 0L;
-            for (final Map.Entry<FlowFileRecord, StandardRepositoryRecord> entry : checkpoint.records.entrySet()) {
-                final FlowFile flowFile = entry.getKey();
-                final long lineageDuration = System.currentTimeMillis() - flowFile.getLineageStartDate();
+            for (final StandardRepositoryRecord record : checkpoint.records.values()) {
+                final FlowFile flowFile = record.getCurrent();
+                final long lineageDuration = now - flowFile.getLineageStartDate();
                 lineageMillis += lineageDuration;
             }
             flowFileEvent.setAggregateLineageMillis(lineageMillis);
@@ -566,13 +574,16 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
     }
 
     private Map<String, Long> combineCounters(final Map<String, Long> first, final Map<String, Long> second) {
-        if (first == null && second == null) {
+        final boolean firstEmpty = first == null || first.isEmpty();
+        final boolean secondEmpty = second == null || second.isEmpty();
+
+        if (firstEmpty && secondEmpty) {
             return null;
         }
-        if (first == null) {
+        if (firstEmpty) {
             return second;
         }
-        if (second == null) {
+        if (secondEmpty) {
             return first;
         }
 
@@ -582,14 +593,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         return combined;
     }
 
-    private void addEventType(final Map<String, Set<ProvenanceEventType>> map, final String id, final ProvenanceEventType eventType) {
-        Set<ProvenanceEventType> eventTypes = map.get(id);
-        if (eventTypes == null) {
-            eventTypes = new HashSet<>();
-            map.put(id, eventTypes);
-        }
+    private void addEventType(final Map<String, BitSet> map, final String id, final ProvenanceEventType eventType) {
+        final BitSet eventTypes = map.computeIfAbsent(id, key -> new BitSet());
+        eventTypes.set(eventType.ordinal());
+    }
 
-        eventTypes.add(eventType);
+    private StandardRepositoryRecord getRecord(final FlowFile flowFile) {
+        return records.get(flowFile.getId());
     }
 
     private void updateProvenanceRepo(final Checkpoint checkpoint) {
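
The rewritten addEventType above collapses the get/null-check/put sequence into Map.computeIfAbsent and swaps a HashSet<ProvenanceEventType> for a BitSet indexed by enum ordinal, avoiding per-event boxing. A self-contained sketch of the idiom with a hypothetical enum:

    import java.util.BitSet;
    import java.util.HashMap;
    import java.util.Map;

    public class EventTypeTrackingDemo {
        enum EventType { CREATE, FORK, ROUTE }

        static void addEventType(final Map<String, BitSet> map, final String id, final EventType type) {
            // computeIfAbsent creates the BitSet on first use; set() flips the
            // bit for the enum's ordinal instead of storing the enum in a set.
            map.computeIfAbsent(id, key -> new BitSet()).set(type.ordinal());
        }

        public static void main(String[] args) {
            final Map<String, BitSet> eventTypesPerId = new HashMap<>();
            addEventType(eventTypesPerId, "ff-1", EventType.CREATE);
            addEventType(eventTypesPerId, "ff-1", EventType.ROUTE);
            System.out.println(eventTypesPerId.get("ff-1").get(EventType.FORK.ordinal())); // false
        }
    }
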
@@ -600,7 +610,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         // in case the Processor developer submitted the same events to the reporter. So we use a LinkedHashSet
         // for this, so that we are able to ensure that the events are submitted in the proper order.
         final Set<ProvenanceEventRecord> recordsToSubmit = new LinkedHashSet<>();
-        final Map<String, Set<ProvenanceEventType>> eventTypesPerFlowFileId = new HashMap<>();
+        final Map<String, BitSet> eventTypesPerFlowFileId = new HashMap<>();
 
         final Set<ProvenanceEventRecord> processorGenerated = checkpoint.reportedEvents;
 
@@ -613,7 +623,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             final ProvenanceEventBuilder builder = entry.getValue();
             final FlowFile flowFile = entry.getKey();
 
-            updateEventContentClaims(builder, flowFile, checkpoint.records.get(flowFile));
+            updateEventContentClaims(builder, flowFile, checkpoint.getRecord(flowFile));
             final ProvenanceEventRecord event = builder.build();
 
             if (!event.getChildUuids().isEmpty() && !isSpuriousForkEvent(event, checkpoint.removedFlowFiles)) {
@@ -692,14 +702,15 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             }
 
             if (checkpoint.createdFlowFiles.contains(flowFileId)) {
-                final Set<ProvenanceEventType> registeredTypes = eventTypesPerFlowFileId.get(flowFileId);
+                final BitSet registeredTypes = eventTypesPerFlowFileId.get(flowFileId);
                 boolean creationEventRegistered = false;
                 if (registeredTypes != null) {
-                    if (registeredTypes.contains(ProvenanceEventType.CREATE)
-                        || registeredTypes.contains(ProvenanceEventType.FORK)
-                        || registeredTypes.contains(ProvenanceEventType.JOIN)
-                        || registeredTypes.contains(ProvenanceEventType.RECEIVE)
-                        || registeredTypes.contains(ProvenanceEventType.FETCH)) {
+                    if (registeredTypes.get(ProvenanceEventType.CREATE.ordinal())
+                        || registeredTypes.get(ProvenanceEventType.FORK.ordinal())
+                        || registeredTypes.get(ProvenanceEventType.JOIN.ordinal())
+                        || registeredTypes.get(ProvenanceEventType.RECEIVE.ordinal())
+                        || registeredTypes.get(ProvenanceEventType.FETCH.ordinal())) {
+
+
                         creationEventRegistered = true;
                     }
                 }
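
With the BitSet in place, each check above is a boolean lookup by ordinal. Since SOURCE_EVENT_BIT_INDEXES holds exactly these five bits, the chain of five get() calls could also be collapsed into a single intersection test; whether the commit uses the constant that way elsewhere is not visible in this hunk. A hedged sketch of the mask-based form:

    import java.util.BitSet;

    public class MaskCheckDemo {
        enum EventType { CREATE, FORK, JOIN, RECEIVE, FETCH, ROUTE }

        static final int SOURCE_EVENT_BIT_INDEXES = (1 << EventType.CREATE.ordinal())
            | (1 << EventType.FORK.ordinal())
            | (1 << EventType.JOIN.ordinal())
            | (1 << EventType.RECEIVE.ordinal())
            | (1 << EventType.FETCH.ordinal());

        // BitSet.valueOf turns the int mask into a BitSet; intersects() then
        // answers "is any source-event bit set?" in one call.
        static boolean hasCreationEvent(final BitSet registeredTypes) {
            final BitSet sourceMask = BitSet.valueOf(new long[] {SOURCE_EVENT_BIT_INDEXES});
            return registeredTypes.intersects(sourceMask);
        }

        public static void main(String[] args) {
            final BitSet registered = new BitSet();
            registered.set(EventType.FETCH.ordinal());
            System.out.println(hasCreationEvent(registered)); // true
        }
    }
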
@@ -802,7 +813,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
     public StandardProvenanceEventRecord enrich(final ProvenanceEventRecord rawEvent, final FlowFile flowFile, final long commitNanos) {
         verifyTaskActive();
 
-        final StandardRepositoryRecord repoRecord = records.get(flowFile);
+        final StandardRepositoryRecord repoRecord = getRecord(flowFile);
         if (repoRecord == null) {
             throw new FlowFileHandlingException(flowFile + " is not known in this session (" + toString() + ")");
         }
@@ -839,12 +850,12 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
     }
 
     private StandardProvenanceEventRecord enrich(
-        final ProvenanceEventRecord rawEvent, final Map<String, FlowFileRecord> flowFileRecordMap, final Map<FlowFileRecord, StandardRepositoryRecord> records,
+        final ProvenanceEventRecord rawEvent, final Map<String, FlowFileRecord> flowFileRecordMap, final Map<Long, StandardRepositoryRecord> records,
         final boolean updateAttributes, final long commitNanos) {
         final StandardProvenanceEventRecord.Builder recordBuilder = new StandardProvenanceEventRecord.Builder().fromEvent(rawEvent);
         final FlowFileRecord eventFlowFile = flowFileRecordMap.get(rawEvent.getFlowFileUuid());
         if (eventFlowFile != null) {
-            final StandardRepositoryRecord repoRecord = records.get(eventFlowFile);
+            final StandardRepositoryRecord repoRecord = records.get(eventFlowFile.getId());
 
             if (repoRecord.getCurrent() != null && repoRecord.getCurrentClaim() != null) {
                 final ContentClaim currentClaim = repoRecord.getCurrentClaim();
@@ -910,7 +921,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
      * @param records records
      * @return true if spurious route
      */
-    private boolean isSpuriousRouteEvent(final ProvenanceEventRecord event, final Map<FlowFileRecord, StandardRepositoryRecord> records) {
+    private boolean isSpuriousRouteEvent(final ProvenanceEventRecord event, final Map<Long, StandardRepositoryRecord> records) {
         if (event.getEventType() == ProvenanceEventType.ROUTE) {
             final String relationshipName = event.getRelationship();
             final Relationship relationship = new Relationship.Builder().name(relationshipName).build();
@@ -919,10 +930,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             // If the number of connections for this relationship is not 1, then we can't ignore this ROUTE event,
             // as it may be cloning the FlowFile and adding to multiple connections.
             if (connectionsForRelationship.size() == 1) {
-                for (final Map.Entry<FlowFileRecord, StandardRepositoryRecord> entry : records.entrySet()) {
-                    final FlowFileRecord flowFileRecord = entry.getKey();
+                for (final StandardRepositoryRecord repoRecord : records.values()) {
+                    final FlowFileRecord flowFileRecord = repoRecord.getCurrent();
                     if (event.getFlowFileUuid().equals(flowFileRecord.getAttribute(CoreAttributes.UUID.key()))) {
-                        final StandardRepositoryRecord repoRecord = entry.getValue();
                         if (repoRecord.getOriginalQueue() == null) {
                             return false;
                         }
@@ -1077,35 +1087,35 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         final StringBuilder details = new StringBuilder(1024).append("[");
         final int initLen = details.length();
         int filesListed = 0;
-        for (Map.Entry<FlowFileRecord, StandardRepositoryRecord> entry : records.entrySet()) {
+        for (StandardRepositoryRecord repoRecord : records.values()) {
             if (filesListed >= MAX_ROLLBACK_FLOWFILES_TO_LOG) {
                 break;
             }
             filesListed++;
-            final FlowFileRecord entryKey = entry.getKey();
-            final StandardRepositoryRecord entryValue = entry.getValue();
+
             if (details.length() > initLen) {
                 details.append(", ");
             }
-            if (entryValue.getOriginalQueue() != null && entryValue.getOriginalQueue().getIdentifier() != null) {
+            if (repoRecord.getOriginalQueue() != null && repoRecord.getOriginalQueue().getIdentifier() != null) {
                 details.append("queue=")
-                        .append(entryValue.getOriginalQueue().getIdentifier())
+                        .append(repoRecord.getOriginalQueue().getIdentifier())
                         .append("/");
             }
             details.append("filename=")
-                    .append(entryKey.getAttribute(CoreAttributes.FILENAME.key()))
+                    .append(repoRecord.getCurrent().getAttribute(CoreAttributes.FILENAME.key()))
                     .append("/uuid=")
-                    .append(entryKey.getAttribute(CoreAttributes.UUID.key()));
+                    .append(repoRecord.getCurrent().getAttribute(CoreAttributes.UUID.key()));
         }
-        if (records.entrySet().size() > MAX_ROLLBACK_FLOWFILES_TO_LOG) {
+        if (records.size() > MAX_ROLLBACK_FLOWFILES_TO_LOG) {
             if (details.length() > initLen) {
                 details.append(", ");
             }
-            details.append(records.entrySet().size() - MAX_ROLLBACK_FLOWFILES_TO_LOG)
+            details.append(records.size() - MAX_ROLLBACK_FLOWFILES_TO_LOG)
                     .append(" additional Flowfiles not listed");
         } else if (filesListed == 0) {
             details.append("none");
         }
+
         details.append("]");
         return details.toString();
     }
@@ -1216,7 +1226,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
                 throw new IllegalStateException(flowFile + " already in use for an active callback or OutputStream created by ProcessSession.write(FlowFile) has not been closed");
             }
 
-            final StandardRepositoryRecord record = records.get(flowFile);
+            final StandardRepositoryRecord record = getRecord(flowFile);
             if (record == null) {
                 throw new FlowFileHandlingException(flowFile + " is not known in this session (" + toString() + ")");
             }
@@ -1275,8 +1285,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         for (final FlowFile flowFile : flowFiles) {
             final FlowFileRecord flowFileRecord = (FlowFileRecord) flowFile;
 
-            final StandardRepositoryRecord repoRecord = this.records.remove(flowFile);
-            newOwner.records.put(flowFileRecord, repoRecord);
+            final StandardRepositoryRecord repoRecord = this.records.remove(flowFile.getId());
+            newOwner.records.put(flowFileRecord.getId(), repoRecord);
 
             // Adjust the counts for Connections for each FlowFile that was pulled from a Connection.
             // We do not have to worry about accounting for 'input counts' on connections because those
@@ -1348,9 +1358,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         final Set<String> modifiedFlowFileIds = new HashSet<>();
         int largestTransferSetSize = 0;
 
-        for (final Map.Entry<FlowFileRecord, StandardRepositoryRecord> entry : checkpoint.records.entrySet()) {
-            final FlowFile flowFile = entry.getKey();
+        for (final Map.Entry<Long, StandardRepositoryRecord> entry : checkpoint.records.entrySet()) {
             final StandardRepositoryRecord record = entry.getValue();
+            final FlowFile flowFile = record.getCurrent();
 
             final Relationship relationship = record.getTransferRelationship();
             if (Relationship.SELF.equals(relationship)) {
@@ -1479,7 +1489,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
     private void registerDequeuedRecord(final FlowFileRecord flowFile, final Connection connection) {
         final StandardRepositoryRecord record = new StandardRepositoryRecord(connection.getFlowFileQueue(), flowFile);
-        records.put(flowFile, record);
+        records.put(flowFile.getId(), record);
         flowFilesIn++;
         contentSizeIn += flowFile.getSize();
 
@@ -1655,16 +1665,17 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         verifyTaskActive();
 
         final Map<String, String> attrs = new HashMap<>();
-        attrs.put(CoreAttributes.FILENAME.key(), String.valueOf(System.nanoTime()));
+        final String uuid = UUID.randomUUID().toString();
+        attrs.put(CoreAttributes.FILENAME.key(), uuid);
         attrs.put(CoreAttributes.PATH.key(), DEFAULT_FLOWFILE_PATH);
-        attrs.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString());
+        attrs.put(CoreAttributes.UUID.key(), uuid);
 
         final FlowFileRecord fFile = new StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence())
             .addAttributes(attrs)
             .build();
         final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
         record.setWorking(fFile, attrs);
-        records.put(fFile, record);
+        records.put(fFile.getId(), record);
         createdFlowFiles.add(fFile.getAttribute(CoreAttributes.UUID.key()));
         return fFile;
     }
@@ -1681,7 +1692,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         verifyTaskActive();
 
         example = validateRecordState(example);
-        final StandardRepositoryRecord exampleRepoRecord = records.get(example);
+        final StandardRepositoryRecord exampleRepoRecord = getRecord(example);
         final FlowFileRecord currRec = exampleRepoRecord.getCurrent();
         final ContentClaim claim = exampleRepoRecord.getCurrentClaim();
         if (offset + size > example.getSize()) {
@@ -1702,7 +1713,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         }
         final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
         record.setWorking(clone, clone.getAttributes());
-        records.put(clone, record);
+        records.put(clone.getId(), record);
 
         if (offset == 0L && size == example.getSize()) {
             provenanceReporter.clone(example, clone);
@@ -1730,7 +1741,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             eventBuilder.setComponentType(processorType);
             eventBuilder.addParentFlowFile(parent);
 
-            updateEventContentClaims(eventBuilder, parent, records.get(parent));
+            updateEventContentClaims(eventBuilder, parent, getRecord(parent));
             forkEventBuilders.put(parent, eventBuilder);
         }
 
@@ -1752,7 +1763,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         verifyTaskActive();
 
         flowFile = validateRecordState(flowFile);
-        final StandardRepositoryRecord record = records.get(flowFile);
+        final StandardRepositoryRecord record = getRecord(flowFile);
         final long expirationEpochMillis = System.currentTimeMillis() + context.getConnectable().getPenalizationPeriod(TimeUnit.MILLISECONDS);
         final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).penaltyExpirationTime(expirationEpochMillis).build();
         record.setWorking(newFile);
@@ -1768,7 +1779,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             return flowFile;
         }
 
-        final StandardRepositoryRecord record = records.get(flowFile);
+        final StandardRepositoryRecord record = getRecord(flowFile);
         final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).addAttribute(key, value).build();
         record.setWorking(newFile, key, value);
 
@@ -1780,7 +1791,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         verifyTaskActive();
 
         flowFile = validateRecordState(flowFile);
-        final StandardRepositoryRecord record = records.get(flowFile);
+        final StandardRepositoryRecord record = getRecord(flowFile);
 
         final Map<String, String> updatedAttributes;
         if (attributes.containsKey(CoreAttributes.UUID.key())) {
@@ -1794,6 +1805,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         final FlowFileRecord newFile = ffBuilder.build();
 
         record.setWorking(newFile, updatedAttributes);
+
         return newFile;
     }
 
@@ -1806,7 +1818,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             return flowFile;
         }
 
-        final StandardRepositoryRecord record = records.get(flowFile);
+        final StandardRepositoryRecord record = getRecord(flowFile);
         final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).removeAttributes(key).build();
         record.setWorking(newFile, key, null);
         return newFile;
@@ -1821,7 +1833,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             return flowFile;
         }
 
-        final StandardRepositoryRecord record = records.get(flowFile);
+        final StandardRepositoryRecord record = getRecord(flowFile);
         final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).removeAttributes(keys).build();
 
         final Map<String, String> updatedAttrs = new HashMap<>();
@@ -1842,7 +1854,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         verifyTaskActive();
 
         flowFile = validateRecordState(flowFile);
-        final StandardRepositoryRecord record = records.get(flowFile);
+        final StandardRepositoryRecord record = getRecord(flowFile);
         final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).removeAttributes(keyPattern).build();
 
         if (keyPattern == null) {
@@ -1895,7 +1907,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             // the relationship specified is not known in this session/context
             throw new IllegalArgumentException("Relationship '" + relationship.getName() + "' is not known");
         }
-        final StandardRepositoryRecord record = records.get(flowFile);
+        final StandardRepositoryRecord record = getRecord(flowFile);
         record.setTransferRelationship(relationship);
         updateLastQueuedDate(record);
 
@@ -1913,7 +1925,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         verifyTaskActive();
 
         flowFile = validateRecordState(flowFile);
-        final StandardRepositoryRecord record = records.get(flowFile);
+        final StandardRepositoryRecord record = getRecord(flowFile);
         if (record.getOriginalQueue() == null) {
             throw new IllegalArgumentException("Cannot transfer FlowFiles that are created in this Session back to self");
         }
@@ -1951,7 +1963,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         final long queuedTime = System.currentTimeMillis();
         long contentSize = 0L;
         for (final FlowFile flowFile : flowFiles) {
-            final StandardRepositoryRecord record = records.get(flowFile);
+            final FlowFileRecord flowFileRecord = (FlowFileRecord) flowFile;
+            final StandardRepositoryRecord record = getRecord(flowFileRecord);
             record.setTransferRelationship(relationship);
             updateLastQueuedDate(record, queuedTime);
 
@@ -1972,7 +1985,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         verifyTaskActive();
 
         flowFile = validateRecordState(flowFile);
-        final StandardRepositoryRecord record = records.get(flowFile);
+        final StandardRepositoryRecord record = getRecord(flowFile);
         record.markForDelete();
         removedFlowFiles.add(flowFile.getAttribute(CoreAttributes.UUID.key()));
 
@@ -1996,7 +2009,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
         flowFiles = validateRecordState(flowFiles);
         for (final FlowFile flowFile : flowFiles) {
-            final StandardRepositoryRecord record = records.get(flowFile);
+            final StandardRepositoryRecord record = getRecord(flowFile);
             record.markForDelete();
             removedFlowFiles.add(flowFile.getAttribute(CoreAttributes.UUID.key()));
 
@@ -2195,7 +2208,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         verifyTaskActive();
 
         source = validateRecordState(source, true);
-        final StandardRepositoryRecord record = records.get(source);
+        final StandardRepositoryRecord record = getRecord(source);
 
         try {
             ensureNotAppending(record.getCurrentClaim());
@@ -2251,7 +2264,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         verifyTaskActive();
 
         source = validateRecordState(source, true);
-        final StandardRepositoryRecord record = records.get(source);
+        final StandardRepositoryRecord record = getRecord(source);
 
         try {
             ensureNotAppending(record.getCurrentClaim());
@@ -2400,7 +2413,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
         final Collection<StandardRepositoryRecord> sourceRecords = new ArrayList<>();
         for (final FlowFile source : sources) {
-            final StandardRepositoryRecord record = records.get(source);
+            final StandardRepositoryRecord record = getRecord(source);
             sourceRecords.add(record);
 
             try {
@@ -2411,7 +2424,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             }
         }
 
-        final StandardRepositoryRecord destinationRecord = records.get(destination);
+        final StandardRepositoryRecord destinationRecord = getRecord(destination);
         final ContentRepository contentRepo = context.getContentRepository();
         final ContentClaim newClaim;
         try {
@@ -2437,7 +2450,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
                 final boolean useDemarcator = demarcator != null && demarcator.length > 0;
                 final int numSources = sources.size();
                 for (final FlowFile source : sources) {
-                    final StandardRepositoryRecord sourceRecord = records.get(source);
+                    final StandardRepositoryRecord sourceRecord = getRecord(source);
 
                     final long copied = contentRepo.exportTo(sourceRecord.getCurrentClaim(), out, sourceRecord.getCurrentClaimOffset(), source.getSize());
                     writtenCount += copied;
@@ -2473,7 +2486,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         removeTemporaryClaim(destinationRecord);
         final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(destinationRecord.getCurrent()).contentClaim(newClaim).contentClaimOffset(0L).size(writtenCount).build();
         destinationRecord.setWorking(newFile);
-        records.put(newFile, destinationRecord);
         return newFile;
     }
 
@@ -2495,7 +2507,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
     public OutputStream write(FlowFile source) {
         verifyTaskActive();
         source = validateRecordState(source);
-        final StandardRepositoryRecord record = records.get(source);
+        final StandardRepositoryRecord record = getRecord(source);
 
         ContentClaim newClaim = null;
         try {
@@ -2618,7 +2630,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
     public FlowFile write(FlowFile source, final OutputStreamCallback writer) {
         verifyTaskActive();
         source = validateRecordState(source);
-        final StandardRepositoryRecord record = records.get(source);
+        final StandardRepositoryRecord record = getRecord(source);
 
         long writtenToFlowFile = 0L;
         ContentClaim newClaim = null;
@@ -2677,7 +2689,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         verifyTaskActive();
 
         source = validateRecordState(source);
-        final StandardRepositoryRecord record = records.get(source);
+        final StandardRepositoryRecord record = getRecord(source);
         long newSize = 0L;
 
         // Get the current Content Claim from the record and see if we already have
@@ -2858,7 +2870,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
     public FlowFile write(FlowFile source, final StreamCallback writer) {
         verifyTaskActive();
         source = validateRecordState(source);
-        final StandardRepositoryRecord record = records.get(source);
+        final StandardRepositoryRecord record = getRecord(source);
         final ContentClaim currClaim = record.getCurrentClaim();
 
         long writtenToFlowFile = 0L;
@@ -2932,6 +2944,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             .build();
 
         record.setWorking(newFile);
+
         return newFile;
     }
 
@@ -2946,7 +2959,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             throw new FlowFileAccessException("Cannot write to path " + source.getParent().toFile().getAbsolutePath() + " so cannot delete file; will not import.");
         }
 
-        final StandardRepositoryRecord record = records.get(destination);
+        final StandardRepositoryRecord record = getRecord(destination);
 
         final ContentClaim newClaim;
         final long claimOffset;
@@ -2992,7 +3005,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         verifyTaskActive();
 
         destination = validateRecordState(destination);
-        final StandardRepositoryRecord record = records.get(destination);
+        final StandardRepositoryRecord record = getRecord(destination);
         ContentClaim newClaim = null;
         final long claimOffset = 0L;
 
@@ -3030,7 +3043,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
     public void exportTo(FlowFile source, final Path destination, final boolean append) {
         verifyTaskActive();
         source = validateRecordState(source);
-        final StandardRepositoryRecord record = records.get(source);
+        final StandardRepositoryRecord record = getRecord(source);
         try {
             ensureNotAppending(record.getCurrentClaim());
 
@@ -3049,7 +3062,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
     public void exportTo(FlowFile source, final OutputStream destination) {
         verifyTaskActive();
         source = validateRecordState(source);
-        final StandardRepositoryRecord record = records.get(source);
+        final StandardRepositoryRecord record = getRecord(source);
 
         if(record.getCurrentClaim() == null) {
             return;
@@ -3137,7 +3150,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             throw new IllegalStateException(flowFile + " already in use for an active callback or an OutputStream created by ProcessSession.write(FlowFile) has not been closed");
         }
 
-        final StandardRepositoryRecord record = records.get(flowFile);
+        final StandardRepositoryRecord record = getRecord(flowFile);
         if (record == null) {
             rollback();
             throw new FlowFileHandlingException(flowFile + " is not known in this session (" + toString() + ")");
@@ -3170,11 +3183,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
      *         <code>false</code> otherwise.
      */
     boolean isFlowFileKnown(final FlowFile flowFile) {
-        return records.containsKey(flowFile);
+        return records.containsKey(flowFile.getId());
     }
 
     private FlowFile getMostRecent(final FlowFile flowFile) {
-        final StandardRepositoryRecord existingRecord = records.get(flowFile);
+        final StandardRepositoryRecord existingRecord = getRecord(flowFile);
         return existingRecord == null ? flowFile : existingRecord.getCurrent();
     }
 
@@ -3183,10 +3196,12 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         verifyTaskActive();
         parent = getMostRecent(parent);
 
+        final String uuid = UUID.randomUUID().toString();
+
         final Map<String, String> newAttributes = new HashMap<>(3);
-        newAttributes.put(CoreAttributes.FILENAME.key(), String.valueOf(System.nanoTime()));
+        newAttributes.put(CoreAttributes.FILENAME.key(), uuid);
         newAttributes.put(CoreAttributes.PATH.key(), DEFAULT_FLOWFILE_PATH);
-        newAttributes.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString());
+        newAttributes.put(CoreAttributes.UUID.key(), uuid);
 
         final StandardFlowFileRecord.Builder fFileBuilder = new StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence());
 
@@ -3210,7 +3225,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         final FlowFileRecord fFile = fFileBuilder.build();
         final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
         record.setWorking(fFile, newAttributes);
-        records.put(fFile, record);
+        records.put(fFile.getId(), record);
         createdFlowFiles.add(fFile.getAttribute(CoreAttributes.UUID.key()));
 
         registerForkEvent(parent, fFile);
@@ -3247,9 +3262,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             }
         }
 
-        newAttributes.put(CoreAttributes.FILENAME.key(), String.valueOf(System.nanoTime()));
+        final String uuid = UUID.randomUUID().toString();
+        newAttributes.put(CoreAttributes.FILENAME.key(), uuid);
         newAttributes.put(CoreAttributes.PATH.key(), DEFAULT_FLOWFILE_PATH);
-        newAttributes.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString());
+        newAttributes.put(CoreAttributes.UUID.key(), uuid);
 
         final FlowFileRecord fFile = new StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence())
             .addAttributes(newAttributes)
@@ -3258,7 +3274,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
         final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
         record.setWorking(fFile, newAttributes);
-        records.put(fFile, record);
+        records.put(fFile.getId(), record);
         createdFlowFiles.add(fFile.getAttribute(CoreAttributes.UUID.key()));
 
         registerJoinEvent(fFile, parents);
@@ -3339,7 +3355,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         private final List<ProvenanceEventRecord> autoTerminatedEvents = new ArrayList<>();
         private final Set<ProvenanceEventRecord> reportedEvents = new LinkedHashSet<>();
 
-        private final Map<FlowFileRecord, StandardRepositoryRecord> records = new ConcurrentHashMap<>();
+        private final Map<Long, StandardRepositoryRecord> records = new ConcurrentHashMap<>();
         private final Map<String, StandardFlowFileEvent> connectionCounts = new ConcurrentHashMap<>();
         private final Map<FlowFileQueue, Set<FlowFileRecord>> unacknowledgedFlowFiles = new ConcurrentHashMap<>();
 
@@ -3392,5 +3408,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             this.contentSizeIn += session.contentSizeIn;
             this.contentSizeOut += session.contentSizeOut;
         }
+
+        private StandardRepositoryRecord getRecord(final FlowFile flowFile) {
+            return records.get(flowFile.getId());
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c425bd28/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
index 167fa73..299f73b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
@@ -16,14 +16,6 @@
  */
 package org.apache.nifi.processor;
 
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
 import org.apache.nifi.attribute.expression.language.PreparedQuery;
 import org.apache.nifi.attribute.expression.language.Query;
 import org.apache.nifi.attribute.expression.language.Query.Range;
@@ -41,6 +33,15 @@ import org.apache.nifi.encrypt.StringEncryptor;
 import org.apache.nifi.processor.exception.TerminatedTaskException;
 import org.apache.nifi.util.Connectables;
 
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 public class StandardProcessContext implements ProcessContext, ControllerServiceLookup {
 
     private final ProcessorNode procNode;
@@ -49,6 +50,7 @@ public class StandardProcessContext implements ProcessContext, ControllerService
     private final StringEncryptor encryptor;
     private final StateManager stateManager;
     private final TaskTermination taskTermination;
+    private final Map<PropertyDescriptor, String> properties;
 
     public StandardProcessContext(final ProcessorNode processorNode, final ControllerServiceProvider controllerServiceProvider, final StringEncryptor encryptor, final StateManager stateManager,
             final TaskTermination taskTermination) {
@@ -71,6 +73,8 @@ public class StandardProcessContext implements ProcessContext, ControllerService
                 preparedQueries.put(desc, pq);
             }
         }
+
+        properties = Collections.unmodifiableMap(processorNode.getProperties());
     }
 
     private void verifyTaskActive() {
@@ -82,7 +86,17 @@ public class StandardProcessContext implements ProcessContext, ControllerService
     @Override
     public PropertyValue getProperty(final PropertyDescriptor descriptor) {
         verifyTaskActive();
-        return getProperty(descriptor.getName());
+
+        final String setPropertyValue = properties.get(descriptor);
+        if (setPropertyValue != null) {
+            return new StandardPropertyValue(setPropertyValue, this, preparedQueries.get(descriptor), procNode.getVariableRegistry());
+        }
+
+        // Get the "canonical" Property Descriptor from the Processor
+        final PropertyDescriptor canonicalDescriptor = procNode.getProcessor().getPropertyDescriptor(descriptor.getName());
+        final String defaultValue = canonicalDescriptor.getDefaultValue();
+
+        return new StandardPropertyValue(defaultValue, this, preparedQueries.get(descriptor), procNode.getVariableRegistry());
     }
 
     /**
@@ -99,7 +113,7 @@ public class StandardProcessContext implements ProcessContext, ControllerService
             return null;
         }
 
-        final String setPropertyValue = procNode.getProperty(descriptor);
+        final String setPropertyValue = properties.get(descriptor);
         final String propValue = (setPropertyValue == null) ? descriptor.getDefaultValue() : setPropertyValue;
 
         return new StandardPropertyValue(propValue, this, preparedQueries.get(descriptor), procNode.getVariableRegistry());
@@ -138,7 +152,7 @@ public class StandardProcessContext implements ProcessContext, ControllerService
     @Override
     public Map<PropertyDescriptor, String> getProperties() {
         verifyTaskActive();
-        return procNode.getProperties();
+        return properties;
     }
 
     @Override

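getProperty(PropertyDescriptor) now reads from an immutable snapshot of the processor's properties taken at construction time, falling back to the canonical descriptor's default value on a miss, and getProperties() returns the same snapshot instead of re-fetching from the ProcessorNode. A minimal sketch of that snapshot-plus-default lookup over plain maps (hypothetical types, not NiFi's API):

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    public class PropertySnapshotDemo {
        private final Map<String, String> properties;
        private final Map<String, String> defaults;

        PropertySnapshotDemo(final Map<String, String> configured, final Map<String, String> defaults) {
            // Snapshot once; later reads never see concurrent modifications.
            this.properties = Collections.unmodifiableMap(new HashMap<>(configured));
            this.defaults = defaults;
        }

        String getProperty(final String name) {
            final String set = properties.get(name);
            // Fall back to the canonical default when the property was never set.
            return set != null ? set : defaults.get(name);
        }

        public static void main(String[] args) {
            final Map<String, String> configured = new HashMap<>();
            configured.put("Line Split Count", "10");
            final Map<String, String> defaults = new HashMap<>();
            defaults.put("Line Split Count", "1");
            defaults.put("Header Line Count", "0");

            final PropertySnapshotDemo ctx = new PropertySnapshotDemo(configured, defaults);
            System.out.println(ctx.getProperty("Line Split Count"));  // 10 (configured)
            System.out.println(ctx.getProperty("Header Line Count")); // 0 (default)
        }
    }
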
http://git-wip-us.apache.org/repos/asf/nifi/blob/c425bd28/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java
index 6f045e5..c960902 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java
@@ -16,30 +16,31 @@
  */
 package org.apache.nifi.controller.repository;
 
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.processor.Relationship;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.nifi.controller.queue.FlowFileQueue;
-import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.processor.Relationship;
-
 public class StandardRepositoryRecord implements RepositoryRecord {
 
-    private RepositoryRecordType type = null;
+    private RepositoryRecordType type;
     private FlowFileRecord workingFlowFileRecord = null;
     private Relationship transferRelationship = null;
     private FlowFileQueue destination = null;
     private final FlowFileRecord originalFlowFileRecord;
     private final FlowFileQueue originalQueue;
     private String swapLocation;
-    private final Map<String, String> updatedAttributes = new HashMap<>();
     private final Map<String, String> originalAttributes;
+    private Map<String, String> updatedAttributes = null;
     private List<ContentClaim> transientClaims;
     private final long startNanos = System.nanoTime();
 
+
     /**
      * Creates a new record which has no original claim or flow file - it is entirely new
      *
@@ -66,7 +67,7 @@ public class StandardRepositoryRecord implements RepositoryRecord {
         this.originalFlowFileRecord = originalFlowFileRecord;
         this.type = RepositoryRecordType.SWAP_OUT;
         this.swapLocation = swapLocation;
-        this.originalAttributes = originalFlowFileRecord == null ? Collections.<String, String>emptyMap() : originalFlowFileRecord.getAttributes();
+        this.originalAttributes = originalFlowFileRecord == null ? Collections.emptyMap() : originalFlowFileRecord.getAttributes();
     }
 
     @Override
@@ -113,30 +114,48 @@ public class StandardRepositoryRecord implements RepositoryRecord {
         workingFlowFileRecord = flowFile;
     }
 
+    private Map<String, String> initializeUpdatedAttributes() {
+        if (updatedAttributes == null) {
+            updatedAttributes = new HashMap<>();
+        }
+
+        return updatedAttributes;
+    }
+
     public void setWorking(final FlowFileRecord flowFile, final String attributeKey, final String attributeValue) {
         workingFlowFileRecord = flowFile;
 
+        // In the case that the type is CREATE, we know that all attributes are updated attributes, so no need to store them.
+        if (type == RepositoryRecordType.CREATE) {
+            return;
+        }
+
         // If setting attribute to same value as original, don't add to updated attributes
         final String currentValue = originalAttributes.get(attributeKey);
         if (currentValue == null || !currentValue.equals(attributeValue)) {
-            updatedAttributes.put(attributeKey, attributeValue);
+            initializeUpdatedAttributes().put(attributeKey, attributeValue);
         }
     }
 
     public void setWorking(final FlowFileRecord flowFile, final Map<String, String> updatedAttribs) {
         workingFlowFileRecord = flowFile;
 
+        // In the case that the type is CREATE, we know that all attributes are updated attributes, so no need to store them.
+        if (type == RepositoryRecordType.CREATE) {
+            return;
+        }
+
         for (final Map.Entry<String, String> entry : updatedAttribs.entrySet()) {
             final String currentValue = originalAttributes.get(entry.getKey());
             if (currentValue == null || !currentValue.equals(entry.getValue())) {
-                updatedAttributes.put(entry.getKey(), entry.getValue());
+                initializeUpdatedAttributes().put(entry.getKey(), entry.getValue());
             }
         }
     }
 
     @Override
     public boolean isAttributesChanged() {
-        return !updatedAttributes.isEmpty();
+        return type == RepositoryRecordType.CREATE || (updatedAttributes != null && !updatedAttributes.isEmpty());
     }
 
     public void markForAbort() {
@@ -196,7 +215,11 @@ public class StandardRepositoryRecord implements RepositoryRecord {
     }
 
     Map<String, String> getUpdatedAttributes() {
-        return updatedAttributes;
+        if (type == RepositoryRecordType.CREATE) {
+            return getCurrent().getAttributes();
+        }
+
+        return updatedAttributes == null ? Collections.emptyMap() : updatedAttributes;
     }
 
     @Override

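StandardRepositoryRecord now allocates the updatedAttributes map lazily and short-circuits attribute diffing for CREATE-type records, where every attribute is new by definition. A minimal sketch of the lazy-allocation idiom on its own:

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    public class LazyAttributesDemo {
        private Map<String, String> updatedAttributes = null;

        private Map<String, String> initializeUpdatedAttributes() {
            // Allocate the map only when an attribute actually changes;
            // records that never touch attributes pay no HashMap cost.
            if (updatedAttributes == null) {
                updatedAttributes = new HashMap<>();
            }
            return updatedAttributes;
        }

        void setAttribute(final String key, final String value) {
            initializeUpdatedAttributes().put(key, value);
        }

        Map<String, String> getUpdatedAttributes() {
            return updatedAttributes == null ? Collections.emptyMap() : updatedAttributes;
        }

        public static void main(String[] args) {
            final LazyAttributesDemo record = new LazyAttributesDemo();
            System.out.println(record.getUpdatedAttributes().isEmpty()); // true, no map allocated
            record.setAttribute("filename", "example");
            System.out.println(record.getUpdatedAttributes()); // {filename=example}
        }
    }
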
http://git-wip-us.apache.org/repos/asf/nifi/blob/c425bd28/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
index 5ed4d9e..d16bc4b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
@@ -16,31 +16,14 @@
  */
 package org.apache.nifi.processors.standard;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.commons.io.IOUtils;
 import org.apache.nifi.annotation.behavior.EventDriven;
-import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -65,6 +48,25 @@ import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.stream.io.util.TextLineDemarcator;
 import org.apache.nifi.stream.io.util.TextLineDemarcator.OffsetInfo;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
 @EventDriven
 @SideEffectFree
 @SupportsBatching
@@ -158,19 +160,17 @@ public class SplitText extends AbstractProcessor {
     private static final Set<Relationship> relationships;
 
     static {
-        properties = Collections.unmodifiableList(Arrays.asList(new PropertyDescriptor[]{
-                LINE_SPLIT_COUNT,
-                FRAGMENT_MAX_SIZE,
-                HEADER_LINE_COUNT,
-                HEADER_MARKER,
-                REMOVE_TRAILING_NEWLINES
-        }));
-
-        relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(new Relationship[]{
-                REL_ORIGINAL,
-                REL_SPLITS,
-                REL_FAILURE
-        })));
+        properties = Collections.unmodifiableList(Arrays.asList(
+            LINE_SPLIT_COUNT,
+            FRAGMENT_MAX_SIZE,
+            HEADER_LINE_COUNT,
+            HEADER_MARKER,
+            REMOVE_TRAILING_NEWLINES));
+
+        relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_ORIGINAL,
+            REL_SPLITS,
+            REL_FAILURE)));
     }
 
     private volatile boolean removeTrailingNewLines;
@@ -259,9 +259,11 @@ public class SplitText extends AbstractProcessor {
             processSession.transfer(sourceFlowFile, REL_FAILURE);
         } else {
             final String fragmentId = UUID.randomUUID().toString();
-            List<FlowFile> splitFlowFiles = this.generateSplitFlowFiles(fragmentId, sourceFlowFile, headerSplitInfoRef.get(), computedSplitsInfo, processSession);
+            final List<FlowFile> splitFlowFiles = this.generateSplitFlowFiles(fragmentId, sourceFlowFile, headerSplitInfoRef.get(), computedSplitsInfo, processSession);
+
             final FlowFile originalFlowFile = FragmentAttributes.copyAttributesToOriginal(processSession, sourceFlowFile, fragmentId, splitFlowFiles.size());
             processSession.transfer(originalFlowFile, REL_ORIGINAL);
+
             if (!splitFlowFiles.isEmpty()) {
                 processSession.transfer(splitFlowFiles, REL_SPLITS);
             }
@@ -291,6 +293,7 @@ public class SplitText extends AbstractProcessor {
      */
     private List<FlowFile> generateSplitFlowFiles(String fragmentId, FlowFile sourceFlowFile, SplitInfo splitInfo, List<SplitInfo> computedSplitsInfo, ProcessSession processSession){
         List<FlowFile> splitFlowFiles = new ArrayList<>();
+
         FlowFile headerFlowFile = null;
         long headerCrlfLength = 0;
         if (splitInfo != null) {
@@ -305,7 +308,11 @@ public class SplitText extends AbstractProcessor {
                     fragmentId, fragmentIndex++, sourceFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
             splitFlowFiles.add(splitFlowFile);
         } else {
-            for (SplitInfo computedSplitInfo : computedSplitsInfo) {
+            final Iterator<SplitInfo> itr = computedSplitsInfo.iterator();
+            while (itr.hasNext()) {
+                final SplitInfo computedSplitInfo = itr.next();
+                itr.remove();
+
                 long length = this.removeTrailingNewLines ? computedSplitInfo.trimmedLength : computedSplitInfo.length;
                 boolean proceedWithClone = headerFlowFile != null || length > 0;
                 if (proceedWithClone) {
@@ -326,16 +333,24 @@ public class SplitText extends AbstractProcessor {
                     splitFlowFiles.add(splitFlowFile);
                 }
             }
+
             // Update fragment.count with real split count (i.e. don't count 
files for which there was no clone)
-            for (FlowFile splitFlowFile : splitFlowFiles) {
-                splitFlowFile = processSession.putAttribute(splitFlowFile, 
FRAGMENT_COUNT, String.valueOf(fragmentIndex - 1)); // -1 because the index 
starts at 1 (see above)
+            final String fragmentCount = String.valueOf(fragmentIndex - 1); // 
-1 because the index starts at 1 (see above)
+
+            final ListIterator<FlowFile> flowFileItr = 
splitFlowFiles.listIterator();
+            while (flowFileItr.hasNext()) {
+                FlowFile splitFlowFile = flowFileItr.next();
+
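+                // putAttribute returns a new FlowFile reference; swap the updated reference into the list in place.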
+                final FlowFile updated = 
processSession.putAttribute(splitFlowFile, FRAGMENT_COUNT, fragmentCount);
+                flowFileItr.set(updated);
             }
         }
 
-        getLogger().info("Split " + sourceFlowFile + " into " + 
splitFlowFiles.size() + " flow files" + (headerFlowFile != null ? " containing 
headers." : "."));
+        getLogger().info("Split {} into {} FlowFiles{}", new Object[] 
{sourceFlowFile, splitFlowFiles.size(), headerFlowFile != null ? " containing 
headers." : "."});
         if (headerFlowFile != null) {
             processSession.remove(headerFlowFile);
         }
+
         return splitFlowFiles;
     }
 

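A note on the two iterator idioms introduced in generateSplitFlowFiles above, 
both of which reduce retained heap: the explicit Iterator removes each 
SplitInfo as soon as it is consumed, and the ListIterator swaps the FlowFile 
returned by putAttribute back into the list in place. A minimal standalone 
sketch of both patterns (the class name IteratorIdioms and the sample data are 
illustrative, not part of this patch):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    import java.util.ListIterator;

    public class IteratorIdioms {
        public static void main(final String[] args) {
            // Eager removal: drop each element from the backing list as soon
            // as it is consumed, so it becomes garbage-collectable before the
            // loop (and the surrounding method) finishes.
            final List<String> pending = new ArrayList<>(Arrays.asList("a", "b", "c"));
            final Iterator<String> itr = pending.iterator();
            while (itr.hasNext()) {
                final String next = itr.next();
                itr.remove();
                System.out.println("consumed " + next);
            }

            // In-place replacement: when a transformation returns a new
            // object (as ProcessSession.putAttribute does), ListIterator.set()
            // stores the updated reference without a second pass or a copy.
            final List<String> values = new ArrayList<>(Arrays.asList("x", "y"));
            final ListIterator<String> listItr = values.listIterator();
            while (listItr.hasNext()) {
                listItr.set(listItr.next().toUpperCase());
            }
            System.out.println(values); // prints [X, Y]
        }
    }

Both loops make a single pass over the data, and neither allocates a 
replacement list.
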
http://git-wip-us.apache.org/repos/asf/nifi/blob/c425bd28/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java
 
b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java
index 29e903f..ae44320 100644
--- 
a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java
+++ 
b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java
@@ -16,23 +16,6 @@
  */
 package org.apache.nifi.processors.attributes;
 
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.regex.Pattern;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.EventDriven;
@@ -75,6 +58,24 @@ import org.apache.nifi.update.attributes.FlowFilePolicy;
 import org.apache.nifi.update.attributes.Rule;
 import org.apache.nifi.update.attributes.serde.CriteriaSerDe;
 
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
+
 @EventDriven
 @SideEffectFree
 @SupportsBatching
@@ -97,6 +98,13 @@ public class UpdateAttribute extends AbstractProcessor 
implements Searchable {
     private final static Set<Relationship> statelessRelationshipSet;
     private final static Set<Relationship> statefulRelationshipSet;
 
+    private final Map<String, String> canonicalValueLookup = new 
LinkedHashMap<String, String>() {
+        @Override
+        protected boolean removeEldestEntry(final Map.Entry<String, String> eldest) {
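+            // Evict the eldest entry once the cache exceeds 100 values, bounding its memory footprint.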
+            return size() > 100;
+        }
+    };
+
     // relationships
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .description("All successful FlowFiles are routed to this 
relationship").name("success").build();
@@ -619,6 +627,30 @@ public class UpdateAttribute extends AbstractProcessor 
implements Searchable {
         }
     }
 
+    /**
+     * This method caches a 'canonical' value for a given attribute value. 
When this processor is used to update an attribute or add a new
+     * attribute, if Expression Language is used, we may well end up with a 
new String object for each attribute for each FlowFile. As a result,
+     * we will store a different String object for the attribute value of 
every FlowFile, meaning that we have to keep a lot of String objects
+     * in heap. By using this 'canonical lookup', we are able to keep only a 
single String object on the heap.
+     *
+     * For example, if we have a property named "abc" and the value is 
"${abc}${xyz}", and we send through 1,000 FlowFiles with attributes abc="abc"
+     * and xyz="xyz", then we would end up with 1,000 String objects with a value 
of "abcxyz". By using this canonical representation, we are able to
+     * hold a single String whose value is "abcxyz" rather than holding 1,000 
String objects in heap (1,000 String objects may still be created
+     * when calling PropertyValue.evaluateAttributeExpressions, but this way 
those values can be garbage collected).
+     *
+     * @param attributeValue the value whose canonical value should be returned
+     * @return the canonical representation of the given attribute value
+     */
+    private synchronized String getCanonicalRepresentation(final String 
attributeValue) {
+        final String canonical = this.canonicalValueLookup.get(attributeValue);
+        if (canonical != null) {
+            return canonical;
+        }
+
+        this.canonicalValueLookup.put(attributeValue, attributeValue);
+        return attributeValue;
+    }
+
     // Executes the specified action on the specified flowfile.
     private FlowFile executeActions(final ProcessSession session, final 
ProcessContext context, final List<Rule> rules, final Map<String, Action> 
defaultActions, final FlowFile flowfile,
                                     final Map<String, String> 
stateInitialAttributes, final Map<String, String> stateWorkingAttributes) {
@@ -688,7 +720,8 @@ public class UpdateAttribute extends AbstractProcessor 
implements Searchable {
 
                 if (notDeleted || setStatefulAttribute) {
                     try {
-                        final String newAttributeValue = 
getPropertyValue(action.getValue(), 
context).evaluateAttributeExpressions(flowfile, null, null, 
stateInitialAttributes).getValue();
+                        String newAttributeValue = 
getPropertyValue(action.getValue(), 
context).evaluateAttributeExpressions(flowfile, null, null, 
stateInitialAttributes).getValue();
+                        newAttributeValue = 
getCanonicalRepresentation(newAttributeValue);
 
                         // log if appropriate
                         if (debugEnabled) {

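The canonical-value cache added to UpdateAttribute above is a bounded, 
insertion-ordered LinkedHashMap used as an interning table. A minimal 
standalone sketch of the same idiom, assuming the 100-entry cap from the patch 
(the class and method names CanonicalCache and canonicalize are illustrative, 
not part of the patch):

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class CanonicalCache {
        // Insertion-ordered map whose eldest entry is evicted once the cap
        // is exceeded, so the cache can never grow without bound.
        private final Map<String, String> lookup = new LinkedHashMap<String, String>() {
            @Override
            protected boolean removeEldestEntry(final Map.Entry<String, String> eldest) {
                return size() > 100;
            }
        };

        // Returns one shared String instance per distinct value so that
        // equal-but-distinct String objects become garbage-collectable.
        public synchronized String canonicalize(final String value) {
            final String canonical = lookup.get(value);
            if (canonical != null) {
                return canonical;
            }
            lookup.put(value, value);
            return value;
        }

        public static void main(final String[] args) {
            final CanonicalCache cache = new CanonicalCache();
            final String a = cache.canonicalize(new String("abcxyz"));
            final String b = cache.canonicalize(new String("abcxyz"));
            System.out.println(a == b); // true: both callers share one instance
        }
    }

Because the map uses the default insertion order, eviction is FIFO rather than 
LRU; constructing the LinkedHashMap with accessOrder = true would make 
eviction LRU at the cost of reordering the map on every get().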