Repository: nifi
Updated Branches:
  refs/heads/master c87d79193 -> c425bd288
http://git-wip-us.apache.org/repos/asf/nifi/blob/c425bd28/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index 3496795..4354dc4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -68,6 +68,7 @@ import java.io.OutputStream; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; +import java.util.BitSet; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -98,6 +99,11 @@ import java.util.stream.Collectors; * <p/> */ public final class StandardProcessSession implements ProcessSession, ProvenanceEventEnricher { + private static final int SOURCE_EVENT_BIT_INDEXES = (1 << ProvenanceEventType.CREATE.ordinal()) + | (1 << ProvenanceEventType.FORK.ordinal()) + | (1 << ProvenanceEventType.JOIN.ordinal()) + | (1 << ProvenanceEventType.RECEIVE.ordinal()) + | (1 << ProvenanceEventType.FETCH.ordinal()); private static final AtomicLong idGenerator = new AtomicLong(0L); private static final AtomicLong enqueuedIndex = new AtomicLong(0L); @@ -110,7 +116,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE private static final Logger claimLog = LoggerFactory.getLogger(StandardProcessSession.class.getSimpleName() + ".claims"); private static final int MAX_ROLLBACK_FLOWFILES_TO_LOG = 5; - private final Map<FlowFileRecord, StandardRepositoryRecord> records = new ConcurrentHashMap<>(); + private final Map<Long, StandardRepositoryRecord> records = new ConcurrentHashMap<>(); private final Map<String, StandardFlowFileEvent> connectionCounts = new ConcurrentHashMap<>(); private final Map<FlowFileQueue, Set<FlowFileRecord>> unacknowledgedFlowFiles = new ConcurrentHashMap<>(); private final Map<ContentClaim, ByteCountingOutputStream> appendableStreams = new ConcurrentHashMap<>(); @@ -253,7 +259,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE List<ProvenanceEventRecord> autoTerminatedEvents = null; // validate that all records have a transfer relationship for them and if so determine the destination node and clone as necessary - final Map<FlowFileRecord, StandardRepositoryRecord> toAdd = new HashMap<>(); + final Map<Long, StandardRepositoryRecord> toAdd = new HashMap<>(); for (final StandardRepositoryRecord record : records.values()) { if (record.isMarkedForDelete()) { continue; @@ -317,7 +323,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE newRecord.setDestination(destination.getFlowFileQueue()); newRecord.setTransferRelationship(record.getTransferRelationship()); // put the mapping into toAdd because adding to records now will cause a ConcurrentModificationException - toAdd.put(clone, newRecord); + toAdd.put(clone.getId(), newRecord); } } } @@ -365,10 +371,7 @@ public final class 
StandardProcessSession implements ProcessSession, ProvenanceE * points to the Original Claim -- which has already been removed! * */ - for (final Map.Entry<FlowFileRecord, StandardRepositoryRecord> entry : checkpoint.records.entrySet()) { - final FlowFile flowFile = entry.getKey(); - final StandardRepositoryRecord record = entry.getValue(); - + for (final StandardRepositoryRecord record : checkpoint.records.values()) { if (record.isMarkedForDelete()) { // if the working claim is not the same as the original claim, we can immediately destroy the working claim // because it was created in this session and is to be deleted. We don't need to wait for the FlowFile Repo to sync. @@ -380,10 +383,14 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE // an issue if we only updated the FlowFile attributes. decrementClaimCount(record.getOriginalClaim()); } - final long flowFileLife = System.currentTimeMillis() - flowFile.getEntryDate(); - final Connectable connectable = context.getConnectable(); - final Object terminator = connectable instanceof ProcessorNode ? ((ProcessorNode) connectable).getProcessor() : connectable; - LOG.info("{} terminated by {}; life of FlowFile = {} ms", new Object[] {flowFile, terminator, flowFileLife}); + + if (LOG.isInfoEnabled()) { + final FlowFileRecord flowFile = record.getCurrent(); + final long flowFileLife = System.currentTimeMillis() - flowFile.getEntryDate(); + final Connectable connectable = context.getConnectable(); + final Object terminator = connectable instanceof ProcessorNode ? ((ProcessorNode) connectable).getProcessor() : connectable; + LOG.info("{} terminated by {}; life of FlowFile = {} ms", new Object[]{flowFile, terminator, flowFileLife}); + } } else if (record.isWorking() && record.getWorkingClaim() != record.getOriginalClaim()) { // records which have been updated - remove original if exists decrementClaimCount(record.getOriginalClaim()); @@ -544,10 +551,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE flowFileEvent.setFlowFilesSent(flowFilesSent); flowFileEvent.setBytesSent(bytesSent); + final long now = System.currentTimeMillis(); long lineageMillis = 0L; - for (final Map.Entry<FlowFileRecord, StandardRepositoryRecord> entry : checkpoint.records.entrySet()) { - final FlowFile flowFile = entry.getKey(); - final long lineageDuration = System.currentTimeMillis() - flowFile.getLineageStartDate(); + for (final StandardRepositoryRecord record : checkpoint.records.values()) { + final FlowFile flowFile = record.getCurrent(); + final long lineageDuration = now - flowFile.getLineageStartDate(); lineageMillis += lineageDuration; } flowFileEvent.setAggregateLineageMillis(lineageMillis); @@ -566,13 +574,16 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } private Map<String, Long> combineCounters(final Map<String, Long> first, final Map<String, Long> second) { - if (first == null && second == null) { + final boolean firstEmpty = first == null || first.isEmpty(); + final boolean secondEmpty = second == null || second.isEmpty(); + + if (firstEmpty && secondEmpty) { return null; } - if (first == null) { + if (firstEmpty) { return second; } - if (second == null) { + if (secondEmpty) { return first; } @@ -582,14 +593,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE return combined; } - private void addEventType(final Map<String, Set<ProvenanceEventType>> map, final String id, final ProvenanceEventType eventType) { - 
Set<ProvenanceEventType> eventTypes = map.get(id); - if (eventTypes == null) { - eventTypes = new HashSet<>(); - map.put(id, eventTypes); - } + private void addEventType(final Map<String, BitSet> map, final String id, final ProvenanceEventType eventType) { + final BitSet eventTypes = map.computeIfAbsent(id, key -> new BitSet()); + eventTypes.set(eventType.ordinal()); + } - eventTypes.add(eventType); + private StandardRepositoryRecord getRecord(final FlowFile flowFile) { + return records.get(flowFile.getId()); } private void updateProvenanceRepo(final Checkpoint checkpoint) { @@ -600,7 +610,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE // in case the Processor developer submitted the same events to the reporter. So we use a LinkedHashSet // for this, so that we are able to ensure that the events are submitted in the proper order. final Set<ProvenanceEventRecord> recordsToSubmit = new LinkedHashSet<>(); - final Map<String, Set<ProvenanceEventType>> eventTypesPerFlowFileId = new HashMap<>(); + final Map<String, BitSet> eventTypesPerFlowFileId = new HashMap<>(); final Set<ProvenanceEventRecord> processorGenerated = checkpoint.reportedEvents; @@ -613,7 +623,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final ProvenanceEventBuilder builder = entry.getValue(); final FlowFile flowFile = entry.getKey(); - updateEventContentClaims(builder, flowFile, checkpoint.records.get(flowFile)); + updateEventContentClaims(builder, flowFile, checkpoint.getRecord(flowFile)); final ProvenanceEventRecord event = builder.build(); if (!event.getChildUuids().isEmpty() && !isSpuriousForkEvent(event, checkpoint.removedFlowFiles)) { @@ -692,14 +702,15 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } if (checkpoint.createdFlowFiles.contains(flowFileId)) { - final Set<ProvenanceEventType> registeredTypes = eventTypesPerFlowFileId.get(flowFileId); + final BitSet registeredTypes = eventTypesPerFlowFileId.get(flowFileId); boolean creationEventRegistered = false; if (registeredTypes != null) { - if (registeredTypes.contains(ProvenanceEventType.CREATE) - || registeredTypes.contains(ProvenanceEventType.FORK) - || registeredTypes.contains(ProvenanceEventType.JOIN) - || registeredTypes.contains(ProvenanceEventType.RECEIVE) - || registeredTypes.contains(ProvenanceEventType.FETCH)) { + if (registeredTypes.get(ProvenanceEventType.CREATE.ordinal()) + || registeredTypes.get(ProvenanceEventType.FORK.ordinal()) + || registeredTypes.get(ProvenanceEventType.JOIN.ordinal()) + || registeredTypes.get(ProvenanceEventType.RECEIVE.ordinal()) + || registeredTypes.get(ProvenanceEventType.FETCH.ordinal())) { + creationEventRegistered = true; } } @@ -802,7 +813,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE public StandardProvenanceEventRecord enrich(final ProvenanceEventRecord rawEvent, final FlowFile flowFile, final long commitNanos) { verifyTaskActive(); - final StandardRepositoryRecord repoRecord = records.get(flowFile); + final StandardRepositoryRecord repoRecord = getRecord(flowFile); if (repoRecord == null) { throw new FlowFileHandlingException(flowFile + " is not known in this session (" + toString() + ")"); } @@ -839,12 +850,12 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } private StandardProvenanceEventRecord enrich( - final ProvenanceEventRecord rawEvent, final Map<String, FlowFileRecord> flowFileRecordMap, final Map<FlowFileRecord, 
StandardRepositoryRecord> records, + final ProvenanceEventRecord rawEvent, final Map<String, FlowFileRecord> flowFileRecordMap, final Map<Long, StandardRepositoryRecord> records, final boolean updateAttributes, final long commitNanos) { final StandardProvenanceEventRecord.Builder recordBuilder = new StandardProvenanceEventRecord.Builder().fromEvent(rawEvent); final FlowFileRecord eventFlowFile = flowFileRecordMap.get(rawEvent.getFlowFileUuid()); if (eventFlowFile != null) { - final StandardRepositoryRecord repoRecord = records.get(eventFlowFile); + final StandardRepositoryRecord repoRecord = records.get(eventFlowFile.getId()); if (repoRecord.getCurrent() != null && repoRecord.getCurrentClaim() != null) { final ContentClaim currentClaim = repoRecord.getCurrentClaim(); @@ -910,7 +921,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE * @param records records * @return true if spurious route */ - private boolean isSpuriousRouteEvent(final ProvenanceEventRecord event, final Map<FlowFileRecord, StandardRepositoryRecord> records) { + private boolean isSpuriousRouteEvent(final ProvenanceEventRecord event, final Map<Long, StandardRepositoryRecord> records) { if (event.getEventType() == ProvenanceEventType.ROUTE) { final String relationshipName = event.getRelationship(); final Relationship relationship = new Relationship.Builder().name(relationshipName).build(); @@ -919,10 +930,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE // If the number of connections for this relationship is not 1, then we can't ignore this ROUTE event, // as it may be cloning the FlowFile and adding to multiple connections. if (connectionsForRelationship.size() == 1) { - for (final Map.Entry<FlowFileRecord, StandardRepositoryRecord> entry : records.entrySet()) { - final FlowFileRecord flowFileRecord = entry.getKey(); + for (final StandardRepositoryRecord repoRecord : records.values()) { + final FlowFileRecord flowFileRecord = repoRecord.getCurrent(); if (event.getFlowFileUuid().equals(flowFileRecord.getAttribute(CoreAttributes.UUID.key()))) { - final StandardRepositoryRecord repoRecord = entry.getValue(); if (repoRecord.getOriginalQueue() == null) { return false; } @@ -1077,35 +1087,35 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final StringBuilder details = new StringBuilder(1024).append("["); final int initLen = details.length(); int filesListed = 0; - for (Map.Entry<FlowFileRecord, StandardRepositoryRecord> entry : records.entrySet()) { + for (StandardRepositoryRecord repoRecord : records.values()) { if (filesListed >= MAX_ROLLBACK_FLOWFILES_TO_LOG) { break; } filesListed++; - final FlowFileRecord entryKey = entry.getKey(); - final StandardRepositoryRecord entryValue = entry.getValue(); + if (details.length() > initLen) { details.append(", "); } - if (entryValue.getOriginalQueue() != null && entryValue.getOriginalQueue().getIdentifier() != null) { + if (repoRecord.getOriginalQueue() != null && repoRecord.getOriginalQueue().getIdentifier() != null) { details.append("queue=") - .append(entryValue.getOriginalQueue().getIdentifier()) + .append(repoRecord.getOriginalQueue().getIdentifier()) .append("/"); } details.append("filename=") - .append(entryKey.getAttribute(CoreAttributes.FILENAME.key())) + .append(repoRecord.getCurrent().getAttribute(CoreAttributes.FILENAME.key())) .append("/uuid=") - .append(entryKey.getAttribute(CoreAttributes.UUID.key())); + 
.append(repoRecord.getCurrent().getAttribute(CoreAttributes.UUID.key())); } - if (records.entrySet().size() > MAX_ROLLBACK_FLOWFILES_TO_LOG) { + if (records.size() > MAX_ROLLBACK_FLOWFILES_TO_LOG) { if (details.length() > initLen) { details.append(", "); } - details.append(records.entrySet().size() - MAX_ROLLBACK_FLOWFILES_TO_LOG) + details.append(records.size() - MAX_ROLLBACK_FLOWFILES_TO_LOG) .append(" additional Flowfiles not listed"); } else if (filesListed == 0) { details.append("none"); } + details.append("]"); return details.toString(); } @@ -1216,7 +1226,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE throw new IllegalStateException(flowFile + " already in use for an active callback or OutputStream created by ProcessSession.write(FlowFile) has not been closed"); } - final StandardRepositoryRecord record = records.get(flowFile); + final StandardRepositoryRecord record = getRecord(flowFile); if (record == null) { throw new FlowFileHandlingException(flowFile + " is not known in this session (" + toString() + ")"); } @@ -1275,8 +1285,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE for (final FlowFile flowFile : flowFiles) { final FlowFileRecord flowFileRecord = (FlowFileRecord) flowFile; - final StandardRepositoryRecord repoRecord = this.records.remove(flowFile); - newOwner.records.put(flowFileRecord, repoRecord); + final StandardRepositoryRecord repoRecord = this.records.remove(flowFile.getId()); + newOwner.records.put(flowFileRecord.getId(), repoRecord); // Adjust the counts for Connections for each FlowFile that was pulled from a Connection. // We do not have to worry about accounting for 'input counts' on connections because those @@ -1348,9 +1358,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final Set<String> modifiedFlowFileIds = new HashSet<>(); int largestTransferSetSize = 0; - for (final Map.Entry<FlowFileRecord, StandardRepositoryRecord> entry : checkpoint.records.entrySet()) { - final FlowFile flowFile = entry.getKey(); + for (final Map.Entry<Long, StandardRepositoryRecord> entry : checkpoint.records.entrySet()) { final StandardRepositoryRecord record = entry.getValue(); + final FlowFile flowFile = record.getCurrent(); final Relationship relationship = record.getTransferRelationship(); if (Relationship.SELF.equals(relationship)) { @@ -1479,7 +1489,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE private void registerDequeuedRecord(final FlowFileRecord flowFile, final Connection connection) { final StandardRepositoryRecord record = new StandardRepositoryRecord(connection.getFlowFileQueue(), flowFile); - records.put(flowFile, record); + records.put(flowFile.getId(), record); flowFilesIn++; contentSizeIn += flowFile.getSize(); @@ -1655,16 +1665,17 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE verifyTaskActive(); final Map<String, String> attrs = new HashMap<>(); - attrs.put(CoreAttributes.FILENAME.key(), String.valueOf(System.nanoTime())); + final String uuid = UUID.randomUUID().toString(); + attrs.put(CoreAttributes.FILENAME.key(), uuid); attrs.put(CoreAttributes.PATH.key(), DEFAULT_FLOWFILE_PATH); - attrs.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString()); + attrs.put(CoreAttributes.UUID.key(), uuid); final FlowFileRecord fFile = new StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence()) .addAttributes(attrs) .build(); final StandardRepositoryRecord record = new 
StandardRepositoryRecord(null); record.setWorking(fFile, attrs); - records.put(fFile, record); + records.put(fFile.getId(), record); createdFlowFiles.add(fFile.getAttribute(CoreAttributes.UUID.key())); return fFile; } @@ -1681,7 +1692,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE verifyTaskActive(); example = validateRecordState(example); - final StandardRepositoryRecord exampleRepoRecord = records.get(example); + final StandardRepositoryRecord exampleRepoRecord = getRecord(example); final FlowFileRecord currRec = exampleRepoRecord.getCurrent(); final ContentClaim claim = exampleRepoRecord.getCurrentClaim(); if (offset + size > example.getSize()) { @@ -1702,7 +1713,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } final StandardRepositoryRecord record = new StandardRepositoryRecord(null); record.setWorking(clone, clone.getAttributes()); - records.put(clone, record); + records.put(clone.getId(), record); if (offset == 0L && size == example.getSize()) { provenanceReporter.clone(example, clone); @@ -1730,7 +1741,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE eventBuilder.setComponentType(processorType); eventBuilder.addParentFlowFile(parent); - updateEventContentClaims(eventBuilder, parent, records.get(parent)); + updateEventContentClaims(eventBuilder, parent, getRecord(parent)); forkEventBuilders.put(parent, eventBuilder); } @@ -1752,7 +1763,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE verifyTaskActive(); flowFile = validateRecordState(flowFile); - final StandardRepositoryRecord record = records.get(flowFile); + final StandardRepositoryRecord record = getRecord(flowFile); final long expirationEpochMillis = System.currentTimeMillis() + context.getConnectable().getPenalizationPeriod(TimeUnit.MILLISECONDS); final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).penaltyExpirationTime(expirationEpochMillis).build(); record.setWorking(newFile); @@ -1768,7 +1779,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE return flowFile; } - final StandardRepositoryRecord record = records.get(flowFile); + final StandardRepositoryRecord record = getRecord(flowFile); final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).addAttribute(key, value).build(); record.setWorking(newFile, key, value); @@ -1780,7 +1791,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE verifyTaskActive(); flowFile = validateRecordState(flowFile); - final StandardRepositoryRecord record = records.get(flowFile); + final StandardRepositoryRecord record = getRecord(flowFile); final Map<String, String> updatedAttributes; if (attributes.containsKey(CoreAttributes.UUID.key())) { @@ -1794,6 +1805,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final FlowFileRecord newFile = ffBuilder.build(); record.setWorking(newFile, updatedAttributes); + return newFile; } @@ -1806,7 +1818,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE return flowFile; } - final StandardRepositoryRecord record = records.get(flowFile); + final StandardRepositoryRecord record = getRecord(flowFile); final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).removeAttributes(key).build(); record.setWorking(newFile, key, null); return newFile; 
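The recurring substitution above routes every lookup through the new getRecord(FlowFile) helper, with the session's map keyed by the FlowFile's numeric id rather than by the FlowFileRecord object itself. The id is assigned once and never changes, so any version of a FlowFile resolves to the same repository record regardless of how the record object's equals/hashCode behave. A minimal, self-contained sketch of the pattern (FlowFileStub and RecordIndexSketch are illustrative names only, not the NiFi API):

    import java.util.HashMap;
    import java.util.Map;

    final class RecordIndexSketch {

        // Stand-in for a FlowFile: an immutable numeric id plus mutable attributes.
        static final class FlowFileStub {
            final long id;
            final Map<String, String> attributes = new HashMap<>();

            FlowFileStub(final long id) {
                this.id = id;
            }
        }

        public static void main(final String[] args) {
            // Keyed by id, not by the record object: hashing a Long is cheap and the
            // key is stable for the lifetime of the FlowFile within the session.
            final Map<Long, FlowFileStub> records = new HashMap<>();

            final FlowFileStub ff = new FlowFileStub(42L);
            records.put(ff.id, ff);

            // Mutating the FlowFile does not disturb the mapping; any version of the
            // FlowFile resolves to the same record, since the id never changes.
            ff.attributes.put("filename", "example.txt");
            System.out.println(records.get(ff.id) == ff); // prints: true
        }
    }

The remaining hunks below apply the same getRecord(...) substitution throughout the session.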
@@ -1821,7 +1833,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE return flowFile; } - final StandardRepositoryRecord record = records.get(flowFile); + final StandardRepositoryRecord record = getRecord(flowFile); final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).removeAttributes(keys).build(); final Map<String, String> updatedAttrs = new HashMap<>(); @@ -1842,7 +1854,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE verifyTaskActive(); flowFile = validateRecordState(flowFile); - final StandardRepositoryRecord record = records.get(flowFile); + final StandardRepositoryRecord record = getRecord(flowFile); final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).removeAttributes(keyPattern).build(); if (keyPattern == null) { @@ -1895,7 +1907,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE // the relationship specified is not known in this session/context throw new IllegalArgumentException("Relationship '" + relationship.getName() + "' is not known"); } - final StandardRepositoryRecord record = records.get(flowFile); + final StandardRepositoryRecord record = getRecord(flowFile); record.setTransferRelationship(relationship); updateLastQueuedDate(record); @@ -1913,7 +1925,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE verifyTaskActive(); flowFile = validateRecordState(flowFile); - final StandardRepositoryRecord record = records.get(flowFile); + final StandardRepositoryRecord record = getRecord(flowFile); if (record.getOriginalQueue() == null) { throw new IllegalArgumentException("Cannot transfer FlowFiles that are created in this Session back to self"); } @@ -1951,7 +1963,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final long queuedTime = System.currentTimeMillis(); long contentSize = 0L; for (final FlowFile flowFile : flowFiles) { - final StandardRepositoryRecord record = records.get(flowFile); + final FlowFileRecord flowFileRecord = (FlowFileRecord) flowFile; + final StandardRepositoryRecord record = getRecord(flowFileRecord); record.setTransferRelationship(relationship); updateLastQueuedDate(record, queuedTime); @@ -1972,7 +1985,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE verifyTaskActive(); flowFile = validateRecordState(flowFile); - final StandardRepositoryRecord record = records.get(flowFile); + final StandardRepositoryRecord record = getRecord(flowFile); record.markForDelete(); removedFlowFiles.add(flowFile.getAttribute(CoreAttributes.UUID.key())); @@ -1996,7 +2009,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE flowFiles = validateRecordState(flowFiles); for (final FlowFile flowFile : flowFiles) { - final StandardRepositoryRecord record = records.get(flowFile); + final StandardRepositoryRecord record = getRecord(flowFile); record.markForDelete(); removedFlowFiles.add(flowFile.getAttribute(CoreAttributes.UUID.key())); @@ -2195,7 +2208,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE verifyTaskActive(); source = validateRecordState(source, true); - final StandardRepositoryRecord record = records.get(source); + final StandardRepositoryRecord record = getRecord(source); try { ensureNotAppending(record.getCurrentClaim()); @@ -2251,7 +2264,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE verifyTaskActive(); source = validateRecordState(source, true); - final StandardRepositoryRecord record = records.get(source); + final StandardRepositoryRecord record = getRecord(source); try { ensureNotAppending(record.getCurrentClaim()); @@ -2400,7 +2413,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final Collection<StandardRepositoryRecord> sourceRecords = new ArrayList<>(); for (final FlowFile source : sources) { - final StandardRepositoryRecord record = records.get(source); + final StandardRepositoryRecord record = getRecord(source); sourceRecords.add(record); try { @@ -2411,7 +2424,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } } - final StandardRepositoryRecord destinationRecord = records.get(destination); + final StandardRepositoryRecord destinationRecord = getRecord(destination); final ContentRepository contentRepo = context.getContentRepository(); final ContentClaim newClaim; try { @@ -2437,7 +2450,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final boolean useDemarcator = demarcator != null && demarcator.length > 0; final int numSources = sources.size(); for (final FlowFile source : sources) { - final StandardRepositoryRecord sourceRecord = records.get(source); + final StandardRepositoryRecord sourceRecord = getRecord(source); final long copied = contentRepo.exportTo(sourceRecord.getCurrentClaim(), out, sourceRecord.getCurrentClaimOffset(), source.getSize()); writtenCount += copied; @@ -2473,7 +2486,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE removeTemporaryClaim(destinationRecord); final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(destinationRecord.getCurrent()).contentClaim(newClaim).contentClaimOffset(0L).size(writtenCount).build(); destinationRecord.setWorking(newFile); - records.put(newFile, destinationRecord); return newFile; } @@ -2495,7 +2507,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE public OutputStream write(FlowFile source) { verifyTaskActive(); source = validateRecordState(source); - final StandardRepositoryRecord record = records.get(source); + final StandardRepositoryRecord record = getRecord(source); ContentClaim newClaim = null; try { @@ -2618,7 +2630,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE public FlowFile write(FlowFile source, final OutputStreamCallback writer) { verifyTaskActive(); source = validateRecordState(source); - final StandardRepositoryRecord record = records.get(source); + final StandardRepositoryRecord record = getRecord(source); long writtenToFlowFile = 0L; ContentClaim newClaim = null; @@ -2677,7 +2689,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE verifyTaskActive(); source = validateRecordState(source); - final StandardRepositoryRecord record = records.get(source); + final StandardRepositoryRecord record = getRecord(source); long newSize = 0L; // Get the current Content Claim from the record and see if we already have @@ -2858,7 +2870,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE public FlowFile write(FlowFile source, final StreamCallback writer) { verifyTaskActive(); source = validateRecordState(source); - final StandardRepositoryRecord record = records.get(source); + final StandardRepositoryRecord record = getRecord(source); final ContentClaim currClaim = 
record.getCurrentClaim(); long writtenToFlowFile = 0L; @@ -2932,6 +2944,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE .build(); record.setWorking(newFile); + return newFile; } @@ -2946,7 +2959,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE throw new FlowFileAccessException("Cannot write to path " + source.getParent().toFile().getAbsolutePath() + " so cannot delete file; will not import."); } - final StandardRepositoryRecord record = records.get(destination); + final StandardRepositoryRecord record = getRecord(destination); final ContentClaim newClaim; final long claimOffset; @@ -2992,7 +3005,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE verifyTaskActive(); destination = validateRecordState(destination); - final StandardRepositoryRecord record = records.get(destination); + final StandardRepositoryRecord record = getRecord(destination); ContentClaim newClaim = null; final long claimOffset = 0L; @@ -3030,7 +3043,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE public void exportTo(FlowFile source, final Path destination, final boolean append) { verifyTaskActive(); source = validateRecordState(source); - final StandardRepositoryRecord record = records.get(source); + final StandardRepositoryRecord record = getRecord(source); try { ensureNotAppending(record.getCurrentClaim()); @@ -3049,7 +3062,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE public void exportTo(FlowFile source, final OutputStream destination) { verifyTaskActive(); source = validateRecordState(source); - final StandardRepositoryRecord record = records.get(source); + final StandardRepositoryRecord record = getRecord(source); if(record.getCurrentClaim() == null) { return; @@ -3137,7 +3150,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE throw new IllegalStateException(flowFile + " already in use for an active callback or an OutputStream created by ProcessSession.write(FlowFile) has not been closed"); } - final StandardRepositoryRecord record = records.get(flowFile); + final StandardRepositoryRecord record = getRecord(flowFile); if (record == null) { rollback(); throw new FlowFileHandlingException(flowFile + " is not known in this session (" + toString() + ")"); @@ -3170,11 +3183,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE * <code>false</code> otherwise. */ boolean isFlowFileKnown(final FlowFile flowFile) { - return records.containsKey(flowFile); + return records.containsKey(flowFile.getId()); } private FlowFile getMostRecent(final FlowFile flowFile) { - final StandardRepositoryRecord existingRecord = records.get(flowFile); + final StandardRepositoryRecord existingRecord = getRecord(flowFile); return existingRecord == null ? 
flowFile : existingRecord.getCurrent(); } @@ -3183,10 +3196,12 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE verifyTaskActive(); parent = getMostRecent(parent); + final String uuid = UUID.randomUUID().toString(); + final Map<String, String> newAttributes = new HashMap<>(3); - newAttributes.put(CoreAttributes.FILENAME.key(), String.valueOf(System.nanoTime())); + newAttributes.put(CoreAttributes.FILENAME.key(), uuid); newAttributes.put(CoreAttributes.PATH.key(), DEFAULT_FLOWFILE_PATH); - newAttributes.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString()); + newAttributes.put(CoreAttributes.UUID.key(), uuid); final StandardFlowFileRecord.Builder fFileBuilder = new StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence()); @@ -3210,7 +3225,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final FlowFileRecord fFile = fFileBuilder.build(); final StandardRepositoryRecord record = new StandardRepositoryRecord(null); record.setWorking(fFile, newAttributes); - records.put(fFile, record); + records.put(fFile.getId(), record); createdFlowFiles.add(fFile.getAttribute(CoreAttributes.UUID.key())); registerForkEvent(parent, fFile); @@ -3247,9 +3262,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } } - newAttributes.put(CoreAttributes.FILENAME.key(), String.valueOf(System.nanoTime())); + final String uuid = UUID.randomUUID().toString(); + newAttributes.put(CoreAttributes.FILENAME.key(), uuid); newAttributes.put(CoreAttributes.PATH.key(), DEFAULT_FLOWFILE_PATH); - newAttributes.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString()); + newAttributes.put(CoreAttributes.UUID.key(), uuid); final FlowFileRecord fFile = new StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence()) .addAttributes(newAttributes) @@ -3258,7 +3274,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final StandardRepositoryRecord record = new StandardRepositoryRecord(null); record.setWorking(fFile, newAttributes); - records.put(fFile, record); + records.put(fFile.getId(), record); createdFlowFiles.add(fFile.getAttribute(CoreAttributes.UUID.key())); registerJoinEvent(fFile, parents); @@ -3339,7 +3355,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE private final List<ProvenanceEventRecord> autoTerminatedEvents = new ArrayList<>(); private final Set<ProvenanceEventRecord> reportedEvents = new LinkedHashSet<>(); - private final Map<FlowFileRecord, StandardRepositoryRecord> records = new ConcurrentHashMap<>(); + private final Map<Long, StandardRepositoryRecord> records = new ConcurrentHashMap<>(); private final Map<String, StandardFlowFileEvent> connectionCounts = new ConcurrentHashMap<>(); private final Map<FlowFileQueue, Set<FlowFileRecord>> unacknowledgedFlowFiles = new ConcurrentHashMap<>(); @@ -3392,5 +3408,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE this.contentSizeIn += session.contentSizeIn; this.contentSizeOut += session.contentSizeOut; } + + private StandardRepositoryRecord getRecord(final FlowFile flowFile) { + return records.get(flowFile.getId()); + } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/c425bd28/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java index 167fa73..299f73b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java @@ -16,14 +16,6 @@ */ package org.apache.nifi.processor; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - import org.apache.nifi.attribute.expression.language.PreparedQuery; import org.apache.nifi.attribute.expression.language.Query; import org.apache.nifi.attribute.expression.language.Query.Range; @@ -41,6 +33,15 @@ import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.processor.exception.TerminatedTaskException; import org.apache.nifi.util.Connectables; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + public class StandardProcessContext implements ProcessContext, ControllerServiceLookup { private final ProcessorNode procNode; @@ -49,6 +50,7 @@ public class StandardProcessContext implements ProcessContext, ControllerService private final StringEncryptor encryptor; private final StateManager stateManager; private final TaskTermination taskTermination; + private final Map<PropertyDescriptor, String> properties; public StandardProcessContext(final ProcessorNode processorNode, final ControllerServiceProvider controllerServiceProvider, final StringEncryptor encryptor, final StateManager stateManager, final TaskTermination taskTermination) { @@ -71,6 +73,8 @@ public class StandardProcessContext implements ProcessContext, ControllerService preparedQueries.put(desc, pq); } } + + properties = Collections.unmodifiableMap(processorNode.getProperties()); } private void verifyTaskActive() { @@ -82,7 +86,17 @@ public class StandardProcessContext implements ProcessContext, ControllerService @Override public PropertyValue getProperty(final PropertyDescriptor descriptor) { verifyTaskActive(); - return getProperty(descriptor.getName()); + + final String setPropertyValue = properties.get(descriptor); + if (setPropertyValue != null) { + return new StandardPropertyValue(setPropertyValue, this, preparedQueries.get(descriptor), procNode.getVariableRegistry()); + } + + // Get the "canonical" Property Descriptor from the Processor + final PropertyDescriptor canonicalDescriptor = procNode.getProcessor().getPropertyDescriptor(descriptor.getName()); + final String defaultValue = canonicalDescriptor.getDefaultValue(); + + return new StandardPropertyValue(defaultValue, this, preparedQueries.get(descriptor), procNode.getVariableRegistry()); } /** @@ -99,7 +113,7 @@ public class StandardProcessContext implements ProcessContext, ControllerService return null; } - final String setPropertyValue = procNode.getProperty(descriptor); + final String setPropertyValue = properties.get(descriptor); final String propValue = (setPropertyValue == null) ? 
descriptor.getDefaultValue() : setPropertyValue; return new StandardPropertyValue(propValue, this, preparedQueries.get(descriptor), procNode.getVariableRegistry()); @@ -138,7 +152,7 @@ public class StandardProcessContext implements ProcessContext, ControllerService @Override public Map<PropertyDescriptor, String> getProperties() { verifyTaskActive(); - return procNode.getProperties(); + return properties; } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/c425bd28/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java index 6f045e5..c960902 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java @@ -16,30 +16,31 @@ */ package org.apache.nifi.controller.repository; +import org.apache.nifi.controller.queue.FlowFileQueue; +import org.apache.nifi.controller.repository.claim.ContentClaim; +import org.apache.nifi.processor.Relationship; + import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.nifi.controller.queue.FlowFileQueue; -import org.apache.nifi.controller.repository.claim.ContentClaim; -import org.apache.nifi.processor.Relationship; - public class StandardRepositoryRecord implements RepositoryRecord { - private RepositoryRecordType type = null; + private RepositoryRecordType type; private FlowFileRecord workingFlowFileRecord = null; private Relationship transferRelationship = null; private FlowFileQueue destination = null; private final FlowFileRecord originalFlowFileRecord; private final FlowFileQueue originalQueue; private String swapLocation; - private final Map<String, String> updatedAttributes = new HashMap<>(); private final Map<String, String> originalAttributes; + private Map<String, String> updatedAttributes = null; private List<ContentClaim> transientClaims; private final long startNanos = System.nanoTime(); + /** * Creates a new record which has no original claim or flow file - it is entirely new * @@ -66,7 +67,7 @@ public class StandardRepositoryRecord implements RepositoryRecord { this.originalFlowFileRecord = originalFlowFileRecord; this.type = RepositoryRecordType.SWAP_OUT; this.swapLocation = swapLocation; - this.originalAttributes = originalFlowFileRecord == null ? Collections.<String, String>emptyMap() : originalFlowFileRecord.getAttributes(); + this.originalAttributes = originalFlowFileRecord == null ? 
Collections.emptyMap() : originalFlowFileRecord.getAttributes(); } @Override @@ -113,30 +114,48 @@ public class StandardRepositoryRecord implements RepositoryRecord { workingFlowFileRecord = flowFile; } + private Map<String, String> initializeUpdatedAttributes() { + if (updatedAttributes == null) { + updatedAttributes = new HashMap<>(); + } + + return updatedAttributes; + } + public void setWorking(final FlowFileRecord flowFile, final String attributeKey, final String attributeValue) { workingFlowFileRecord = flowFile; + // In the case that the type is CREATE, we know that all attributes are updated attributes, so no need to store them. + if (type == RepositoryRecordType.CREATE) { + return; + } + // If setting attribute to same value as original, don't add to updated attributes final String currentValue = originalAttributes.get(attributeKey); if (currentValue == null || !currentValue.equals(attributeValue)) { - updatedAttributes.put(attributeKey, attributeValue); + initializeUpdatedAttributes().put(attributeKey, attributeValue); } } public void setWorking(final FlowFileRecord flowFile, final Map<String, String> updatedAttribs) { workingFlowFileRecord = flowFile; + // In the case that the type is CREATE, we know that all attributes are updated attributes, so no need to store them. + if (type == RepositoryRecordType.CREATE) { + return; + } + for (final Map.Entry<String, String> entry : updatedAttribs.entrySet()) { final String currentValue = originalAttributes.get(entry.getKey()); if (currentValue == null || !currentValue.equals(entry.getValue())) { - updatedAttributes.put(entry.getKey(), entry.getValue()); + initializeUpdatedAttributes().put(entry.getKey(), entry.getValue()); } } } @Override public boolean isAttributesChanged() { - return !updatedAttributes.isEmpty(); + return type == RepositoryRecordType.CREATE || (updatedAttributes != null && !updatedAttributes.isEmpty()); } public void markForAbort() { @@ -196,7 +215,11 @@ public class StandardRepositoryRecord implements RepositoryRecord { } Map<String, String> getUpdatedAttributes() { - return updatedAttributes; + if (type == RepositoryRecordType.CREATE) { + return getCurrent().getAttributes(); + } + + return updatedAttributes == null ? 
Collections.emptyMap() : updatedAttributes; } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/c425bd28/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java index 5ed4d9e..d16bc4b 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java @@ -16,31 +16,14 @@ */ package org.apache.nifi.processors.standard; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - import org.apache.commons.io.IOUtils; import org.apache.nifi.annotation.behavior.EventDriven; -import org.apache.nifi.annotation.behavior.SystemResourceConsideration; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SideEffectFree; import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.behavior.SystemResource; +import org.apache.nifi.annotation.behavior.SystemResourceConsideration; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; @@ -65,6 +48,25 @@ import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.stream.io.util.TextLineDemarcator; import org.apache.nifi.stream.io.util.TextLineDemarcator.OffsetInfo; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + @EventDriven @SideEffectFree @SupportsBatching @@ -158,19 +160,17 @@ public class SplitText extends AbstractProcessor { private static final Set<Relationship> relationships; static { - properties = Collections.unmodifiableList(Arrays.asList(new PropertyDescriptor[]{ - LINE_SPLIT_COUNT, - FRAGMENT_MAX_SIZE, - HEADER_LINE_COUNT, - HEADER_MARKER, - REMOVE_TRAILING_NEWLINES - })); - - relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(new Relationship[]{ - REL_ORIGINAL, - REL_SPLITS, - REL_FAILURE - }))); + properties = Collections.unmodifiableList(Arrays.asList( + 
LINE_SPLIT_COUNT, + FRAGMENT_MAX_SIZE, + HEADER_LINE_COUNT, + HEADER_MARKER, + REMOVE_TRAILING_NEWLINES)); + + relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( + REL_ORIGINAL, + REL_SPLITS, + REL_FAILURE))); } private volatile boolean removeTrailingNewLines; @@ -259,9 +259,11 @@ public class SplitText extends AbstractProcessor { processSession.transfer(sourceFlowFile, REL_FAILURE); } else { final String fragmentId = UUID.randomUUID().toString(); - List<FlowFile> splitFlowFiles = this.generateSplitFlowFiles(fragmentId, sourceFlowFile, headerSplitInfoRef.get(), computedSplitsInfo, processSession); + final List<FlowFile> splitFlowFiles = this.generateSplitFlowFiles(fragmentId, sourceFlowFile, headerSplitInfoRef.get(), computedSplitsInfo, processSession); + final FlowFile originalFlowFile = FragmentAttributes.copyAttributesToOriginal(processSession, sourceFlowFile, fragmentId, splitFlowFiles.size()); processSession.transfer(originalFlowFile, REL_ORIGINAL); + if (!splitFlowFiles.isEmpty()) { processSession.transfer(splitFlowFiles, REL_SPLITS); } @@ -291,6 +293,7 @@ public class SplitText extends AbstractProcessor { */ private List<FlowFile> generateSplitFlowFiles(String fragmentId, FlowFile sourceFlowFile, SplitInfo splitInfo, List<SplitInfo> computedSplitsInfo, ProcessSession processSession){ List<FlowFile> splitFlowFiles = new ArrayList<>(); + FlowFile headerFlowFile = null; long headerCrlfLength = 0; if (splitInfo != null) { @@ -305,7 +308,11 @@ public class SplitText extends AbstractProcessor { fragmentId, fragmentIndex++, sourceFlowFile.getAttribute(CoreAttributes.FILENAME.key())); splitFlowFiles.add(splitFlowFile); } else { - for (SplitInfo computedSplitInfo : computedSplitsInfo) { + final Iterator<SplitInfo> itr = computedSplitsInfo.iterator(); + while (itr.hasNext()) { + final SplitInfo computedSplitInfo = itr.next(); + itr.remove(); + long length = this.removeTrailingNewLines ? computedSplitInfo.trimmedLength : computedSplitInfo.length; boolean proceedWithClone = headerFlowFile != null || length > 0; if (proceedWithClone) { @@ -326,16 +333,24 @@ public class SplitText extends AbstractProcessor { splitFlowFiles.add(splitFlowFile); } } + // Update fragment.count with real split count (i.e. don't count files for which there was no clone) - for (FlowFile splitFlowFile : splitFlowFiles) { - splitFlowFile = processSession.putAttribute(splitFlowFile, FRAGMENT_COUNT, String.valueOf(fragmentIndex - 1)); // -1 because the index starts at 1 (see above) + final String fragmentCount = String.valueOf(fragmentIndex - 1); // -1 because the index starts at 1 (see above) + + final ListIterator<FlowFile> flowFileItr = splitFlowFiles.listIterator(); + while (flowFileItr.hasNext()) { + FlowFile splitFlowFile = flowFileItr.next(); + + final FlowFile updated = processSession.putAttribute(splitFlowFile, FRAGMENT_COUNT, fragmentCount); + flowFileItr.set(updated); } } - getLogger().info("Split " + sourceFlowFile + " into " + splitFlowFiles.size() + " flow files" + (headerFlowFile != null ? " containing headers." : ".")); + getLogger().info("Split {} into {} FlowFiles{}", new Object[] {sourceFlowFile, splitFlowFiles.size(), headerFlowFile != null ? " containing headers."
: "."}); if (headerFlowFile != null) { processSession.remove(headerFlowFile); } + return splitFlowFiles; } http://git-wip-us.apache.org/repos/asf/nifi/blob/c425bd28/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java index 29e903f..ae44320 100644 --- a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java +++ b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java @@ -16,23 +16,6 @@ */ package org.apache.nifi.processors.attributes; -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicReference; -import java.util.regex.Pattern; - import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.EventDriven; @@ -75,6 +58,24 @@ import org.apache.nifi.update.attributes.FlowFilePolicy; import org.apache.nifi.update.attributes.Rule; import org.apache.nifi.update.attributes.serde.CriteriaSerDe; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; + @EventDriven @SideEffectFree @SupportsBatching @@ -97,6 +98,13 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable { private final static Set<Relationship> statelessRelationshipSet; private final static Set<Relationship> statefulRelationshipSet; + private final Map<String, String> canonicalValueLookup = new LinkedHashMap<String, String>() { + @Override + protected boolean removeEldestEntry(final Map.Entry eldest) { + return size() > 100; + } + }; + // relationships public static final Relationship REL_SUCCESS = new Relationship.Builder() .description("All successful FlowFiles are routed to this relationship").name("success").build(); @@ -619,6 +627,30 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable { } } + /** + * This method caches a 'canonical' value for a given attribute value. 
When this processor is used to update an attribute or add a new + * attribute, if Expression Language is used, we may well end up with a new String object for each attribute for each FlowFile. As a result, + * we will store a different String object for the attribute value of every FlowFile, meaning that we have to keep a lot of String objects + * on the heap. By using this 'canonical lookup', we are able to keep only a single String object on the heap. + * + * For example, if we have a property named "abc" and the value is "${abc}${xyz}", and we send through 1,000 FlowFiles with attributes abc="abc" + * and xyz="xyz", then we would end up with 1,000 String objects with a value of "abcxyz". By using this canonical representation, we are able to + * instead hold a single String whose value is "abcxyz" instead of holding 1,000 String objects on the heap (1,000 String objects may still be created + * when calling PropertyValue.evaluateAttributeExpressions, but this way those values are garbage collected). + * + * @param attributeValue the value whose canonical value should be returned + * @return the canonical representation of the given attribute value + */ + private synchronized String getCanonicalRepresentation(final String attributeValue) { + final String canonical = this.canonicalValueLookup.get(attributeValue); + if (canonical != null) { + return canonical; + } + + this.canonicalValueLookup.put(attributeValue, attributeValue); + return attributeValue; + } + // Executes the specified action on the specified flowfile. private FlowFile executeActions(final ProcessSession session, final ProcessContext context, final List<Rule> rules, final Map<String, Action> defaultActions, final FlowFile flowfile, final Map<String, String> stateInitialAttributes, final Map<String, String> stateWorkingAttributes) { @@ -688,7 +720,8 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable { if (notDeleted || setStatefulAttribute) { try { - final String newAttributeValue = getPropertyValue(action.getValue(), context).evaluateAttributeExpressions(flowfile, null, null, stateInitialAttributes).getValue(); + String newAttributeValue = getPropertyValue(action.getValue(), context).evaluateAttributeExpressions(flowfile, null, null, stateInitialAttributes).getValue(); + newAttributeValue = getCanonicalRepresentation(newAttributeValue); // log if appropriate if (debugEnabled) {
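As a standalone illustration of the canonical-value cache introduced above, here is a minimal sketch (the 100-entry bound and the FIFO eviction via removeEldestEntry mirror the diff; the class and method names are illustrative, not the processor's own):

    import java.util.LinkedHashMap;
    import java.util.Map;

    final class CanonicalValueCacheSketch {

        // LinkedHashMap in insertion order plus a size bound gives simple FIFO
        // eviction: once the 101st distinct value arrives, the oldest entry drops out.
        private final Map<String, String> lookup = new LinkedHashMap<String, String>() {
            @Override
            protected boolean removeEldestEntry(final Map.Entry<String, String> eldest) {
                return size() > 100;
            }
        };

        // Synchronized because a processor may be scheduled on multiple threads.
        synchronized String canonicalize(final String value) {
            final String canonical = lookup.get(value);
            if (canonical != null) {
                return canonical; // reuse the single retained instance
            }
            lookup.put(value, value);
            return value;
        }

        public static void main(final String[] args) {
            final CanonicalValueCacheSketch cache = new CanonicalValueCacheSketch();
            // Two equal-but-distinct Strings collapse to one retained instance, so
            // repeated Expression Language results stop accumulating on the heap.
            final String a = cache.canonicalize(new String("abcxyz"));
            final String b = cache.canonicalize(new String("abcxyz"));
            System.out.println(a == b); // prints: true
        }
    }

Only values that actually recur benefit from the cache; one-off values pass straight through and remain eligible for garbage collection once the FlowFile releases them.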