This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 24ab8859749 NIFI-15725 Improved stability of TestControlRate and TestConnectWebSocket (#11019)
24ab8859749 is described below
commit 24ab88597497f6df4899ea6bea77b35269001fbe
Author: Pierre Villard <[email protected]>
AuthorDate: Wed Mar 18 18:49:03 2026 +0100
NIFI-15725 Improved stability of TestControlRate and TestConnectWebSocket (#11019)
Signed-off-by: David Handermann <[email protected]>
---
.../apache/nifi/util/timebuffer/LongEntityAccess.java | 18 +++++++++++++++---
.../apache/nifi/util/timebuffer/TimestampedLong.java | 7 ++++++-
.../apache/nifi/processors/standard/ControlRate.java | 4 ++--
.../nifi/processors/standard/TestControlRate.java | 12 ++++++++++++
.../processors/websocket/TestConnectWebSocket.java | 8 +++++++-
5 files changed, 42 insertions(+), 7 deletions(-)
diff --git
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/LongEntityAccess.java
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/LongEntityAccess.java
index 193abc60e20..812b3de5d40 100644
---
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/LongEntityAccess.java
+++
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/LongEntityAccess.java
@@ -16,24 +16,36 @@
*/
package org.apache.nifi.util.timebuffer;
+import java.util.function.LongSupplier;
+
public class LongEntityAccess implements EntityAccess<TimestampedLong> {
+ private final LongSupplier currentTimeSupplier;
+
+ public LongEntityAccess() {
+ this(System::currentTimeMillis);
+ }
+
+ public LongEntityAccess(final LongSupplier currentTimeSupplier) {
+ this.currentTimeSupplier = currentTimeSupplier;
+ }
+
@Override
public TimestampedLong aggregate(TimestampedLong oldValue, TimestampedLong
toAdd) {
if (oldValue == null && toAdd == null) {
- return new TimestampedLong(0L);
+ return new TimestampedLong(0L, currentTimeSupplier.getAsLong());
} else if (oldValue == null) {
return toAdd;
} else if (toAdd == null) {
return oldValue;
}
- return new TimestampedLong(oldValue.getValue() + toAdd.getValue());
+ return new TimestampedLong(oldValue.getValue() + toAdd.getValue(), currentTimeSupplier.getAsLong());
}
@Override
public TimestampedLong createNew() {
- return new TimestampedLong(0L);
+ return new TimestampedLong(0L, currentTimeSupplier.getAsLong());
}
@Override
diff --git
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimestampedLong.java
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimestampedLong.java
index 07d31ea5aa9..8b316fea5a4 100644
---
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimestampedLong.java
+++
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimestampedLong.java
@@ -19,10 +19,15 @@ package org.apache.nifi.util.timebuffer;
public class TimestampedLong {
private final Long value;
- private final long timestamp = System.currentTimeMillis();
+ private final long timestamp;
public TimestampedLong(final Long value) {
+ this(value, System.currentTimeMillis());
+ }
+
+ public TimestampedLong(final Long value, final long timestamp) {
this.value = value;
+ this.timestamp = timestamp;
}
public Long getValue() {
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
index 274b1002dd2..1b65bf4becf 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
@@ -552,7 +552,7 @@ public class ControlRate extends AbstractProcessor {
private Throttle(final int timePeriod, final TimeUnit unit, final
ComponentLog logger, final LongSupplier currentTimeSupplier) {
this.timePeriodMillis = TimeUnit.MILLISECONDS.convert(timePeriod,
unit);
- this.timedBuffer = new TimedBuffer<>(unit, timePeriod, new LongEntityAccess(), currentTimeSupplier);
+ this.timedBuffer = new TimedBuffer<>(unit, timePeriod, new LongEntityAccess(currentTimeSupplier), currentTimeSupplier);
this.logger = logger;
this.currentTimeSupplier = currentTimeSupplier;
}
@@ -600,7 +600,7 @@ public class ControlRate extends AbstractProcessor {
sum == null ? 0 : sum.getValue(), sum == null ? 0 :
sum.getTimestamp(), value);
}
- final long transferred = timedBuffer.add(new TimestampedLong(value)).getValue();
+ final long transferred = timedBuffer.add(new TimestampedLong(value, now)).getValue();
if (transferred > maxRateValue) {
final long amountOver = transferred - maxRateValue;
// determine how long it should take to transfer 'amountOver'
and 'penalize' the Throttle for that long
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java
index e63a085a8d7..355bda478d0 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java
@@ -434,6 +434,12 @@ public class TestControlRate {
incrementCurrentTime(2000);
runner.run(1, false);
+ runner.assertTransferCount(ControlRate.REL_SUCCESS, 2);
+ runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
+ runner.assertQueueNotEmpty();
+
+ incrementCurrentTime();
+ runner.run(1, false);
runner.assertTransferCount(ControlRate.REL_SUCCESS, 3);
runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
runner.assertQueueEmpty();
@@ -465,6 +471,12 @@ public class TestControlRate {
// we have sent 2 flowfile and after 1 second, we should be able to
send more, now limited by flowfile count
incrementCurrentTime(1500);
runner.run(1, false);
+ runner.assertTransferCount(ControlRate.REL_SUCCESS, 7);
+ runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
+ runner.assertQueueNotEmpty();
+
+ incrementCurrentTime();
+ runner.run(1, false);
runner.assertTransferCount(ControlRate.REL_SUCCESS, 8);
runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
runner.assertQueueEmpty();
diff --git
a/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestConnectWebSocket.java
b/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestConnectWebSocket.java
index 024cf2aed8d..0222eec6c03 100644
---
a/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestConnectWebSocket.java
+++
b/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestConnectWebSocket.java
@@ -200,7 +200,7 @@ class TestConnectWebSocket extends TestListenWebSocket {
}
@Test
- void testDynamicUrlsParsedFromFlowFileAndAbleToConnectAndDisconnect() throws InitializationException {
+ void testDynamicUrlsParsedFromFlowFileAndAbleToConnectAndDisconnect() throws Exception {
// Start websocket server
final TestRunner webSocketListener =
TestRunners.newTestRunner(ListenWebSocket.class);
@@ -243,6 +243,12 @@ class TestConnectWebSocket extends TestListenWebSocket {
webSocketListener.disableControllerService(server);
+ final long disconnectTimeout = System.currentTimeMillis() + 5000;
+ while (runner.getFlowFilesForRelationship(ConnectWebSocket.REL_DISCONNECTED).isEmpty()
+ && System.currentTimeMillis() < disconnectTimeout) {
+ Thread.sleep(50);
+ }
+
final List<MockFlowFile> flowFilesForDisconnectedRelationship =
runner.getFlowFilesForRelationship(ConnectWebSocket.REL_DISCONNECTED);
assertEquals(1, flowFilesForDisconnectedRelationship.size());