This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 24ab8859749 NIFI-15725 Improved stability of TestControlRate and TestConnectWebSocket (#11019)
24ab8859749 is described below

commit 24ab88597497f6df4899ea6bea77b35269001fbe
Author: Pierre Villard <[email protected]>
AuthorDate: Wed Mar 18 18:49:03 2026 +0100

    NIFI-15725 Improved stability of TestControlRate and TestConnectWebSocket (#11019)
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../apache/nifi/util/timebuffer/LongEntityAccess.java  | 18 +++++++++++++++---
 .../apache/nifi/util/timebuffer/TimestampedLong.java   |  7 ++++++-
 .../apache/nifi/processors/standard/ControlRate.java   |  4 ++--
 .../nifi/processors/standard/TestControlRate.java      | 12 ++++++++++++
 .../processors/websocket/TestConnectWebSocket.java     |  8 +++++++-
 5 files changed, 42 insertions(+), 7 deletions(-)

diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/LongEntityAccess.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/LongEntityAccess.java
index 193abc60e20..812b3de5d40 100644
--- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/LongEntityAccess.java
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/LongEntityAccess.java
@@ -16,24 +16,36 @@
  */
 package org.apache.nifi.util.timebuffer;
 
+import java.util.function.LongSupplier;
+
 public class LongEntityAccess implements EntityAccess<TimestampedLong> {
 
+    private final LongSupplier currentTimeSupplier;
+
+    public LongEntityAccess() {
+        this(System::currentTimeMillis);
+    }
+
+    public LongEntityAccess(final LongSupplier currentTimeSupplier) {
+        this.currentTimeSupplier = currentTimeSupplier;
+    }
+
     @Override
     public TimestampedLong aggregate(TimestampedLong oldValue, TimestampedLong toAdd) {
         if (oldValue == null && toAdd == null) {
-            return new TimestampedLong(0L);
+            return new TimestampedLong(0L, currentTimeSupplier.getAsLong());
         } else if (oldValue == null) {
             return toAdd;
         } else if (toAdd == null) {
             return oldValue;
         }
 
-        return new TimestampedLong(oldValue.getValue() + toAdd.getValue());
+        return new TimestampedLong(oldValue.getValue() + toAdd.getValue(), currentTimeSupplier.getAsLong());
     }
 
     @Override
     public TimestampedLong createNew() {
-        return new TimestampedLong(0L);
+        return new TimestampedLong(0L, currentTimeSupplier.getAsLong());
     }
 
     @Override
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimestampedLong.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimestampedLong.java
index 07d31ea5aa9..8b316fea5a4 100644
--- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimestampedLong.java
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimestampedLong.java
@@ -19,10 +19,15 @@ package org.apache.nifi.util.timebuffer;
 public class TimestampedLong {
 
     private final Long value;
-    private final long timestamp = System.currentTimeMillis();
+    private final long timestamp;
 
     public TimestampedLong(final Long value) {
+        this(value, System.currentTimeMillis());
+    }
+
+    public TimestampedLong(final Long value, final long timestamp) {
         this.value = value;
+        this.timestamp = timestamp;
     }
 
     public Long getValue() {
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
index 274b1002dd2..1b65bf4becf 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
@@ -552,7 +552,7 @@ public class ControlRate extends AbstractProcessor {
 
         private Throttle(final int timePeriod, final TimeUnit unit, final ComponentLog logger, final LongSupplier currentTimeSupplier) {
             this.timePeriodMillis = TimeUnit.MILLISECONDS.convert(timePeriod, unit);
-            this.timedBuffer = new TimedBuffer<>(unit, timePeriod, new LongEntityAccess(), currentTimeSupplier);
+            this.timedBuffer = new TimedBuffer<>(unit, timePeriod, new LongEntityAccess(currentTimeSupplier), currentTimeSupplier);
             this.logger = logger;
             this.currentTimeSupplier = currentTimeSupplier;
         }
@@ -600,7 +600,7 @@ public class ControlRate extends AbstractProcessor {
                         sum == null ? 0 : sum.getValue(), sum == null ? 0 : sum.getTimestamp(), value);
             }
 
-            final long transferred = timedBuffer.add(new TimestampedLong(value)).getValue();
+            final long transferred = timedBuffer.add(new TimestampedLong(value, now)).getValue();
             if (transferred > maxRateValue) {
                 final long amountOver = transferred - maxRateValue;
                 // determine how long it should take to transfer 'amountOver' and 'penalize' the Throttle for that long
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java
index e63a085a8d7..355bda478d0 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java
@@ -434,6 +434,12 @@ public class TestControlRate {
 
         incrementCurrentTime(2000);
         runner.run(1, false);
+        runner.assertTransferCount(ControlRate.REL_SUCCESS, 2);
+        runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
+        runner.assertQueueNotEmpty();
+
+        incrementCurrentTime();
+        runner.run(1, false);
         runner.assertTransferCount(ControlRate.REL_SUCCESS, 3);
         runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
         runner.assertQueueEmpty();
@@ -465,6 +471,12 @@ public class TestControlRate {
         // we have sent 2 flowfile and after 1 second, we should be able to send more, now limited by flowfile count
         incrementCurrentTime(1500);
         runner.run(1, false);
+        runner.assertTransferCount(ControlRate.REL_SUCCESS, 7);
+        runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
+        runner.assertQueueNotEmpty();
+
+        incrementCurrentTime();
+        runner.run(1, false);
         runner.assertTransferCount(ControlRate.REL_SUCCESS, 8);
         runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
         runner.assertQueueEmpty();
diff --git a/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestConnectWebSocket.java b/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestConnectWebSocket.java
index 024cf2aed8d..0222eec6c03 100644
--- a/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestConnectWebSocket.java
+++ b/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestConnectWebSocket.java
@@ -200,7 +200,7 @@ class TestConnectWebSocket extends TestListenWebSocket {
     }
 
     @Test
-    void testDynamicUrlsParsedFromFlowFileAndAbleToConnectAndDisconnect() throws InitializationException {
+    void testDynamicUrlsParsedFromFlowFileAndAbleToConnectAndDisconnect() throws Exception {
         // Start websocket server
         final TestRunner webSocketListener = TestRunners.newTestRunner(ListenWebSocket.class);
 
@@ -243,6 +243,12 @@ class TestConnectWebSocket extends TestListenWebSocket {
 
         webSocketListener.disableControllerService(server);
 
+        final long disconnectTimeout = System.currentTimeMillis() + 5000;
+        while (runner.getFlowFilesForRelationship(ConnectWebSocket.REL_DISCONNECTED).isEmpty()
+                && System.currentTimeMillis() < disconnectTimeout) {
+            Thread.sleep(50);
+        }
+
         final List<MockFlowFile> flowFilesForDisconnectedRelationship = runner.getFlowFilesForRelationship(ConnectWebSocket.REL_DISCONNECTED);
         assertEquals(1, flowFilesForDisconnectedRelationship.size());
 

Reply via email to