This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4c9eeef5b2 MINOR: add timeouts to streams integration tests (#12216)
4c9eeef5b2 is described below

commit 4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Tue May 31 14:22:13 2022 -0700

    MINOR: add timeouts to streams integration tests (#12216)
    
    Reviewers: David Arthur <[email protected]>
---
 .../kafka/connect/integration/ErrorHandlingIntegrationTest.java     | 6 ++++--
 .../src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java    | 3 +++
 streams/src/test/java/org/apache/kafka/streams/KeyValueTest.java    | 4 ++++
 .../src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java  | 4 ++++
 .../src/test/java/org/apache/kafka/streams/StreamsConfigTest.java   | 5 ++++-
 streams/src/test/java/org/apache/kafka/streams/TopologyTest.java    | 4 ++++
 .../kafka/streams/integration/AbstractResetIntegrationTest.java     | 4 ++++
 .../kafka/streams/integration/AdjustStreamThreadCountTest.java      | 4 ++++
 .../kafka/streams/integration/ConsistencyVectorIntegrationTest.java | 4 +++-
 .../streams/integration/EOSUncleanShutdownIntegrationTest.java      | 4 ++++
 .../kafka/streams/integration/EmitOnChangeIntegrationTest.java      | 3 +++
 .../org/apache/kafka/streams/integration/EosIntegrationTest.java    | 4 ++++
 .../kafka/streams/integration/EosV2UpgradeIntegrationTest.java      | 4 ++++
 .../streams/integration/FineGrainedAutoResetIntegrationTest.java    | 5 ++++-
 .../kafka/streams/integration/GlobalKTableEOSIntegrationTest.java   | 4 ++++
 .../kafka/streams/integration/GlobalKTableIntegrationTest.java      | 4 ++++
 .../kafka/streams/integration/GlobalThreadShutDownOrderTest.java    | 4 +++-
 .../integration/HighAvailabilityTaskAssignorIntegrationTest.java    | 4 ++++
 .../org/apache/kafka/streams/integration/IQv2IntegrationTest.java   | 4 +++-
 .../apache/kafka/streams/integration/IQv2StoreIntegrationTest.java  | 4 ++++
 .../kafka/streams/integration/InternalTopicIntegrationTest.java     | 5 +++++
 .../apache/kafka/streams/integration/JoinStoreIntegrationTest.java  | 3 +++
 .../integration/JoinWithIncompleteMetadataIntegrationTest.java      | 4 ++++
 .../streams/integration/KStreamAggregationDedupIntegrationTest.java | 4 ++++
 .../streams/integration/KStreamAggregationIntegrationTest.java      | 4 ++++
 .../streams/integration/KStreamRepartitionIntegrationTest.java      | 4 ++++
 .../kafka/streams/integration/KStreamTransformIntegrationTest.java  | 5 ++++-
 .../kafka/streams/integration/KTableEfficientRangeQueryTest.java    | 3 +++
 ...leKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java | 4 ++++
 .../KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java        | 4 ++++
 .../integration/KTableKTableForeignKeyJoinDistributedTest.java      | 3 +++
 .../integration/KTableKTableForeignKeyJoinIntegrationTest.java      | 4 +++-
 .../KTableKTableForeignKeyJoinMaterializationIntegrationTest.java   | 4 +++-
 .../integration/KTableSourceTopicRestartIntegrationTest.java        | 3 +++
 .../apache/kafka/streams/integration/LagFetchIntegrationTest.java   | 4 +++-
 .../apache/kafka/streams/integration/MetricsIntegrationTest.java    | 4 +++-
 .../kafka/streams/integration/MetricsReporterIntegrationTest.java   | 4 +++-
 .../kafka/streams/integration/NamedTopologyIntegrationTest.java     | 4 ++++
 .../kafka/streams/integration/OptimizedKTableIntegrationTest.java   | 3 +++
 .../kafka/streams/integration/PositionRestartIntegrationTest.java   | 5 ++++-
 .../streams/integration/PurgeRepartitionTopicIntegrationTest.java   | 5 ++++-
 .../kafka/streams/integration/QueryableStateIntegrationTest.java    | 3 +++
 .../kafka/streams/integration/RackAwarenessIntegrationTest.java     | 3 +++
 .../apache/kafka/streams/integration/RangeQueryIntegrationTest.java | 3 +++
 .../kafka/streams/integration/RegexSourceIntegrationTest.java       | 4 ++++
 .../org/apache/kafka/streams/integration/ResetIntegrationTest.java  | 5 ++++-
 .../streams/integration/ResetPartitionTimeIntegrationTest.java      | 3 +++
 .../apache/kafka/streams/integration/RestoreIntegrationTest.java    | 3 +++
 .../kafka/streams/integration/RocksDBMetricsIntegrationTest.java    | 4 +++-
 .../streams/integration/SlidingWindowedKStreamIntegrationTest.java  | 4 ++++
 .../kafka/streams/integration/SmokeTestDriverIntegrationTest.java   | 4 ++++
 .../streams/integration/StandbyTaskCreationIntegrationTest.java     | 4 +++-
 .../kafka/streams/integration/StandbyTaskEOSIntegrationTest.java    | 4 +++-
 .../kafka/streams/integration/StateDirectoryIntegrationTest.java    | 3 +++
 .../kafka/streams/integration/StateRestorationIntegrationTest.java  | 5 +++++
 .../apache/kafka/streams/integration/StoreQueryIntegrationTest.java | 3 +++
 .../kafka/streams/integration/StreamStreamJoinIntegrationTest.java  | 4 ++++
 .../kafka/streams/integration/StreamTableJoinIntegrationTest.java   | 4 ++++
 .../StreamTableJoinTopologyOptimizationIntegrationTest.java         | 3 +++
 .../integration/StreamsUncaughtExceptionHandlerIntegrationTest.java | 3 +++
 .../streams/integration/StreamsUpgradeTestIntegrationTest.java      | 4 ++++
 .../streams/integration/SuppressionDurabilityIntegrationTest.java   | 3 +++
 .../kafka/streams/integration/SuppressionIntegrationTest.java       | 4 ++++
 .../kafka/streams/integration/TableTableJoinIntegrationTest.java    | 4 ++++
 .../kafka/streams/integration/TaskAssignorIntegrationTest.java      | 3 +++
 .../kafka/streams/integration/TaskMetadataIntegrationTest.java      | 3 +++
 .../streams/integration/TimeWindowedKStreamIntegrationTest.java     | 5 +++++
 .../test/java/org/apache/kafka/streams/internals/ApiUtilsTest.java  | 1 -
 .../src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java | 4 ++++
 .../java/org/apache/kafka/streams/tests/SystemTestUtilTest.java     | 4 ++++
 .../java/org/apache/kafka/streams/tools/StreamsResetterTest.java    | 4 ++++
 .../test/scala/org/apache/kafka/streams/scala/TopologyTest.scala    | 2 +-
 72 files changed, 254 insertions(+), 21 deletions(-)

diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
index b3dd9a097e..5bc5fcdbd2 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
@@ -29,8 +29,10 @@ import 
org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
 import org.apache.kafka.test.IntegrationTest;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,9 +67,9 @@ import static org.junit.Assert.fail;
  */
 @Category(IntegrationTest.class)
 public class ErrorHandlingIntegrationTest {
-
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
     private static final Logger log = 
LoggerFactory.getLogger(ErrorHandlingIntegrationTest.class);
-
     private static final int NUM_WORKERS = 1;
     private static final String DLQ_TOPIC = "my-connector-errors";
     private static final String CONNECTOR_NAME = "error-conn";
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java 
b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 3e37e097cd..95574a16ca 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -74,6 +74,7 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.powermock.api.easymock.PowerMock;
 import org.powermock.api.easymock.annotation.Mock;
@@ -124,6 +125,8 @@ import static org.junit.Assert.fail;
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({KafkaStreams.class, StreamThread.class, ClientMetrics.class, 
StreamsConfigUtils.class})
 public class KafkaStreamsTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
 
     private static final int NUM_THREADS = 2;
     private final static String APPLICATION_ID = "appId";
diff --git a/streams/src/test/java/org/apache/kafka/streams/KeyValueTest.java 
b/streams/src/test/java/org/apache/kafka/streams/KeyValueTest.java
index 24f7d5d617..01e09746b0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KeyValueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KeyValueTest.java
@@ -16,12 +16,16 @@
  */
 package org.apache.kafka.streams;
 
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.Timeout;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class KeyValueTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
 
     @Test
     public void shouldHaveSameEqualsAndHashCode() {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java 
b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 9b498df501..d7553c85b6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -52,7 +52,9 @@ import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.NoopValueTransformer;
 import org.apache.kafka.test.NoopValueTransformerWithKey;
 import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.Timeout;
 
 import java.time.Duration;
 import java.time.Instant;
@@ -77,6 +79,8 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class StreamsBuilderTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
 
     private static final String STREAM_TOPIC = "stream-topic";
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java 
b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 98a6050a1a..aaf6858707 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -34,7 +34,9 @@ import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
 import 
org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.Timeout;
 
 import java.io.File;
 import java.nio.charset.StandardCharsets;
@@ -78,7 +80,8 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class StreamsConfigTest {
-
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
     private final Properties props = new Properties();
     private StreamsConfig streamsConfig;
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java 
b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index 726d8b29a5..8d09cb3ee7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -51,7 +51,9 @@ import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.easymock.EasyMock;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.Timeout;
 
 import java.time.Duration;
 import java.util.Arrays;
@@ -71,6 +73,8 @@ import static org.junit.Assert.fail;
 
 @SuppressWarnings("deprecation")
 public class TopologyTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
 
     private final StoreBuilder<MockKeyValueStore> storeBuilder = 
EasyMock.createNiceMock(StoreBuilder.class);
     private final KeyValueStoreBuilder<?, ?> globalStoreBuilder = 
EasyMock.createNiceMock(KeyValueStoreBuilder.class);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
index 70bcef9cd6..83b843a6a5 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
@@ -49,6 +49,7 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TemporaryFolder;
 import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
 
 import java.io.BufferedWriter;
 import java.io.File;
@@ -69,6 +70,9 @@ import static org.hamcrest.MatcherAssert.assertThat;
 
 @Category({IntegrationTest.class})
 public abstract class AbstractResetIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
+
     static EmbeddedKafkaCluster cluster;
 
     private static MockTime mockTime;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
index f996238fe1..e70f0a9861 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
@@ -44,6 +44,8 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
 import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -75,6 +77,8 @@ import static org.junit.Assert.fail;
 
 @Category(IntegrationTest.class)
 public class AdjustStreamThreadCountTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
 
     public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(1);
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/ConsistencyVectorIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/ConsistencyVectorIntegrationTest.java
index 0b4178b7ad..78629cc652 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/ConsistencyVectorIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/ConsistencyVectorIntegrationTest.java
@@ -49,6 +49,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -71,7 +72,8 @@ import static org.hamcrest.Matchers.is;
 
 @Category({IntegrationTest.class})
 public class ConsistencyVectorIntegrationTest {
-
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
     private static final int NUM_BROKERS = 1;
     private static int port = 0;
     private static final String INPUT_TOPIC_NAME = "input-topic";
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java
index d3e991d51c..718f162a18 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java
@@ -36,9 +36,11 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TemporaryFolder;
+import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -66,6 +68,8 @@ import static org.junit.Assert.assertTrue;
 @RunWith(Parameterized.class)
 @Category(IntegrationTest.class)
 public class EOSUncleanShutdownIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
 
     @SuppressWarnings("deprecation")
     @Parameterized.Parameters(name = "{0}")
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
index e0fcd4daa3..3d2bcb8055 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
@@ -40,6 +40,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -53,6 +54,8 @@ import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.sa
 
 @Category(IntegrationTest.class)
 public class EmitOnChangeIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
 
     private static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(1);
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index 82765f73ea..7fd07b01ff 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -56,8 +56,10 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameter;
@@ -99,6 +101,8 @@ import static org.junit.Assert.assertTrue;
 @RunWith(Parameterized.class)
 @Category({IntegrationTest.class})
 public class EosIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
     private static final Logger LOG = 
LoggerFactory.getLogger(EosIntegrationTest.class);
     private static final int NUM_BROKERS = 3;
     private static final int MAX_POLL_INTERVAL_MS = 5 * 1000;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java
index 09f919ede6..aacb9e0483 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java
@@ -56,8 +56,10 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -90,6 +92,8 @@ import static org.junit.Assert.assertFalse;
 @RunWith(Parameterized.class)
 @Category({IntegrationTest.class})
 public class EosV2UpgradeIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
 
     @Parameterized.Parameters(name = "{0}")
     public static Collection<Boolean[]> data() {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
index c94066c397..b7b0b5a714 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
@@ -44,8 +44,10 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -64,7 +66,8 @@ import static org.junit.Assert.fail;
 
 @Category({IntegrationTest.class})
 public class FineGrainedAutoResetIntegrationTest {
-
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
     private static final int NUM_BROKERS = 1;
     private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
     private static final String OUTPUT_TOPIC_0 = "outputTopic_0";
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
index 2a1fedfd5e..b2272d0ec2 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
@@ -55,6 +55,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -73,6 +74,9 @@ import static org.junit.Assert.assertTrue;
 @RunWith(Parameterized.class)
 @Category({IntegrationTest.class})
 public class GlobalKTableEOSIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
+
     private static final int NUM_BROKERS = 1;
     private static final Properties BROKER_CONFIG;
     static {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index 4cdd172721..64a83dff9d 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -52,6 +52,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -69,6 +70,9 @@ import static org.junit.Assert.assertNotNull;
 
 @Category({IntegrationTest.class})
 public class GlobalKTableIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
+
     private static final int NUM_BROKERS = 1;
 
     public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(NUM_BROKERS);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
index 31658bbf3d..825ec13028 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
@@ -47,6 +47,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -71,7 +72,8 @@ import static org.junit.Assert.assertEquals;
  */
 @Category({IntegrationTest.class})
 public class GlobalThreadShutDownOrderTest {
-
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
     private static final int NUM_BROKERS = 1;
     private static final Properties BROKER_CONFIG;
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java
index 2b67a0f0fd..d712dfa3fc 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java
@@ -49,6 +49,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -74,6 +75,9 @@ import static org.hamcrest.Matchers.is;
 
 @Category(IntegrationTest.class)
 public class HighAvailabilityTaskAssignorIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
+
     public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(1);
 
     @BeforeClass
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
index 5ba218c65e..5d31398e38 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
@@ -63,6 +63,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
 
 import java.io.IOException;
 import java.lang.reflect.Field;
@@ -89,7 +90,8 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 
 @Category({IntegrationTest.class})
 public class IQv2IntegrationTest {
-
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
     private static final int NUM_BROKERS = 1;
     public static final Duration WINDOW_SIZE = Duration.ofMinutes(5);
     private static int port = 0;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
index 1c828c7453..813626d9ec 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
@@ -71,8 +71,10 @@ import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.slf4j.Logger;
@@ -117,6 +119,8 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 @Category({IntegrationTest.class})
 @RunWith(value = Parameterized.class)
 public class IQv2StoreIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
 
     private static final Logger LOG = 
LoggerFactory.getLogger(IQv2StoreIntegrationTest.class);
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 3abe088216..6e5ec644a4 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -48,8 +48,10 @@ import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -74,6 +76,9 @@ import static org.junit.Assert.assertTrue;
 @SuppressWarnings("deprecation")
 @Category({IntegrationTest.class})
 public class InternalTopicIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
+
     public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(1);
 
     @BeforeClass
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
index 04d3f7dd8b..cbe32e4a25 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
@@ -45,6 +45,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TemporaryFolder;
+import org.junit.rules.Timeout;
 
 import java.io.IOException;
 import java.util.Properties;
@@ -60,6 +61,8 @@ import static org.junit.Assert.assertThrows;
 @SuppressWarnings("deprecation")
 @Category({IntegrationTest.class})
 public class JoinStoreIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
 
     public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(1);
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java
index 1f6152da55..17cbf5f250 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java
@@ -38,11 +38,15 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TemporaryFolder;
+import org.junit.rules.Timeout;
 
 import static org.junit.Assert.assertTrue;
 
 @Category({IntegrationTest.class})
 public class JoinWithIncompleteMetadataIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
+
     public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(1);
 
     @BeforeClass
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index 79a697553b..99f03df22b 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -51,6 +51,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -67,6 +68,9 @@ import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.sa
 @Category({IntegrationTest.class})
 @SuppressWarnings("deprecation")
 public class KStreamAggregationDedupIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
+
     private static final int NUM_BROKERS = 1;
     private static final long COMMIT_INTERVAL_MS = 300L;
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 3990c67a7b..9ec0806cc2 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -71,6 +71,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -101,6 +102,9 @@ import static org.junit.Assert.assertTrue;
 @SuppressWarnings({"unchecked", "deprecation"})
 @Category({IntegrationTest.class})
 public class KStreamAggregationIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
+
     private static final int NUM_BROKERS = 1;
 
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
index 4730f5a839..1cff0fc201 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
@@ -49,6 +49,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameter;
@@ -83,6 +84,9 @@ import static org.junit.Assert.assertTrue;
 @Category({IntegrationTest.class})
 @SuppressWarnings("deprecation")
 public class KStreamRepartitionIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
+
     private static final int NUM_BROKERS = 1;
 
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamTransformIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamTransformIntegrationTest.java
index 51e0cbc0d6..d916faa9d7 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamTransformIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamTransformIntegrationTest.java
@@ -39,8 +39,10 @@ import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.StreamsTestUtils;
 
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -55,7 +57,8 @@ import static org.hamcrest.core.IsEqual.equalTo;
 @SuppressWarnings("unchecked")
 @Category({IntegrationTest.class})
 public class KStreamTransformIntegrationTest {
-
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
     private StreamsBuilder builder;
     private final String topic = "stream";
     private final String stateStoreName = "myTransformState";
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableEfficientRangeQueryTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableEfficientRangeQueryTest.java
index b0564ba16a..746de14f21 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableEfficientRangeQueryTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableEfficientRangeQueryTest.java
@@ -40,6 +40,7 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -61,6 +62,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
 
 @RunWith(Parameterized.class)
 public class KTableEfficientRangeQueryTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
     private enum StoreType { InMemory, RocksDB, Timed };
     private static final String TABLE_NAME = "mytable";
     private static final int DATA_SIZE = 5;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
index e0bed5bf68..d5f411ef1f 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
@@ -57,13 +57,17 @@ import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import kafka.utils.MockTime;
+import org.junit.rules.Timeout;
 
 @Category({IntegrationTest.class})
 public class KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
     private final static int NUM_BROKERS = 1;
 
     public final static EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java
index ed48cf8dbf..82f6ffae18 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java
@@ -46,8 +46,10 @@ import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -64,6 +66,8 @@ import static org.junit.Assert.assertEquals;
 
 @Category({IntegrationTest.class})
 public class KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
     private final static int NUM_BROKERS = 1;
 
     public final static EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java
index af952bda9b..c95b37ae85 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java
@@ -42,6 +42,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -58,6 +59,8 @@ import static org.junit.Assert.assertEquals;
 
 @Category({IntegrationTest.class})
 public class KTableKTableForeignKeyJoinDistributedTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
     private static final int NUM_BROKERS = 1;
     private static final String LEFT_TABLE = "left_table";
     private static final String RIGHT_TABLE = "right_table";
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
index 60104c4755..176d27a2b3 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
@@ -38,6 +38,7 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -60,7 +61,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
 
 @RunWith(Parameterized.class)
 public class KTableKTableForeignKeyJoinIntegrationTest {
-
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
     private static final String LEFT_TABLE = "left_table";
     private static final String RIGHT_TABLE = "right_table";
     private static final String OUTPUT = "output-topic";
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java
index 778f507cf2..5b63a655e2 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java
@@ -37,6 +37,7 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -58,7 +59,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
 
 @RunWith(Parameterized.class)
 public class KTableKTableForeignKeyJoinMaterializationIntegrationTest {
-
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
     private static final String LEFT_TABLE = "left_table";
     private static final String RIGHT_TABLE = "right_table";
     private static final String OUTPUT = "output-topic";
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
index 8d4ac2c6e1..b5778835f8 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
@@ -43,6 +43,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -55,6 +56,8 @@ import java.util.concurrent.ConcurrentHashMap;
 
 @Category({IntegrationTest.class})
 public class KTableSourceTopicRestartIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
     private static final int NUM_BROKERS = 3;
     private static final String SOURCE_TOPIC = "source-topic";
     private static final Properties PRODUCER_CONFIG = new Properties();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
index 6a024966d9..3b7251d6da 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
@@ -47,6 +47,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -76,7 +77,8 @@ import static org.junit.Assert.assertTrue;
 
 @Category({IntegrationTest.class})
 public class LagFetchIntegrationTest {
-
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
 
     @BeforeClass
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
index b2c2e4d5e7..0da0ac9eef 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
@@ -52,6 +52,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -68,7 +69,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
 @Category({IntegrationTest.class})
 @SuppressWarnings("deprecation")
 public class MetricsIntegrationTest {
-
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
     private static final int NUM_BROKERS = 1;
     private static final int NUM_THREADS = 2;
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java
index a7c925ad5f..e07a888690 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java
@@ -35,6 +35,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -48,7 +49,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
 
 @Category({IntegrationTest.class})
 public class MetricsReporterIntegrationTest {
-
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
     private static final int NUM_BROKERS = 1;
 
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
index 75bf3a1c3b..30de1d26cb 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
@@ -71,6 +71,8 @@ import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
 import java.time.Duration;
 import java.util.Collection;
 import java.util.Iterator;
@@ -102,6 +104,8 @@ import static java.util.Collections.singleton;
 import static java.util.Collections.singletonList;
 
 public class NamedTopologyIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
     private static final Duration STARTUP_TIMEOUT = Duration.ofSeconds(45);
     
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
index f6f045dea9..2e709af0b1 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
@@ -63,11 +63,14 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @Category(IntegrationTest.class)
 public class OptimizedKTableIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
     private static final Logger LOG = LoggerFactory.getLogger(OptimizedKTableIntegrationTest.class);
     private static final int NUM_BROKERS = 1;
     private static int port = 0;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/PositionRestartIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/PositionRestartIntegrationTest.java
index db1f86eb35..1c40b1ea6f 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/PositionRestartIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/PositionRestartIntegrationTest.java
@@ -67,8 +67,10 @@ import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.slf4j.Logger;
@@ -100,7 +102,8 @@ import static org.hamcrest.Matchers.is;
 @Category({IntegrationTest.class})
 @RunWith(value = Parameterized.class)
 public class PositionRestartIntegrationTest {
-
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
     private static final Logger LOG = LoggerFactory.getLogger(PositionRestartIntegrationTest.class);
     private static final long SEED = new Random().nextLong();
     private static final int NUM_BROKERS = 1;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
index 37d8743521..26720a0021 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
@@ -41,8 +41,10 @@ import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -55,7 +57,8 @@ import java.util.Set;
 
 @Category({IntegrationTest.class})
 public class PurgeRepartitionTopicIntegrationTest {
-
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
     private static final int NUM_BROKERS = 1;
 
     private static final String INPUT_TOPIC = "input-stream";
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 52a896668e..82409af50f 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -69,6 +69,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -122,6 +123,8 @@ import static org.junit.Assert.assertThrows;
 @Category({IntegrationTest.class})
 @SuppressWarnings("deprecation")
 public class QueryableStateIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
     private static final Logger log = LoggerFactory.getLogger(QueryableStateIntegrationTest.class);
 
     private static final long DEFAULT_TIMEOUT_MS = 120 * 1000;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java
index f4afbe8012..54736c44a2 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java
@@ -40,6 +40,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -61,6 +62,8 @@ import static org.junit.Assert.assertTrue;
 
 @Category({IntegrationTest.class})
 public class RackAwarenessIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
     private static final int NUM_BROKERS = 1;
 
     private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/RangeQueryIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/RangeQueryIntegrationTest.java
index 1c22bfca37..c67696b2ca 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/RangeQueryIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/RangeQueryIntegrationTest.java
@@ -47,6 +47,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -68,6 +69,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
 @RunWith(Parameterized.class)
 @Category({IntegrationTest.class})
 public class RangeQueryIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
     private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
     private static final Properties STREAMS_CONFIG = new Properties();
     private static final String APP_ID = "range-query-integration-test";
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index 99cb358b6e..1a22c90fcf 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -51,9 +51,11 @@ import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -81,6 +83,8 @@ import static org.hamcrest.Matchers.greaterThan;
  */
 @Category({IntegrationTest.class})
 public class RegexSourceIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
     private static final int NUM_BROKERS = 1;
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index 5c236e6ba8..204a66d148 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -32,8 +32,10 @@ import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
 
 import java.io.BufferedWriter;
 import java.io.File;
@@ -55,7 +57,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
  */
 @Category({IntegrationTest.class})
 public class ResetIntegrationTest extends AbstractResetIntegrationTest {
-
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
     private static final String NON_EXISTING_TOPIC = "nonExistingTopic";
 
     public static final EmbeddedKafkaCluster CLUSTER;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java
index c3e7c283d0..7fe905ae7d 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java
@@ -41,6 +41,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -65,6 +66,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
 @RunWith(Parameterized.class)
 @Category({IntegrationTest.class})
 public class ResetPartitionTimeIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
     private static final int NUM_BROKERS = 1;
     private static final Properties BROKER_CONFIG;
     static {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index b5aad95a0d..b7523a99e5 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -75,6 +75,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
 
 import static java.util.Arrays.asList;
 import static java.util.Collections.singletonList;
@@ -90,6 +91,8 @@ import static org.junit.Assert.assertTrue;
 
 @Category({IntegrationTest.class})
 public class RestoreIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
     private static final int NUM_BROKERS = 1;
 
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
index 725b38612a..2d667839fa 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
@@ -47,6 +47,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameter;
@@ -71,7 +72,8 @@ import static org.hamcrest.Matchers.notNullValue;
 @RunWith(Parameterized.class)
 @SuppressWarnings("deprecation")
 public class RocksDBMetricsIntegrationTest {
-
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
     private static final int NUM_BROKERS = 3;
 
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java
index 10683d0968..395ef6efd6 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java
@@ -63,6 +63,8 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
+
+import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameter;
@@ -80,6 +82,8 @@ import static org.hamcrest.core.Is.is;
 @Category({IntegrationTest.class})
 @RunWith(Parameterized.class)
 public class SlidingWindowedKStreamIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
     private static final int NUM_BROKERS = 1;
 
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
index 22d773595a..f76caf573b 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
@@ -27,8 +27,10 @@ import org.apache.kafka.test.IntegrationTest;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -42,6 +44,8 @@ import static org.apache.kafka.streams.tests.SmokeTestDriver.verify;
 
 @Category(IntegrationTest.class)
 public class SmokeTestDriverIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);
 
     @BeforeClass
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
index 5b4e005086..d59a716004 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
@@ -41,6 +41,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
 
 import java.io.IOException;
 import java.util.Properties;
@@ -50,7 +51,8 @@ import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.sa
 
 @Category({IntegrationTest.class})
 public class StandbyTaskCreationIntegrationTest {
-
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
     private static final int NUM_BROKERS = 1;
 
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
index 059211ffff..2ac2c2bdde 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
@@ -48,6 +48,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -74,7 +75,8 @@ import static org.junit.Assert.assertTrue;
 @RunWith(Parameterized.class)
 @Category(IntegrationTest.class)
 public class StandbyTaskEOSIntegrationTest {
-
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
     private final static long REBALANCE_TIMEOUT = 
Duration.ofMinutes(2L).toMillis();
     private final static int KEY_0 = 0;
     private final static int KEY_1 = 1;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/StateDirectoryIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/StateDirectoryIntegrationTest.java
index 9c34fba6f6..38651cfd34 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/StateDirectoryIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/StateDirectoryIntegrationTest.java
@@ -47,6 +47,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
 
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
@@ -56,6 +57,8 @@ import static org.junit.Assert.assertTrue;
 
 @Category(IntegrationTest.class)
 public class StateDirectoryIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
 
     public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(3);
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/StateRestorationIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/StateRestorationIntegrationTest.java
index d890a30ae8..4cedf82bfc 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/StateRestorationIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/StateRestorationIntegrationTest.java
@@ -36,8 +36,10 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -47,6 +49,9 @@ import java.util.Properties;
 
 @Category({IntegrationTest.class})
 public class StateRestorationIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
+
     private final StreamsBuilder builder = new StreamsBuilder();
 
     private static final String APPLICATION_ID = "restoration-test-app";
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
index f8cb79cf56..5f65cf2725 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
@@ -49,6 +49,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -82,6 +83,8 @@ import static org.junit.Assert.assertTrue;
 
 @Category({IntegrationTest.class})
 public class StoreQueryIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
 
     private static final Logger LOG = 
LoggerFactory.getLogger(StoreQueryIntegrationTest.class);
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
index 9d2bd1e221..49818e71a4 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
@@ -25,8 +25,10 @@ import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.MockMapper;
 
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -43,6 +45,8 @@ import static java.time.Duration.ofSeconds;
 @Category({IntegrationTest.class})
 @RunWith(value = Parameterized.class)
 public class StreamStreamJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
     private KStream<Long, String> leftStream;
     private KStream<Long, String> rightStream;
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
index 0f7e8aa95f..37d5fc2a72 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
@@ -23,8 +23,10 @@ import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.test.TestRecord;
 import org.apache.kafka.test.IntegrationTest;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -38,6 +40,8 @@ import java.util.List;
 @Category({IntegrationTest.class})
 @RunWith(value = Parameterized.class)
 public class StreamTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
     private KStream<Long, String> leftStream;
     private KTable<Long, String> rightTable;
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java
index 8d023b8028..38dc3a6ffc 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java
@@ -46,6 +46,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -67,6 +68,8 @@ import static org.junit.Assert.assertTrue;
 @RunWith(value = Parameterized.class)
 @Category({IntegrationTest.class})
 public class StreamTableJoinTopologyOptimizationIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
     private static final int NUM_BROKERS = 1;
 
     public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(NUM_BROKERS);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
index 0f42d3546f..c81ddcfa74 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
@@ -49,6 +49,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -76,6 +77,8 @@ import static org.junit.Assert.fail;
 @Category(IntegrationTest.class)
 @SuppressWarnings("deprecation") //Need to call the old handler, will remove 
those calls when the old handler is removed
 public class StreamsUncaughtExceptionHandlerIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
 
     public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(1, new Properties(), 0L, 0L);
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUpgradeTestIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUpgradeTestIntegrationTest.java
index 4285530958..6a9453e992 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUpgradeTestIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUpgradeTestIntegrationTest.java
@@ -24,8 +24,10 @@ import org.apache.kafka.streams.tests.StreamsUpgradeTest;
 import org.apache.kafka.test.IntegrationTest;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -44,6 +46,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
 
 @Category(IntegrationTest.class)
 public class StreamsUpgradeTestIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
 
     public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(3);
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
index 7ce428b794..1dc6e6a607 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
@@ -50,6 +50,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.slf4j.Logger;
@@ -85,6 +86,8 @@ import static org.hamcrest.Matchers.equalTo;
 @RunWith(Parameterized.class)
 @Category({IntegrationTest.class})
 public class SuppressionDurabilityIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
 
     public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(
         3,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
index 71ef0e3690..35eee4764d 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
@@ -47,8 +47,10 @@ import org.apache.kafka.test.TestUtils;
 import org.hamcrest.Matchers;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -81,6 +83,8 @@ import static org.hamcrest.Matchers.empty;
 
 @Category(IntegrationTest.class)
 public class SuppressionIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
 
     public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(
         1,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
index 579ed190a4..aaa0f462e8 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
@@ -26,8 +26,10 @@ import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.test.TestRecord;
 import org.apache.kafka.test.IntegrationTest;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -41,6 +43,8 @@ import java.util.List;
 @Category({IntegrationTest.class})
 @RunWith(value = Parameterized.class)
 public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest 
{
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
     private KTable<Long, String> leftTable;
     private KTable<Long, String> rightTable;
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java
index 5ff6cb6ba7..a706d74d66 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java
@@ -37,6 +37,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
 
 import java.io.IOException;
 import java.lang.reflect.Field;
@@ -56,6 +57,8 @@ import static org.hamcrest.Matchers.sameInstance;
 
 @Category(IntegrationTest.class)
 public class TaskAssignorIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
 
     public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(1);
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
index 2aec4edf70..791ee58ff7 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
@@ -40,6 +40,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -60,6 +61,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
 
 @Category(IntegrationTest.class)
 public class TaskMetadataIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
 
     public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(1, new Properties(), 0L, 0L);
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java
index 4f72a0a2f8..9abc2c9500 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java
@@ -64,6 +64,8 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
+
+import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameter;
@@ -83,6 +85,9 @@ import static org.junit.Assert.assertThrows;
 @Category({IntegrationTest.class})
 @RunWith(Parameterized.class)
 public class TimeWindowedKStreamIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
+
     private static final int NUM_BROKERS = 1;
 
     public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(NUM_BROKERS,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/internals/ApiUtilsTest.java 
b/streams/src/test/java/org/apache/kafka/streams/internals/ApiUtilsTest.java
index 6e4cdef46a..33fcd8ede2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/internals/ApiUtilsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/internals/ApiUtilsTest.java
@@ -31,7 +31,6 @@ import static org.junit.Assert.fail;
 
 
 public class ApiUtilsTest {
-
     // This is the maximum limit that Duration accepts but fails when it 
converts to milliseconds.
     private static final long MAX_ACCEPTABLE_DAYS_FOR_DURATION = 
106751991167300L;
     // This is the maximum limit that Duration accepts and converts to 
milliseconds with out fail.
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java 
b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
index e0c45d0c1c..5336b0e788 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
@@ -29,8 +29,12 @@ import 
org.apache.kafka.streams.processor.api.ContextualProcessor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
 import org.apache.kafka.streams.processor.api.Record;
+import org.junit.Rule;
+import org.junit.rules.Timeout;
 
 public class SmokeTestUtil {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
 
     final static int END = Integer.MAX_VALUE;
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/tests/SystemTestUtilTest.java 
b/streams/src/test/java/org/apache/kafka/streams/tests/SystemTestUtilTest.java
index a2a26a3fa8..5f847a683f 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/tests/SystemTestUtilTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/tests/SystemTestUtilTest.java
@@ -18,7 +18,9 @@
 package org.apache.kafka.streams.tests;
 
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.Timeout;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -28,6 +30,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThrows;
 
 public class SystemTestUtilTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
 
     private final Map<String, String> expectedParsedMap = new TreeMap<>();
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java 
b/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
index d4f7841000..dc3cf65b2c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
@@ -29,7 +29,9 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.TopicPartitionInfo;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.Timeout;
 
 import java.time.Duration;
 import java.time.Instant;
@@ -44,6 +46,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 public class StreamsResetterTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
 
     private static final String TOPIC = "topic1";
     private final StreamsResetter streamsResetter = new StreamsResetter();
diff --git 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
index cf22ecece4..b38b0c3a94 100644
--- 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
+++ 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
@@ -53,8 +53,8 @@ import scala.jdk.CollectionConverters._
  * Test suite that verifies that the topology built by the Java and Scala APIs 
match.
  */
 //noinspection ScalaDeprecation
+@Timeout(600)
 class TopologyTest {
-
   private val inputTopic = "input-topic"
   private val userClicksTopic = "user-clicks-topic"
   private val userRegionsTopic = "user-regions-topic"

Reply via email to