This is an automated email from the ASF dual-hosted git repository. ijokarumawak pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push: new 055b3ca NIFI-6220: If FlowFile is created by cloning a relationship, do not create an ATTRIBUTES_MODIFIED provenance event for it. 055b3ca is described below commit 055b3cac54e3e234c0ee07189b7b7b85e0582d58 Author: Mark Payne <marka...@hotmail.com> AuthorDate: Tue Apr 16 14:52:34 2019 -0400 NIFI-6220: If FlowFile is created by cloning a relationship, do not create an ATTRIBUTES_MODIFIED provenance event for it. NIFI-6220: Updated test name and fixed typo This closes #3438. Signed-off-by: Koji Kawamura <ijokaruma...@apache.org> --- .../repository/StandardProcessSession.java | 191 +++++++++++---------- .../integration/provenance/ProvenanceEventsIT.java | 58 +++++++ 2 files changed, 160 insertions(+), 89 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index 14cb70a..c2502d3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -325,6 +325,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE newRecord.setTransferRelationship(record.getTransferRelationship()); // put the mapping into toAdd because adding to records now will cause a ConcurrentModificationException toAdd.put(clone.getId(), newRecord); + + createdFlowFiles.add(newUuid); } } } @@ -639,6 +641,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE recordsToSubmit.add(event); addEventType(eventTypesPerFlowFileId, event.getFlowFileUuid(), event.getEventType()); + + final List<String> childUuids = event.getChildUuids(); + if (childUuids != null) { + for (final String childUuid : childUuids) { + addEventType(eventTypesPerFlowFileId, childUuid, event.getEventType()); + } + } } // Finally, add any other events that we may have generated. @@ -684,6 +693,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE if (registeredTypes != null) { if (registeredTypes.get(ProvenanceEventType.CREATE.ordinal()) || registeredTypes.get(ProvenanceEventType.FORK.ordinal()) + || registeredTypes.get(ProvenanceEventType.CLONE.ordinal()) || registeredTypes.get(ProvenanceEventType.JOIN.ordinal()) || registeredTypes.get(ProvenanceEventType.RECEIVE.ordinal()) || registeredTypes.get(ProvenanceEventType.FETCH.ordinal())) { @@ -771,6 +781,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE provenanceRepo.registerEvents(iterable); } + private void updateEventContentClaims(final ProvenanceEventBuilder builder, final FlowFile flowFile, final StandardRepositoryRecord repoRecord) { final ContentClaim originalClaim = repoRecord.getOriginalClaim(); if (originalClaim == null) { @@ -1678,6 +1689,97 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } @Override + public FlowFile create(FlowFile parent) { + verifyTaskActive(); + parent = getMostRecent(parent); + + final String uuid = UUID.randomUUID().toString(); + + final Map<String, String> newAttributes = new HashMap<>(3); + newAttributes.put(CoreAttributes.FILENAME.key(), uuid); + newAttributes.put(CoreAttributes.PATH.key(), DEFAULT_FLOWFILE_PATH); + newAttributes.put(CoreAttributes.UUID.key(), uuid); + + final StandardFlowFileRecord.Builder fFileBuilder = new StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence()); + + // copy all attributes from parent except for the "special" attributes. Copying the special attributes + // can cause problems -- especially the ALTERNATE_IDENTIFIER, because copying can cause Provenance Events + // to be incorrectly created. + for (final Map.Entry<String, String> entry : parent.getAttributes().entrySet()) { + final String key = entry.getKey(); + final String value = entry.getValue(); + if (CoreAttributes.ALTERNATE_IDENTIFIER.key().equals(key) + || CoreAttributes.DISCARD_REASON.key().equals(key) + || CoreAttributes.UUID.key().equals(key)) { + continue; + } + newAttributes.put(key, value); + } + + fFileBuilder.lineageStart(parent.getLineageStartDate(), parent.getLineageStartIndex()); + fFileBuilder.addAttributes(newAttributes); + + final FlowFileRecord fFile = fFileBuilder.build(); + final StandardRepositoryRecord record = new StandardRepositoryRecord(null); + record.setWorking(fFile, newAttributes); + records.put(fFile.getId(), record); + createdFlowFiles.add(fFile.getAttribute(CoreAttributes.UUID.key())); + + registerForkEvent(parent, fFile); + return fFile; + } + + @Override + public FlowFile create(Collection<FlowFile> parents) { + verifyTaskActive(); + + parents = parents.stream().map(this::getMostRecent).collect(Collectors.toList()); + + final Map<String, String> newAttributes = intersectAttributes(parents); + newAttributes.remove(CoreAttributes.UUID.key()); + newAttributes.remove(CoreAttributes.ALTERNATE_IDENTIFIER.key()); + newAttributes.remove(CoreAttributes.DISCARD_REASON.key()); + + // When creating a new FlowFile from multiple parents, we need to add all of the Lineage Identifiers + // and use the earliest lineage start date + long lineageStartDate = 0L; + for (final FlowFile parent : parents) { + + final long parentLineageStartDate = parent.getLineageStartDate(); + if (lineageStartDate == 0L || parentLineageStartDate < lineageStartDate) { + lineageStartDate = parentLineageStartDate; + } + } + + // find the smallest lineage start index that has the same lineage start date as the one we've chosen. + long lineageStartIndex = 0L; + for (final FlowFile parent : parents) { + if (parent.getLineageStartDate() == lineageStartDate && parent.getLineageStartIndex() < lineageStartIndex) { + lineageStartIndex = parent.getLineageStartIndex(); + } + } + + final String uuid = UUID.randomUUID().toString(); + newAttributes.put(CoreAttributes.FILENAME.key(), uuid); + newAttributes.put(CoreAttributes.PATH.key(), DEFAULT_FLOWFILE_PATH); + newAttributes.put(CoreAttributes.UUID.key(), uuid); + + final FlowFileRecord fFile = new StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence()) + .addAttributes(newAttributes) + .lineageStart(lineageStartDate, lineageStartIndex) + .build(); + + final StandardRepositoryRecord record = new StandardRepositoryRecord(null); + record.setWorking(fFile, newAttributes); + records.put(fFile.getId(), record); + createdFlowFiles.add(fFile.getAttribute(CoreAttributes.UUID.key())); + + registerJoinEvent(fFile, parents); + return fFile; + } + + + @Override public FlowFile clone(FlowFile example) { verifyTaskActive(); example = validateRecordState(example); @@ -3171,95 +3273,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE return existingRecord == null ? flowFile : existingRecord.getCurrent(); } - @Override - public FlowFile create(FlowFile parent) { - verifyTaskActive(); - parent = getMostRecent(parent); - - final String uuid = UUID.randomUUID().toString(); - - final Map<String, String> newAttributes = new HashMap<>(3); - newAttributes.put(CoreAttributes.FILENAME.key(), uuid); - newAttributes.put(CoreAttributes.PATH.key(), DEFAULT_FLOWFILE_PATH); - newAttributes.put(CoreAttributes.UUID.key(), uuid); - - final StandardFlowFileRecord.Builder fFileBuilder = new StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence()); - - // copy all attributes from parent except for the "special" attributes. Copying the special attributes - // can cause problems -- especially the ALTERNATE_IDENTIFIER, because copying can cause Provenance Events - // to be incorrectly created. - for (final Map.Entry<String, String> entry : parent.getAttributes().entrySet()) { - final String key = entry.getKey(); - final String value = entry.getValue(); - if (CoreAttributes.ALTERNATE_IDENTIFIER.key().equals(key) - || CoreAttributes.DISCARD_REASON.key().equals(key) - || CoreAttributes.UUID.key().equals(key)) { - continue; - } - newAttributes.put(key, value); - } - - fFileBuilder.lineageStart(parent.getLineageStartDate(), parent.getLineageStartIndex()); - fFileBuilder.addAttributes(newAttributes); - - final FlowFileRecord fFile = fFileBuilder.build(); - final StandardRepositoryRecord record = new StandardRepositoryRecord(null); - record.setWorking(fFile, newAttributes); - records.put(fFile.getId(), record); - createdFlowFiles.add(fFile.getAttribute(CoreAttributes.UUID.key())); - - registerForkEvent(parent, fFile); - return fFile; - } - - @Override - public FlowFile create(Collection<FlowFile> parents) { - verifyTaskActive(); - - parents = parents.stream().map(this::getMostRecent).collect(Collectors.toList()); - - final Map<String, String> newAttributes = intersectAttributes(parents); - newAttributes.remove(CoreAttributes.UUID.key()); - newAttributes.remove(CoreAttributes.ALTERNATE_IDENTIFIER.key()); - newAttributes.remove(CoreAttributes.DISCARD_REASON.key()); - - // When creating a new FlowFile from multiple parents, we need to add all of the Lineage Identifiers - // and use the earliest lineage start date - long lineageStartDate = 0L; - for (final FlowFile parent : parents) { - - final long parentLineageStartDate = parent.getLineageStartDate(); - if (lineageStartDate == 0L || parentLineageStartDate < lineageStartDate) { - lineageStartDate = parentLineageStartDate; - } - } - - // find the smallest lineage start index that has the same lineage start date as the one we've chosen. - long lineageStartIndex = 0L; - for (final FlowFile parent : parents) { - if (parent.getLineageStartDate() == lineageStartDate && parent.getLineageStartIndex() < lineageStartIndex) { - lineageStartIndex = parent.getLineageStartIndex(); - } - } - - final String uuid = UUID.randomUUID().toString(); - newAttributes.put(CoreAttributes.FILENAME.key(), uuid); - newAttributes.put(CoreAttributes.PATH.key(), DEFAULT_FLOWFILE_PATH); - newAttributes.put(CoreAttributes.UUID.key(), uuid); - - final FlowFileRecord fFile = new StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence()) - .addAttributes(newAttributes) - .lineageStart(lineageStartDate, lineageStartIndex) - .build(); - - final StandardRepositoryRecord record = new StandardRepositoryRecord(null); - record.setWorking(fFile, newAttributes); - records.put(fFile.getId(), record); - createdFlowFiles.add(fFile.getAttribute(CoreAttributes.UUID.key())); - - registerJoinEvent(fFile, parents); - return fFile; - } /** * Returns the attributes that are common to every FlowFile given. The key diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/provenance/ProvenanceEventsIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/provenance/ProvenanceEventsIT.java index eb710fb..6ef7bbc 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/provenance/ProvenanceEventsIT.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/provenance/ProvenanceEventsIT.java @@ -420,4 +420,62 @@ public class ProvenanceEventsIT extends FrameworkIntegrationTest { final ProvenanceEventRecord thirdEvent = provRepo.getEvent(2L); assertEquals(ProvenanceEventType.DROP, thirdEvent.getEventType()); } + + @Test + public void testCloneOnMultipleConnectionsForRelationship() throws ExecutionException, InterruptedException, IOException { + final ProcessorNode generateProcessor = createGenerateProcessor(0); + final ProcessorNode passThroughProcessor = createProcessorNode((context, session) -> { + FlowFile original = session.get(); + session.transfer(original, REL_SUCCESS); + }, REL_SUCCESS); + + connect(generateProcessor, passThroughProcessor, REL_SUCCESS); + connect(passThroughProcessor, getTerminateProcessor(), REL_SUCCESS); + connect(passThroughProcessor, getTerminateAllProcessor(), REL_SUCCESS); + + triggerOnce(generateProcessor); + triggerOnce(passThroughProcessor); + + final ProvenanceEventRepository provRepo = getProvenanceRepository(); + assertEquals(1L, provRepo.getMaxEventId().longValue()); + + final ProvenanceEventRecord firstEvent = provRepo.getEvent(0L); + assertEquals(ProvenanceEventType.CREATE, firstEvent.getEventType()); + + final ProvenanceEventRecord secondEvent = provRepo.getEvent(1L); + assertEquals(ProvenanceEventType.CLONE, secondEvent.getEventType()); + assertEquals(1, secondEvent.getParentUuids().size()); + assertEquals(1, secondEvent.getChildUuids().size()); + } + + @Test + public void testCloneOnMultipleConnectionsForRelationshipIncludesUpdatedAttributes() throws ExecutionException, InterruptedException, IOException { + final ProcessorNode generateProcessor = createGenerateProcessor(0); + final ProcessorNode passThroughProcessor = createProcessorNode((context, session) -> { + FlowFile original = session.get(); + original = session.putAttribute(original, "test", "integration"); + + session.transfer(original, REL_SUCCESS); + }, REL_SUCCESS); + + connect(generateProcessor, passThroughProcessor, REL_SUCCESS); + connect(passThroughProcessor, getTerminateProcessor(), REL_SUCCESS); + connect(passThroughProcessor, getTerminateAllProcessor(), REL_SUCCESS); + + triggerOnce(generateProcessor); + triggerOnce(passThroughProcessor); + + final ProvenanceEventRepository provRepo = getProvenanceRepository(); + assertEquals(1L, provRepo.getMaxEventId().longValue()); + + final ProvenanceEventRecord firstEvent = provRepo.getEvent(0L); + assertEquals(ProvenanceEventType.CREATE, firstEvent.getEventType()); + + final ProvenanceEventRecord secondEvent = provRepo.getEvent(1L); + assertEquals(ProvenanceEventType.CLONE, secondEvent.getEventType()); + assertEquals(1, secondEvent.getParentUuids().size()); + assertEquals(1, secondEvent.getChildUuids().size()); + assertEquals("integration", secondEvent.getAttribute("test")); + } + }