This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new b4066fbd40 Fix flaky quota integration tests (#14602)
b4066fbd40 is described below
commit b4066fbd40cce1e035a70968382fc24b3b7f4427
Author: Bolek Ziobrowski <[email protected]>
AuthorDate: Tue Dec 10 23:06:35 2024 +0100
Fix flaky quota integration tests (#14602)
---
.../tests/QueryQuotaClusterIntegrationTest.java | 53 ++++++++++++++--------
1 file changed, 33 insertions(+), 20 deletions(-)
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java
index 8ac736e507..a40cbdf290 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java
@@ -37,6 +37,7 @@ import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -234,30 +235,43 @@ public class QueryQuotaClusterIntegrationTest extends BaseClusterIntegrationTest
* Then runs the query load with double the max rate and expects queries to fail due to quota breach.
* @param maxRate max rate allowed by the quota
*/
- void testQueryRate(float maxRate)
- throws Exception {
+ void testQueryRate(int maxRate) {
verifyQuotaUpdate(maxRate);
runQueries(maxRate, false);
//increase the qps and some of the queries should be throttled.
runQueries(maxRate * 2, true);
}
- void testQueryRateOnBroker(float maxRate)
- throws Exception {
+ void testQueryRateOnBroker(float maxRate) {
verifyQuotaUpdate(maxRate);
runQueriesOnBroker(maxRate, false);
//increase the qps and some of the queries should be throttled.
runQueriesOnBroker(maxRate * 2, true);
}
+ // adjust sleep time depending on deadline and number of iterations left
+ private static void sleep(long deadline, double iterationsLeft) {
+ long time = System.currentTimeMillis();
+ long sleepDeadline = time + (long) Math.max(Math.ceil((deadline - time) /
iterationsLeft), 0);
+
+ while (time < sleepDeadline) {
+ try {
+ Thread.sleep(sleepDeadline - time);
+ } catch (InterruptedException ie) {
+ throw new RuntimeException(ie);
+ }
+ time = System.currentTimeMillis();
+ }
+ }
+
// try to keep the qps below 50 to ensure that the time lost between 2 query runs on top of the sleepMillis
// is not comparable to sleepMillis, else the actual qps would end up being much lower than required qps
- private void runQueries(double qps, boolean shouldFail)
- throws Exception {
+ private void runQueries(int qps, boolean shouldFail) {
int failCount = 0;
- long sleepMillis = (long) (1000 / qps);
- Thread.sleep(sleepMillis);
- for (int i = 0; i < qps * 2; i++) {
+ long deadline = System.currentTimeMillis() + 1000;
+
+ for (int i = 0; i < qps; i++) {
+ sleep(deadline, qps - i);
ResultSetGroup resultSetGroup =
_pinotConnection.execute("SET applicationName='default'; SELECT
COUNT(*) FROM " + getTableName());
for (PinotClientException exception : resultSetGroup.getExceptions()) {
@@ -266,12 +280,12 @@ public class QueryQuotaClusterIntegrationTest extends BaseClusterIntegrationTest
break;
}
}
- Thread.sleep(sleepMillis);
}
+
if (shouldFail) {
- assertTrue(failCount != 0, "Expected >0 failures for qps: " + qps);
+ assertTrue(failCount != 0, "Expected nonzero failures for qps: " + qps);
} else {
- assertTrue(failCount == 0, "Expected 0 failures for qps: " + qps);
+ Assert.assertEquals(failCount, 0, "Expected zero failures for qps: " +
qps);
}
}
@@ -316,12 +330,12 @@ public class QueryQuotaClusterIntegrationTest extends BaseClusterIntegrationTest
return _pinotClientTransport.executeQuery(_brokerHostPort, query);
}
- private void runQueriesOnBroker(double qps, boolean shouldFail)
- throws Exception {
+ private void runQueriesOnBroker(float qps, boolean shouldFail) {
int failCount = 0;
- long sleepMillis = (long) (1000 / qps);
- Thread.sleep(sleepMillis);
- for (int i = 0; i < qps * 2; i++) {
+ long deadline = System.currentTimeMillis() + 1000;
+
+ for (int i = 0; i < qps; i++) {
+ sleep(deadline, qps - i);
BrokerResponse resultSetGroup =
executeQueryOnBroker("SET applicationName='default'; SELECT COUNT(*)
FROM " + getTableName());
for (Iterator<JsonNode> it = resultSetGroup.getExceptions().elements();
it.hasNext(); ) {
@@ -331,13 +345,12 @@ public class QueryQuotaClusterIntegrationTest extends BaseClusterIntegrationTest
break;
}
}
- Thread.sleep(sleepMillis);
}
if (shouldFail) {
- assertTrue(failCount != 0, "Expected >0 failures for qps: " + qps);
+ assertTrue(failCount != 0, "Expected nonzero failures for qps: " + qps);
} else {
- assertTrue(failCount == 0, "Expected 0 failures for qps: " + qps);
+ Assert.assertEquals(failCount, 0, "Expected 0 failures for qps: " + qps);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]