This is an automated email from the ASF dual-hosted git repository.
cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 61df3034fe KAFKA-12657: Increase timeouts in Connect integration tests
(#12191)
61df3034fe is described below
commit 61df3034fece16180a479ed8758f383f6523eac3
Author: Chris Egerton <[email protected]>
AuthorDate: Thu Jun 2 04:21:07 2022 -0400
KAFKA-12657: Increase timeouts in Connect integration tests (#12191)
As an initial step to address the notoriously flaky BlockingConnectorTest
test suite, we can try increasing test timeouts.
This approach may not be sufficient, and even if it is, it's still
suboptimal. Although it may address flakiness on Jenkins, it will make genuine
failures harder to detect when testing local changes. Additionally, if the
workload on Jenkins continues to increase, we'll probably have to bump these
timeouts in the future again at some point.
Potential next steps, for this PR and beyond:
1. Stop leaking threads that block during test runs.
2. Instead of artificially reducing the REST request timeout at the
   beginning of every test, reduce it selectively right before issuing a REST
   request that is expected to time out, and then immediately reset it.
3. Eliminate artificial reduction of the REST request timeout entirely, as
   it may be negatively impacting other Connect integration tests that are being
   run concurrently.
4. Test repeatedly on Jenkins, ideally at least 50 times.
5. Gather information on the number of CPU cores available to each Jenkins
   node and the distribution of how many threads are allocated over a given time
   period (maybe a day?); this is especially relevant since local testing
   indicates that these tests all do much better when parallelism is reduced,
   which shouldn't be too surprising considering that each Connect integration
   test spins up separate threads for at least one ZooKeeper node, one Kafka
   broker, one Connect worker, and usually [...]
I'd like to test these changes as a first step before investigating any of
the above (except maybe items 1 and 2, which should be fairly straightforward).
To trigger new runs I plan on pushing empty commits or, if those do not trigger
new Jenkins runs, dummy commits. If this is objectionable let me know and
hopefully we can find a suitable alternative.
Reviewers: Kvicii <[email protected]>, Bruno Cadonna <[email protected]>
---
.../connect/integration/BlockingConnectorTest.java | 17 +++++++----------
.../util/clusters/EmbeddedConnectClusterAssertions.java | 4 ++--
2 files changed, 9 insertions(+), 12 deletions(-)
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
index 571cfbb6a8..74b089892b 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
@@ -37,6 +37,7 @@ import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions;
import org.apache.kafka.test.IntegrationTest;
import org.junit.After;
import org.junit.Before;
@@ -78,8 +79,8 @@ public class BlockingConnectorTest {
private static final String NORMAL_CONNECTOR_NAME = "normal-connector";
private static final String TEST_TOPIC = "normal-topic";
private static final int NUM_RECORDS_PRODUCED = 100;
- private static final long CONNECT_WORKER_STARTUP_TIMEOUT =
TimeUnit.SECONDS.toMillis(60);
- private static final long RECORD_TRANSFER_DURATION_MS =
TimeUnit.SECONDS.toMillis(30);
+ private static final long CONNECTOR_BLOCK_TIMEOUT_MS =
TimeUnit.SECONDS.toMillis(60);
+ private static final long RECORD_TRANSFER_TIMEOUT_MS =
TimeUnit.SECONDS.toMillis(60);
private static final long REST_REQUEST_TIMEOUT =
Worker.CONNECTOR_GRACEFUL_SHUTDOWN_TIMEOUT_MS * 2;
private static final String CONNECTOR_INITIALIZE = "Connector::initialize";
@@ -133,7 +134,7 @@ public class BlockingConnectorTest {
// if the worker is still getting on its feet.
waitForCondition(
() ->
connect.requestGet(connect.endpointForResource("connectors/nonexistent")).getStatus()
== 404,
- CONNECT_WORKER_STARTUP_TIMEOUT,
+ EmbeddedConnectClusterAssertions.WORKER_SETUP_DURATION_MS,
"Worker did not complete startup in time"
);
}
@@ -329,8 +330,8 @@ public class BlockingConnectorTest {
private void verifyNormalConnector() throws InterruptedException {
waitForConnectorStart(NORMAL_CONNECTOR_NAME);
- normalConnectorHandle.awaitRecords(RECORD_TRANSFER_DURATION_MS);
- normalConnectorHandle.awaitCommits(RECORD_TRANSFER_DURATION_MS);
+ normalConnectorHandle.awaitRecords(RECORD_TRANSFER_TIMEOUT_MS);
+ normalConnectorHandle.awaitCommits(RECORD_TRANSFER_TIMEOUT_MS);
}
private static class Block {
@@ -360,7 +361,7 @@ public class BlockingConnectorTest {
}
log.debug("Waiting for connector to block");
- if (!blockLatch.await(60, TimeUnit.SECONDS)) {
+ if (!blockLatch.await(CONNECTOR_BLOCK_TIMEOUT_MS,
TimeUnit.MILLISECONDS)) {
throw new TimeoutException("Timed out waiting for connector to
block.");
}
log.debug("Connector should now be blocked");
@@ -393,10 +394,6 @@ public class BlockingConnectorTest {
}
}
- public Map<String, String> taskConfig() {
- return Collections.singletonMap(BLOCK_CONFIG, block);
- }
-
public void maybeBlockOn(String block) {
if (block.equals(this.block)) {
log.info("Will block on {}", block);
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
index edd99c8042..44b12eb6e9 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
@@ -44,9 +44,9 @@ import static
org.apache.kafka.test.TestUtils.waitForCondition;
public class EmbeddedConnectClusterAssertions {
private static final Logger log =
LoggerFactory.getLogger(EmbeddedConnectClusterAssertions.class);
- public static final long WORKER_SETUP_DURATION_MS =
TimeUnit.SECONDS.toMillis(60);
+ public static final long WORKER_SETUP_DURATION_MS =
TimeUnit.MINUTES.toMillis(5);
public static final long VALIDATION_DURATION_MS =
TimeUnit.SECONDS.toMillis(30);
- public static final long CONNECTOR_SETUP_DURATION_MS =
TimeUnit.SECONDS.toMillis(30);
+ public static final long CONNECTOR_SETUP_DURATION_MS =
TimeUnit.MINUTES.toMillis(2);
private static final long CONNECT_INTERNAL_TOPIC_UPDATES_DURATION_MS =
TimeUnit.SECONDS.toMillis(60);
private final EmbeddedConnectCluster connect;