Repository: nifi Updated Branches: refs/heads/master 0c5e159eb -> d50e3f174
NIFI-5384 FlowFile's queued in batches should all have the same Queue time This closes #2849 Signed-off-by: Mike Thomsen <mikerthom...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d50e3f17 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d50e3f17 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d50e3f17 Branch: refs/heads/master Commit: d50e3f17471e4ade3373b9e95e5c6a8e364248a6 Parents: 0c5e159 Author: patricker <patric...@gmail.com> Authored: Fri Jul 6 10:38:04 2018 -0600 Committer: Mike Thomsen <mikerthom...@gmail.com> Committed: Sat Jul 7 07:56:53 2018 -0400 ---------------------------------------------------------------------- .../repository/StandardProcessSession.java | 11 ++++++--- .../repository/TestStandardProcessSession.java | 25 ++++++++++++++++++++ 2 files changed, 33 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/d50e3f17/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index 12bcafd..ea4969c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -1861,12 +1861,16 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE return newFile; } - private void updateLastQueuedDate(final StandardRepositoryRecord record) { + private void updateLastQueuedDate(final StandardRepositoryRecord record, final Long lastQueueDate) { final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()) - .lastQueued(System.currentTimeMillis(), enqueuedIndex.getAndIncrement()).build(); + .lastQueued(lastQueueDate, enqueuedIndex.getAndIncrement()).build(); record.setWorking(newFile); } + private void updateLastQueuedDate(final StandardRepositoryRecord record) { + updateLastQueuedDate(record, System.currentTimeMillis()); + } + @Override public void transfer(FlowFile flowFile, final Relationship relationship) { verifyTaskActive(); @@ -1938,11 +1942,12 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final int multiplier = Math.max(1, numDestinations); + final long queuedTime = System.currentTimeMillis(); long contentSize = 0L; for (final FlowFile flowFile : flowFiles) { final StandardRepositoryRecord record = records.get(flowFile); record.setTransferRelationship(relationship); - updateLastQueuedDate(record); + updateLastQueuedDate(record, queuedTime); contentSize += flowFile.getSize(); } http://git-wip-us.apache.org/repos/asf/nifi/blob/d50e3f17/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java index f47be4d..4059557 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java @@ -19,6 +19,7 @@ package org.apache.nifi.controller.repository; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; @@ -1651,6 +1652,30 @@ public class TestStandardProcessSession { } @Test + public void testBatchQueuedHaveSameQueuedTime() { + for (int i = 0; i < 100; i++) { + final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder() + .id(i) + .addAttribute("uuid", "000000000000-0000-0000-0000-0000000" + i) + .build(); + this.flowFileQueue.put(flowFile); + } + + final List<FlowFile> flowFiles = session.get(100); + + // FlowFile Queued times should not match yet + assertNotEquals("Queued times should not be equal.", flowFiles.get(0).getLastQueueDate(), flowFiles.get(99).getLastQueueDate()); + + session.transfer(flowFiles, new Relationship.Builder().name("A").build()); + session.commit(); + + final List<FlowFile> flowFilesUpdated = session.get(100); + + // FlowFile Queued times should match + assertEquals("Queued times should be equal.", flowFilesUpdated.get(0).getLastQueueDate(), flowFilesUpdated.get(99).getLastQueueDate()); + } + + @Test public void testAttributesModifiedEmitted() throws IOException { final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder() .id(1L)