This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 66d5ab8  NIFI-7011: This closes #3983. SwappablePriorityQueue contains 
two internal data structures: activeQueue, swapQueue. activeQueue is intended 
to be pulled from for processing. swapQueue is intended to hold FlowFiles that 
are waiting to be swapped out. We want to ensure that we first swap in any 
data that has already been swapped out before processing the swap queue, in 
order to ensure that we process the data in the correct order. This fix 
addresses an issue where data w [...]
66d5ab8 is described below

commit 66d5ab80eb22f535da0898ae0d6a4a5da2dd7bd9
Author: Mark Payne <marka...@hotmail.com>
AuthorDate: Fri Jan 10 11:27:57 2020 -0500

    NIFI-7011: This closes #3983. SwappablePriorityQueue contains two internal 
data structures: activeQueue, swapQueue. activeQueue is intended to be pulled 
from for processing. swapQueue is intended to hold FlowFiles that are waiting 
to be swapped out. We want to ensure that we first swap in any data that has 
already been swapped out before processing the swap queue, in order to ensure 
that we process the data in the correct order. This fix addresses an issue where 
data was being swapp [...]
    NIFI-7011: Addressed corner case where data could be inserted out of order 
still if added while swapping was taking place
    NIFI-7011: Fixed ordering issue with swap queue that can occur if data is 
migrated from swap queue to active queue instead of being swapped out
---
 .../controller/queue/SwappablePriorityQueue.java   |  54 ++---
 .../repository/StandardProcessSession.java         |   1 -
 .../nifi/controller/TestStandardFlowFileQueue.java |  22 --
 .../clustered/TestSwappablePriorityQueue.java      | 227 ++++++++++++++++++++-
 .../integration/swap/StandaloneSwapFileIT.java     |   4 +-
 5 files changed, 255 insertions(+), 53 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
index b81bd3f..c442838 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
@@ -162,6 +162,9 @@ public class SwappablePriorityQueue {
         }
 
         migrateSwapToActive();
+        if (swapQueue.size() < SWAP_RECORD_POLL_SIZE) {
+            return;
+        }
 
         final int numSwapFiles = swapQueue.size() / SWAP_RECORD_POLL_SIZE;
 
@@ -171,10 +174,11 @@ public class SwappablePriorityQueue {
             originalSwapQueueBytes += flowFile.getSize();
         }
 
-        // Create a new Priority queue with the prioritizers that are set, but 
reverse the
-        // prioritizers because we want to pull the lowest-priority FlowFiles 
to swap out
-        final PriorityQueue<FlowFileRecord> tempQueue = new 
PriorityQueue<>(activeQueue.size() + swapQueue.size(), 
Collections.reverseOrder(new QueuePrioritizer(getPriorities())));
-        tempQueue.addAll(activeQueue);
+        // Create a new Priority queue with the same prioritizers that are set 
for this queue. We want to swap out the highest priority data first, because
+        // whatever data we don't write out to a swap file (because there 
isn't enough to fill a swap file) will be added back to the swap queue.
+        // Since the swap queue cannot be processed until all swap files have been swapped in, we 
want to ensure that only the lowest priority data goes back onto it. Which means
+        // that we must swap out the highest priority data that is currently 
on the swap queue.
+        final PriorityQueue<FlowFileRecord> tempQueue = new 
PriorityQueue<>(swapQueue.size(), new QueuePrioritizer(getPriorities()));
         tempQueue.addAll(swapQueue);
 
         long bytesSwappedOut = 0L;
@@ -221,23 +225,14 @@ public class SwappablePriorityQueue {
         // swap queue. Then add the records back to the active queue.
         swapQueue.clear();
         long updatedSwapQueueBytes = 0L;
-        while (tempQueue.size() > swapThreshold) {
-            final FlowFileRecord record = tempQueue.poll();
+        FlowFileRecord record;
+        while ((record = tempQueue.poll()) != null) {
             swapQueue.add(record);
             updatedSwapQueueBytes += record.getSize();
         }
 
         Collections.reverse(swapQueue); // currently ordered in reverse 
priority order based on the ordering of the temp queue
 
-        // replace the contents of the active queue, since we've merged it 
with the swap queue.
-        activeQueue.clear();
-        FlowFileRecord toRequeue;
-        long activeQueueBytes = 0L;
-        while ((toRequeue = tempQueue.poll()) != null) {
-            activeQueue.offer(toRequeue);
-            activeQueueBytes += toRequeue.getSize();
-        }
-
         boolean updated = false;
         while (!updated) {
             final FlowFileQueueSize originalSize = getFlowFileQueueSize();
@@ -245,13 +240,13 @@ public class SwappablePriorityQueue {
             final int addedSwapRecords = swapQueue.size() - 
originalSwapQueueCount;
             final long addedSwapBytes = updatedSwapQueueBytes - 
originalSwapQueueBytes;
 
-            final FlowFileQueueSize newSize = new 
FlowFileQueueSize(activeQueue.size(), activeQueueBytes,
+            final FlowFileQueueSize newSize = new 
FlowFileQueueSize(originalSize.getActiveCount(), originalSize.getActiveBytes(),
                 originalSize.getSwappedCount() + addedSwapRecords + 
flowFilesSwappedOut,
                 originalSize.getSwappedBytes() + addedSwapBytes + 
bytesSwappedOut,
                 originalSize.getSwapFileCount() + numSwapFiles,
                 originalSize.getUnacknowledgedCount(), 
originalSize.getUnacknowledgedBytes());
-            updated = updateSize(originalSize, newSize);
 
+            updated = updateSize(originalSize, newSize);
             if (updated) {
                 logIfNegative(originalSize, newSize, "swap");
             }
@@ -286,9 +281,7 @@ public class SwappablePriorityQueue {
         // Calling this method when records are polled prevents this condition 
by migrating FlowFiles from the
         // Swap Queue to the Active Queue. However, we don't do this if there 
are FlowFiles already swapped out
         // to disk, because we want them to be swapped back in in the same 
order that they were swapped out.
-
-        final int activeQueueSize = activeQueue.size();
-        if (activeQueueSize > 0 && activeQueueSize > swapThreshold - 
SWAP_RECORD_POLL_SIZE) {
+        if (!activeQueue.isEmpty()) {
             return;
         }
 
@@ -315,20 +308,33 @@ public class SwappablePriorityQueue {
             return;
         }
 
+        // Swap Queue is not currently ordered. We want to migrate the highest 
priority FlowFiles to the Active Queue, then re-queue the lowest priority items.
+        final PriorityQueue<FlowFileRecord> tempQueue = new 
PriorityQueue<>(swapQueue.size(), new QueuePrioritizer(getPriorities()));
+        tempQueue.addAll(swapQueue);
+
         int recordsMigrated = 0;
         long bytesMigrated = 0L;
-        final Iterator<FlowFileRecord> swapItr = swapQueue.iterator();
-        while (activeQueue.size() < swapThreshold && swapItr.hasNext()) {
-            final FlowFileRecord toMigrate = swapItr.next();
+        while (activeQueue.size() < swapThreshold) {
+            final FlowFileRecord toMigrate = tempQueue.poll();
+            if (toMigrate == null) {
+                break;
+            }
+
             activeQueue.add(toMigrate);
             bytesMigrated += toMigrate.getSize();
             recordsMigrated++;
-            swapItr.remove();
+        }
+
+        swapQueue.clear();
+        FlowFileRecord toRequeue;
+        while ((toRequeue = tempQueue.poll()) != null) {
+            swapQueue.add(toRequeue);
         }
 
         if (recordsMigrated > 0) {
             incrementActiveQueueSize(recordsMigrated, bytesMigrated);
             incrementSwapQueueSize(-recordsMigrated, -bytesMigrated, 0);
+            logger.debug("Migrated {} FlowFiles from swap queue to active 
queue for {}", recordsMigrated, this);
         }
 
         if (size.getSwappedCount() == 0) {
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index da7a6ee..089ac90 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -56,7 +56,6 @@ import 
org.apache.nifi.provenance.StandardProvenanceEventRecord;
 import org.apache.nifi.stream.io.ByteCountingInputStream;
 import org.apache.nifi.stream.io.ByteCountingOutputStream;
 import org.apache.nifi.stream.io.StreamUtils;
-import org.rocksdb.Checkpoint;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
index b454d2d..4cb28a2 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
@@ -315,28 +315,6 @@ public class TestStandardFlowFileQueue {
     }
 
     @Test
-    public void testLowestPrioritySwappedOutFirst() {
-        final List<FlowFilePrioritizer> prioritizers = new ArrayList<>();
-        prioritizers.add(new FlowFileSizePrioritizer());
-        queue.setPriorities(prioritizers);
-
-        long maxSize = 20000;
-        for (int i = 1; i <= 20000; i++) {
-            queue.put(new MockFlowFileRecord(maxSize - i));
-        }
-
-        assertEquals(1, swapManager.swapOutCalledCount);
-        assertEquals(20000, queue.size().getObjectCount());
-
-        assertEquals(10000, 
queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getActiveQueueSize().getObjectCount());
-        final List<FlowFileRecord> flowFiles = queue.poll(Integer.MAX_VALUE, 
new HashSet<FlowFileRecord>());
-        assertEquals(10000, flowFiles.size());
-        for (int i = 0; i < 10000; i++) {
-            assertEquals(i, flowFiles.get(i).getSize());
-        }
-    }
-
-    @Test
     public void testSwapIn() {
         for (int i = 1; i <= 20000; i++) {
             queue.put(new MockFlowFileRecord());
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java
index ef1a063..79be6ed 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java
@@ -26,9 +26,11 @@ import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.controller.queue.SwappablePriorityQueue;
 import org.apache.nifi.controller.repository.FlowFileRecord;
 import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.FlowFilePrioritizer;
 import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.StringUtils;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -78,6 +80,193 @@ public class TestSwappablePriorityQueue {
         queue = new SwappablePriorityQueue(swapManager, 10000, eventReporter, 
flowFileQueue, dropAction, "local");
     }
 
+    @Test
+    public void testPrioritizersBigQueue() {
+        final FlowFilePrioritizer iAttributePrioritizer = new 
FlowFilePrioritizer() {
+            @Override
+            public int compare(final FlowFile o1, final FlowFile o2) {
+                final int i1 = Integer.parseInt(o1.getAttribute("i"));
+                final int i2 = Integer.parseInt(o2.getAttribute("i"));
+                return Integer.compare(i1, i2);
+            }
+        };
+
+        queue.setPriorities(Collections.singletonList(iAttributePrioritizer));
+        final int iterations = 29000;
+
+        for (int i=0; i < iterations; i++) {
+            final MockFlowFile flowFile = new MockFlowFile(i);
+            flowFile.putAttributes(Collections.singletonMap("i", 
String.valueOf(i)));
+            queue.put(flowFile);
+        }
+
+        for (int i=0; i < iterations; i++) {
+            final MockFlowFile flowFile = new MockFlowFile(i + iterations);
+            flowFile.putAttributes(Collections.singletonMap("i", 
String.valueOf(i + iterations)));
+
+            final FlowFileRecord polled = queue.poll(Collections.emptySet(), 
0L);
+            assertEquals(polled.getAttribute("i"), String.valueOf(i));
+
+            queue.put(flowFile);
+        }
+
+        // Make sure that the data is pulled from the queue and added back a 
couple of times.
+        // This will trigger swapping to occur, but also leave a lot of data 
in memory on the queue.
+        // This specifically tests the edge case where data is swapped out, 
and we want to make sure that
+        // when we read from the queue, that we swap the data back in before 
processing anything on the
+        // pending 'swap queue' internally.
+        repopulateQueue();
+        repopulateQueue();
+
+        int i=iterations;
+        FlowFileRecord flowFile;
+        while ((flowFile = queue.poll(Collections.emptySet(), 0)) != null) {
+            assertEquals(String.valueOf(i), flowFile.getAttribute("i"));
+            i++;
+        }
+    }
+
+
+    @Test
+    public void testOrderingWithCornerCases() {
+        final FlowFilePrioritizer iAttributePrioritizer = new 
FlowFilePrioritizer() {
+            @Override
+            public int compare(final FlowFile o1, final FlowFile o2) {
+                final int i1 = Integer.parseInt(o1.getAttribute("i"));
+                final int i2 = Integer.parseInt(o2.getAttribute("i"));
+                return Integer.compare(i1, i2);
+            }
+        };
+
+        queue.setPriorities(Collections.singletonList(iAttributePrioritizer));
+
+        for (final int queueSize : new int[] {1, 9999, 10_000, 10_001, 19_999, 
20_000, 20_001}) {
+            System.out.println("Queue Size: " + queueSize);
+
+            for (int i=0; i < queueSize; i++) {
+                final MockFlowFile flowFile = new MockFlowFile(i);
+                flowFile.putAttributes(Collections.singletonMap("i", 
String.valueOf(i)));
+                queue.put(flowFile);
+            }
+
+            for (int i=0; i < queueSize; i++) {
+                final FlowFileRecord flowFile = 
queue.poll(Collections.emptySet(), 0);
+                assertEquals(String.valueOf(i), flowFile.getAttribute("i"));
+            }
+
+            assertNull(queue.poll(Collections.emptySet(), 0));
+        }
+    }
+
+    @Test
+    public void testPrioritizerWhenOutOfOrderDataEntersSwapQueue() {
+        final FlowFilePrioritizer iAttributePrioritizer = new 
FlowFilePrioritizer() {
+            @Override
+            public int compare(final FlowFile o1, final FlowFile o2) {
+                final int i1 = Integer.parseInt(o1.getAttribute("i"));
+                final int i2 = Integer.parseInt(o2.getAttribute("i"));
+                return Integer.compare(i1, i2);
+            }
+        };
+
+        queue.setPriorities(Collections.singletonList(iAttributePrioritizer));
+
+        // Add 10,000 FlowFiles to the queue. These will all go to the active 
queue.
+        final int iterations = 10000;
+        for (int i=0; i < iterations; i++) {
+            final MockFlowFile flowFile = new MockFlowFile(i);
+            flowFile.putAttributes(Collections.singletonMap("i", 
String.valueOf(i)));
+            queue.put(flowFile);
+        }
+
+        // Add 3 FlowFiles to the queue. These will all go to the Swap Queue.
+        for (final String iValue : new String[] {"10000", "-5", "8000"}) {
+            final MockFlowFile swapQueueFlowFile1 = new MockFlowFile(10_000);
+            swapQueueFlowFile1.putAttributes(Collections.singletonMap("i", 
iValue));
+            queue.put(swapQueueFlowFile1);
+        }
+
+        // The first 10,000 should be ordered. Then all FlowFiles on the swap 
queue should be transferred over, as a single unit, just as they would be in a 
swap file.
+        for (int i=0; i < iterations; i++) {
+            final FlowFileRecord flowFile = queue.poll(Collections.emptySet(), 
0);
+            assertEquals(String.valueOf(i), flowFile.getAttribute("i"));
+        }
+
+        for (final String iValue : new String[] {"-5", "8000", "10000"}) {
+            final FlowFileRecord flowFile = queue.poll(Collections.emptySet(), 
0);
+            assertEquals(iValue, flowFile.getAttribute("i"));
+        }
+    }
+
+    @Test
+    public void testPrioritizersDataAddedAfterSwapOccurs() {
+        final FlowFilePrioritizer iAttributePrioritizer = new 
FlowFilePrioritizer() {
+            @Override
+            public int compare(final FlowFile o1, final FlowFile o2) {
+                final int i1 = Integer.parseInt(o1.getAttribute("i"));
+                final int i2 = Integer.parseInt(o2.getAttribute("i"));
+                return Integer.compare(i1, i2);
+            }
+        };
+
+        queue.setPriorities(Collections.singletonList(iAttributePrioritizer));
+        final int iterations = 29000;
+
+        for (int i=0; i < iterations; i++) {
+            final MockFlowFile flowFile = new MockFlowFile(i);
+            flowFile.putAttributes(Collections.singletonMap("i", 
String.valueOf(i)));
+            queue.put(flowFile);
+        }
+
+        for (int i=0; i < iterations; i++) {
+            final MockFlowFile flowFile = new MockFlowFile(i + iterations);
+            flowFile.putAttributes(Collections.singletonMap("i", 
String.valueOf(i + iterations)));
+
+            final FlowFileRecord polled = queue.poll(Collections.emptySet(), 
0L);
+            assertEquals(polled.getAttribute("i"), String.valueOf(i));
+
+            queue.put(flowFile);
+        }
+
+        // Make sure that the data is pulled from the queue and added back a 
couple of times.
+        // This will trigger swapping to occur, but also leave a lot of data 
in memory on the queue.
+        // This specifically tests the edge case where data is swapped out, 
and we want to make sure that
+        // when we read from the queue, that we swap the data back in before 
processing anything on the
+        // pending 'swap queue' internally.
+        repopulateQueue();
+        repopulateQueue();
+
+        // Add enough data for another swap file to get created.
+        final int baseI = iterations * 2;
+        for (int i=0; i < 10_000; i++) {
+            final MockFlowFile flowFile = new MockFlowFile(i);
+            flowFile.putAttributes(Collections.singletonMap("i", 
String.valueOf(baseI + i)));
+            queue.put(flowFile);
+        }
+
+        repopulateQueue();
+
+        int i=iterations;
+        FlowFileRecord flowFile;
+        while ((flowFile = queue.poll(Collections.emptySet(), 0)) != null) {
+            assertEquals(String.valueOf(i), flowFile.getAttribute("i"));
+            i++;
+        }
+    }
+
+    private void repopulateQueue() {
+        final List<String> attrs = new ArrayList<>();
+        final List<FlowFileRecord> ffs = new ArrayList<>();
+        FlowFileRecord ff;
+        while ((ff = queue.poll(Collections.emptySet(), 0L)) != null) {
+            ffs.add(ff);
+            attrs.add(ff.getAttribute("i"));
+        }
+
+        ffs.forEach(queue::put);
+        System.out.println(StringUtils.join(attrs, ", "));
+    }
+
 
     @Test
     public void testSwapOutFailureLeavesCorrectQueueSize() {
@@ -241,13 +430,45 @@ public class TestSwappablePriorityQueue {
         assertEquals(20000, queue.size().getObjectCount());
 
         assertEquals(10000, 
queue.getQueueDiagnostics().getActiveQueueSize().getObjectCount());
-        final List<FlowFileRecord> flowFiles = queue.poll(Integer.MAX_VALUE, 
new HashSet<FlowFileRecord>(), 500000);
-        assertEquals(10000, flowFiles.size());
+
+        // The first 10,000 FlowFiles to be added to the queue will be sorted 
by size (first 10,000 because that's the swap threshold, by size because of the 
prioritizer).
+        // The next 10,000 spill over to the swap queue. So we expect the 
first 10,000 FlowFiles to be size 10,000 to 20,000. Then the next 10,000 to be 
sized 0 to 9,999.
+        final List<FlowFileRecord> firstBatch = queue.poll(Integer.MAX_VALUE, 
Collections.emptySet(), 0);
+        assertEquals(10000, firstBatch.size());
         for (int i = 0; i < 10000; i++) {
-            assertEquals(i, flowFiles.get(i).getSize());
+            assertEquals(10_000 + i, firstBatch.get(i).getSize());
         }
+
+        final List<FlowFileRecord> secondBatch = queue.poll(Integer.MAX_VALUE, 
Collections.emptySet(), 0);
+        assertEquals(10000, secondBatch.size());
+        for (int i = 0; i < 10000; i++) {
+            assertEquals(i, secondBatch.get(i).getSize());
+        }
+
     }
 
+    @Test
+    public void testPrioritiesKeptIntactBeforeSwap() {
+        final List<FlowFilePrioritizer> prioritizers = new ArrayList<>();
+        prioritizers.add((o1, o2) -> Long.compare(o1.getSize(), o2.getSize()));
+        queue.setPriorities(prioritizers);
+
+        int maxSize = 9999;
+        for (int i = 1; i <= maxSize; i++) {
+            queue.put(new MockFlowFileRecord(maxSize - i));
+        }
+
+        assertEquals(0, swapManager.swapOutCalledCount);
+        assertEquals(maxSize, queue.size().getObjectCount());
+
+        assertEquals(9999, 
queue.getQueueDiagnostics().getActiveQueueSize().getObjectCount());
+
+        FlowFileRecord flowFile;
+        int i=0;
+        while ((flowFile = queue.poll(Collections.emptySet(), 0L)) != null) {
+            assertEquals(i++, flowFile.getSize());
+        }
+    }
 
     @Test
     public void testSwapIn() {
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/swap/StandaloneSwapFileIT.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/swap/StandaloneSwapFileIT.java
index c93bf7c..88470ff 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/swap/StandaloneSwapFileIT.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/swap/StandaloneSwapFileIT.java
@@ -35,10 +35,8 @@ import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 
 public class StandaloneSwapFileIT extends FrameworkIntegrationTest {
-    @Test
+    @Test(timeout=60_000)
     public void testSwapOnRestart() throws ExecutionException, 
InterruptedException, IOException {
-        Thread.sleep(20000L);
-
         final ProcessorNode generator = 
createProcessorNode(GenerateProcessor.class);
         
generator.setProperties(Collections.singletonMap(GenerateProcessor.COUNT.getName(),
 "60000"));
 

Reply via email to