This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4c9eeef5b2 MINOR: add timeouts to streams integration tests (#12216)
4c9eeef5b2 is described below
commit 4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Tue May 31 14:22:13 2022 -0700
MINOR: add timeouts to streams integration tests (#12216)
Reviewers: David Arthur <[email protected]>
---
.../kafka/connect/integration/ErrorHandlingIntegrationTest.java | 6 ++++--
.../src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java | 3 +++
streams/src/test/java/org/apache/kafka/streams/KeyValueTest.java | 4 ++++
.../src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java | 4 ++++
.../src/test/java/org/apache/kafka/streams/StreamsConfigTest.java | 5 ++++-
streams/src/test/java/org/apache/kafka/streams/TopologyTest.java | 4 ++++
.../kafka/streams/integration/AbstractResetIntegrationTest.java | 4 ++++
.../kafka/streams/integration/AdjustStreamThreadCountTest.java | 4 ++++
.../kafka/streams/integration/ConsistencyVectorIntegrationTest.java | 4 +++-
.../streams/integration/EOSUncleanShutdownIntegrationTest.java | 4 ++++
.../kafka/streams/integration/EmitOnChangeIntegrationTest.java | 3 +++
.../org/apache/kafka/streams/integration/EosIntegrationTest.java | 4 ++++
.../kafka/streams/integration/EosV2UpgradeIntegrationTest.java | 4 ++++
.../streams/integration/FineGrainedAutoResetIntegrationTest.java | 5 ++++-
.../kafka/streams/integration/GlobalKTableEOSIntegrationTest.java | 4 ++++
.../kafka/streams/integration/GlobalKTableIntegrationTest.java | 4 ++++
.../kafka/streams/integration/GlobalThreadShutDownOrderTest.java | 4 +++-
.../integration/HighAvailabilityTaskAssignorIntegrationTest.java | 4 ++++
.../org/apache/kafka/streams/integration/IQv2IntegrationTest.java | 4 +++-
.../apache/kafka/streams/integration/IQv2StoreIntegrationTest.java | 4 ++++
.../kafka/streams/integration/InternalTopicIntegrationTest.java | 5 +++++
.../apache/kafka/streams/integration/JoinStoreIntegrationTest.java | 3 +++
.../integration/JoinWithIncompleteMetadataIntegrationTest.java | 4 ++++
.../streams/integration/KStreamAggregationDedupIntegrationTest.java | 4 ++++
.../streams/integration/KStreamAggregationIntegrationTest.java | 4 ++++
.../streams/integration/KStreamRepartitionIntegrationTest.java | 4 ++++
.../kafka/streams/integration/KStreamTransformIntegrationTest.java | 5 ++++-
.../kafka/streams/integration/KTableEfficientRangeQueryTest.java | 3 +++
...leKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java | 4 ++++
.../KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java | 4 ++++
.../integration/KTableKTableForeignKeyJoinDistributedTest.java | 3 +++
.../integration/KTableKTableForeignKeyJoinIntegrationTest.java | 4 +++-
.../KTableKTableForeignKeyJoinMaterializationIntegrationTest.java | 4 +++-
.../integration/KTableSourceTopicRestartIntegrationTest.java | 3 +++
.../apache/kafka/streams/integration/LagFetchIntegrationTest.java | 4 +++-
.../apache/kafka/streams/integration/MetricsIntegrationTest.java | 4 +++-
.../kafka/streams/integration/MetricsReporterIntegrationTest.java | 4 +++-
.../kafka/streams/integration/NamedTopologyIntegrationTest.java | 4 ++++
.../kafka/streams/integration/OptimizedKTableIntegrationTest.java | 3 +++
.../kafka/streams/integration/PositionRestartIntegrationTest.java | 5 ++++-
.../streams/integration/PurgeRepartitionTopicIntegrationTest.java | 5 ++++-
.../kafka/streams/integration/QueryableStateIntegrationTest.java | 3 +++
.../kafka/streams/integration/RackAwarenessIntegrationTest.java | 3 +++
.../apache/kafka/streams/integration/RangeQueryIntegrationTest.java | 3 +++
.../kafka/streams/integration/RegexSourceIntegrationTest.java | 4 ++++
.../org/apache/kafka/streams/integration/ResetIntegrationTest.java | 5 ++++-
.../streams/integration/ResetPartitionTimeIntegrationTest.java | 3 +++
.../apache/kafka/streams/integration/RestoreIntegrationTest.java | 3 +++
.../kafka/streams/integration/RocksDBMetricsIntegrationTest.java | 4 +++-
.../streams/integration/SlidingWindowedKStreamIntegrationTest.java | 4 ++++
.../kafka/streams/integration/SmokeTestDriverIntegrationTest.java | 4 ++++
.../streams/integration/StandbyTaskCreationIntegrationTest.java | 4 +++-
.../kafka/streams/integration/StandbyTaskEOSIntegrationTest.java | 4 +++-
.../kafka/streams/integration/StateDirectoryIntegrationTest.java | 3 +++
.../kafka/streams/integration/StateRestorationIntegrationTest.java | 5 +++++
.../apache/kafka/streams/integration/StoreQueryIntegrationTest.java | 3 +++
.../kafka/streams/integration/StreamStreamJoinIntegrationTest.java | 4 ++++
.../kafka/streams/integration/StreamTableJoinIntegrationTest.java | 4 ++++
.../StreamTableJoinTopologyOptimizationIntegrationTest.java | 3 +++
.../integration/StreamsUncaughtExceptionHandlerIntegrationTest.java | 3 +++
.../streams/integration/StreamsUpgradeTestIntegrationTest.java | 4 ++++
.../streams/integration/SuppressionDurabilityIntegrationTest.java | 3 +++
.../kafka/streams/integration/SuppressionIntegrationTest.java | 4 ++++
.../kafka/streams/integration/TableTableJoinIntegrationTest.java | 4 ++++
.../kafka/streams/integration/TaskAssignorIntegrationTest.java | 3 +++
.../kafka/streams/integration/TaskMetadataIntegrationTest.java | 3 +++
.../streams/integration/TimeWindowedKStreamIntegrationTest.java | 5 +++++
.../test/java/org/apache/kafka/streams/internals/ApiUtilsTest.java | 1 -
.../src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java | 4 ++++
.../java/org/apache/kafka/streams/tests/SystemTestUtilTest.java | 4 ++++
.../java/org/apache/kafka/streams/tools/StreamsResetterTest.java | 4 ++++
.../test/scala/org/apache/kafka/streams/scala/TopologyTest.scala | 2 +-
72 files changed, 254 insertions(+), 21 deletions(-)
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
index b3dd9a097e..5bc5fcdbd2 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
@@ -29,8 +29,10 @@ import
org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.test.IntegrationTest;
import org.junit.After;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,9 +67,9 @@ import static org.junit.Assert.fail;
*/
@Category(IntegrationTest.class)
public class ErrorHandlingIntegrationTest {
-
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
private static final Logger log =
LoggerFactory.getLogger(ErrorHandlingIntegrationTest.class);
-
private static final int NUM_WORKERS = 1;
private static final String DLQ_TOPIC = "my-connector-errors";
private static final String CONNECTOR_NAME = "error-conn";
diff --git
a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 3e37e097cd..95574a16ca 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -74,6 +74,7 @@ import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.api.easymock.annotation.Mock;
@@ -124,6 +125,8 @@ import static org.junit.Assert.fail;
@RunWith(PowerMockRunner.class)
@PrepareForTest({KafkaStreams.class, StreamThread.class, ClientMetrics.class,
StreamsConfigUtils.class})
public class KafkaStreamsTest {
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
private static final int NUM_THREADS = 2;
private final static String APPLICATION_ID = "appId";
diff --git a/streams/src/test/java/org/apache/kafka/streams/KeyValueTest.java
b/streams/src/test/java/org/apache/kafka/streams/KeyValueTest.java
index 24f7d5d617..01e09746b0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KeyValueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KeyValueTest.java
@@ -16,12 +16,16 @@
*/
package org.apache.kafka.streams;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.Timeout;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class KeyValueTest {
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
@Test
public void shouldHaveSameEqualsAndHashCode() {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 9b498df501..d7553c85b6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -52,7 +52,9 @@ import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.NoopValueTransformer;
import org.apache.kafka.test.NoopValueTransformerWithKey;
import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.Timeout;
import java.time.Duration;
import java.time.Instant;
@@ -77,6 +79,8 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class StreamsBuilderTest {
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
private static final String STREAM_TOPIC = "stream-topic";
diff --git
a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 98a6050a1a..aaf6858707 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -34,7 +34,9 @@ import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
import
org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.Timeout;
import java.io.File;
import java.nio.charset.StandardCharsets;
@@ -78,7 +80,8 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class StreamsConfigTest {
-
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
private final Properties props = new Properties();
private StreamsConfig streamsConfig;
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index 726d8b29a5..8d09cb3ee7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -51,7 +51,9 @@ import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
import org.easymock.EasyMock;
import org.junit.Assert;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.Timeout;
import java.time.Duration;
import java.util.Arrays;
@@ -71,6 +73,8 @@ import static org.junit.Assert.fail;
@SuppressWarnings("deprecation")
public class TopologyTest {
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
private final StoreBuilder<MockKeyValueStore> storeBuilder =
EasyMock.createNiceMock(StoreBuilder.class);
private final KeyValueStoreBuilder<?, ?> globalStoreBuilder =
EasyMock.createNiceMock(KeyValueStoreBuilder.class);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
index 70bcef9cd6..83b843a6a5 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
@@ -49,6 +49,7 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
import java.io.BufferedWriter;
import java.io.File;
@@ -69,6 +70,9 @@ import static org.hamcrest.MatcherAssert.assertThat;
@Category({IntegrationTest.class})
public abstract class AbstractResetIntegrationTest {
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
+
static EmbeddedKafkaCluster cluster;
private static MockTime mockTime;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
index f996238fe1..e70f0a9861 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
@@ -44,6 +44,8 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
@@ -75,6 +77,8 @@ import static org.junit.Assert.fail;
@Category(IntegrationTest.class)
public class AdjustStreamThreadCountTest {
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(1);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/ConsistencyVectorIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/ConsistencyVectorIntegrationTest.java
index 0b4178b7ad..78629cc652 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/ConsistencyVectorIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/ConsistencyVectorIntegrationTest.java
@@ -49,6 +49,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
import java.io.IOException;
import java.time.Duration;
@@ -71,7 +72,8 @@ import static org.hamcrest.Matchers.is;
@Category({IntegrationTest.class})
public class ConsistencyVectorIntegrationTest {
-
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
private static final int NUM_BROKERS = 1;
private static int port = 0;
private static final String INPUT_TOPIC_NAME = "input-topic";
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java
index d3e991d51c..718f162a18 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java
@@ -36,9 +36,11 @@ import org.apache.kafka.test.TestUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
+import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -66,6 +68,8 @@ import static org.junit.Assert.assertTrue;
@RunWith(Parameterized.class)
@Category(IntegrationTest.class)
public class EOSUncleanShutdownIntegrationTest {
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
@SuppressWarnings("deprecation")
@Parameterized.Parameters(name = "{0}")
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
index e0fcd4daa3..3d2bcb8055 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
@@ -40,6 +40,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
import java.io.IOException;
import java.util.Arrays;
@@ -53,6 +54,8 @@ import static
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.sa
@Category(IntegrationTest.class)
public class EmitOnChangeIntegrationTest {
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
private static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(1);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index 82765f73ea..7fd07b01ff 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -56,8 +56,10 @@ import org.apache.kafka.test.TestUtils;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
@@ -99,6 +101,8 @@ import static org.junit.Assert.assertTrue;
@RunWith(Parameterized.class)
@Category({IntegrationTest.class})
public class EosIntegrationTest {
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
private static final Logger LOG =
LoggerFactory.getLogger(EosIntegrationTest.class);
private static final int NUM_BROKERS = 3;
private static final int MAX_POLL_INTERVAL_MS = 5 * 1000;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java
index 09f919ede6..aacb9e0483 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java
@@ -56,8 +56,10 @@ import org.apache.kafka.test.TestUtils;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -90,6 +92,8 @@ import static org.junit.Assert.assertFalse;
@RunWith(Parameterized.class)
@Category({IntegrationTest.class})
public class EosV2UpgradeIntegrationTest {
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
@Parameterized.Parameters(name = "{0}")
public static Collection<Boolean[]> data() {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
index c94066c397..b7b0b5a714 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
@@ -44,8 +44,10 @@ import org.apache.kafka.test.TestUtils;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
import java.io.IOException;
import java.util.ArrayList;
@@ -64,7 +66,8 @@ import static org.junit.Assert.fail;
@Category({IntegrationTest.class})
public class FineGrainedAutoResetIntegrationTest {
-
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
private static final int NUM_BROKERS = 1;
private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
private static final String OUTPUT_TOPIC_0 = "outputTopic_0";
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
index 2a1fedfd5e..b2272d0ec2 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
@@ -55,6 +55,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -73,6 +74,9 @@ import static org.junit.Assert.assertTrue;
@RunWith(Parameterized.class)
@Category({IntegrationTest.class})
public class GlobalKTableEOSIntegrationTest {
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
+
private static final int NUM_BROKERS = 1;
private static final Properties BROKER_CONFIG;
static {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index 4cdd172721..64a83dff9d 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -52,6 +52,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
import java.io.IOException;
import java.time.Duration;
@@ -69,6 +70,9 @@ import static org.junit.Assert.assertNotNull;
@Category({IntegrationTest.class})
public class GlobalKTableIntegrationTest {
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
+
private static final int NUM_BROKERS = 1;
public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(NUM_BROKERS);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
index 31658bbf3d..825ec13028 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
@@ -47,6 +47,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
import java.io.IOException;
import java.time.Duration;
@@ -71,7 +72,8 @@ import static org.junit.Assert.assertEquals;
*/
@Category({IntegrationTest.class})
public class GlobalThreadShutDownOrderTest {
-
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
private static final int NUM_BROKERS = 1;
private static final Properties BROKER_CONFIG;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java
index 2b67a0f0fd..d712dfa3fc 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java
@@ -49,6 +49,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
import java.io.IOException;
import java.util.Collection;
@@ -74,6 +75,9 @@ import static org.hamcrest.Matchers.is;
@Category(IntegrationTest.class)
public class HighAvailabilityTaskAssignorIntegrationTest {
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
+
public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(1);
@BeforeClass
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
index 5ba218c65e..5d31398e38 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
@@ -63,6 +63,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
import java.io.IOException;
import java.lang.reflect.Field;
@@ -89,7 +90,8 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
@Category({IntegrationTest.class})
public class IQv2IntegrationTest {
-
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
private static final int NUM_BROKERS = 1;
public static final Duration WINDOW_SIZE = Duration.ofMinutes(5);
private static int port = 0;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
index 1c828c7453..813626d9ec 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
@@ -71,8 +71,10 @@ import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
@@ -117,6 +119,8 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
@Category({IntegrationTest.class})
@RunWith(value = Parameterized.class)
public class IQv2StoreIntegrationTest {
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
private static final Logger LOG =
LoggerFactory.getLogger(IQv2StoreIntegrationTest.class);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 3abe088216..6e5ec644a4 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -48,8 +48,10 @@ import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
import java.io.IOException;
import java.util.Arrays;
@@ -74,6 +76,9 @@ import static org.junit.Assert.assertTrue;
@SuppressWarnings("deprecation")
@Category({IntegrationTest.class})
public class InternalTopicIntegrationTest {
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
+
public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(1);
@BeforeClass
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
index 04d3f7dd8b..cbe32e4a25 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
@@ -45,6 +45,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
+import org.junit.rules.Timeout;
import java.io.IOException;
import java.util.Properties;
@@ -60,6 +61,8 @@ import static org.junit.Assert.assertThrows;
@SuppressWarnings("deprecation")
@Category({IntegrationTest.class})
public class JoinStoreIntegrationTest {
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(1);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java
index 1f6152da55..17cbf5f250 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java
@@ -38,11 +38,15 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
+import org.junit.rules.Timeout;
import static org.junit.Assert.assertTrue;
@Category({IntegrationTest.class})
public class JoinWithIncompleteMetadataIntegrationTest {
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
+
public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(1);
@BeforeClass
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index 79a697553b..99f03df22b 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -51,6 +51,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
import java.io.IOException;
import java.util.Arrays;
@@ -67,6 +68,9 @@ import static
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.sa
@Category({IntegrationTest.class})
@SuppressWarnings("deprecation")
public class KStreamAggregationDedupIntegrationTest {
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
+
private static final int NUM_BROKERS = 1;
private static final long COMMIT_INTERVAL_MS = 300L;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 3990c67a7b..9ec0806cc2 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -71,6 +71,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -101,6 +102,9 @@ import static org.junit.Assert.assertTrue;
@SuppressWarnings({"unchecked", "deprecation"})
@Category({IntegrationTest.class})
public class KStreamAggregationIntegrationTest {
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
+
private static final int NUM_BROKERS = 1;
public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(NUM_BROKERS);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
index 4730f5a839..1cff0fc201 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
@@ -49,6 +49,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
@@ -83,6 +84,9 @@ import static org.junit.Assert.assertTrue;
@Category({IntegrationTest.class})
@SuppressWarnings("deprecation")
public class KStreamRepartitionIntegrationTest {
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
+
private static final int NUM_BROKERS = 1;
public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(NUM_BROKERS);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamTransformIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamTransformIntegrationTest.java
index 51e0cbc0d6..d916faa9d7 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamTransformIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamTransformIntegrationTest.java
@@ -39,8 +39,10 @@ import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
import java.util.ArrayList;
import java.util.Arrays;
@@ -55,7 +57,8 @@ import static org.hamcrest.core.IsEqual.equalTo;
@SuppressWarnings("unchecked")
@Category({IntegrationTest.class})
public class KStreamTransformIntegrationTest {
-
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
private StreamsBuilder builder;
private final String topic = "stream";
private final String stateStoreName = "myTransformState";
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableEfficientRangeQueryTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableEfficientRangeQueryTest.java
index b0564ba16a..746de14f21 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableEfficientRangeQueryTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableEfficientRangeQueryTest.java
@@ -40,6 +40,7 @@ import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -61,6 +62,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
@RunWith(Parameterized.class)
public class KTableEfficientRangeQueryTest {
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
private enum StoreType { InMemory, RocksDB, Timed };
private static final String TABLE_NAME = "mytable";
private static final int DATA_SIZE = 5;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
index e0bed5bf68..d5f411ef1f 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
@@ -57,13 +57,17 @@ import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import kafka.utils.MockTime;
+import org.junit.rules.Timeout;
@Category({IntegrationTest.class})
public class KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest {
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
private final static int NUM_BROKERS = 1;
public final static EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(NUM_BROKERS);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java
index ed48cf8dbf..82f6ffae18 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java
@@ -46,8 +46,10 @@ import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
import java.io.IOException;
import java.util.Collections;
@@ -64,6 +66,8 @@ import static org.junit.Assert.assertEquals;
@Category({IntegrationTest.class})
public class KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
private final static int NUM_BROKERS = 1;
public final static EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(NUM_BROKERS);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java
index af952bda9b..c95b37ae85 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java
@@ -42,6 +42,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
import java.io.IOException;
import java.util.Arrays;
@@ -58,6 +59,8 @@ import static org.junit.Assert.assertEquals;
@Category({IntegrationTest.class})
public class KTableKTableForeignKeyJoinDistributedTest {
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
private static final int NUM_BROKERS = 1;
private static final String LEFT_TABLE = "left_table";
private static final String RIGHT_TABLE = "right_table";
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
index 60104c4755..176d27a2b3 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
@@ -38,6 +38,7 @@ import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -60,7 +61,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
@RunWith(Parameterized.class)
public class KTableKTableForeignKeyJoinIntegrationTest {
-
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
private static final String LEFT_TABLE = "left_table";
private static final String RIGHT_TABLE = "right_table";
private static final String OUTPUT = "output-topic";
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java
index 778f507cf2..5b63a655e2 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java
@@ -37,6 +37,7 @@ import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -58,7 +59,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
@RunWith(Parameterized.class)
public class KTableKTableForeignKeyJoinMaterializationIntegrationTest {
-
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
private static final String LEFT_TABLE = "left_table";
private static final String RIGHT_TABLE = "right_table";
private static final String OUTPUT = "output-topic";
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
index 8d4ac2c6e1..b5778835f8 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
@@ -43,6 +43,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
import java.io.IOException;
import java.time.Duration;
@@ -55,6 +56,8 @@ import java.util.concurrent.ConcurrentHashMap;
@Category({IntegrationTest.class})
public class KTableSourceTopicRestartIntegrationTest {
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
private static final int NUM_BROKERS = 3;
private static final String SOURCE_TOPIC = "source-topic";
private static final Properties PRODUCER_CONFIG = new Properties();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
index 6a024966d9..3b7251d6da 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
@@ -47,6 +47,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,7 +77,8 @@ import static org.junit.Assert.assertTrue;
@Category({IntegrationTest.class})
public class LagFetchIntegrationTest {
-
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(1);
@BeforeClass
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
index b2c2e4d5e7..0da0ac9eef 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
@@ -52,6 +52,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
import java.io.IOException;
import java.time.Duration;
@@ -68,7 +69,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
@Category({IntegrationTest.class})
@SuppressWarnings("deprecation")
public class MetricsIntegrationTest {
-
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
private static final int NUM_BROKERS = 1;
private static final int NUM_THREADS = 2;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java
index a7c925ad5f..e07a888690 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java
@@ -35,6 +35,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
import java.io.IOException;
import java.util.HashMap;
@@ -48,7 +49,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
@Category({IntegrationTest.class})
public class MetricsReporterIntegrationTest {
-
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
private static final int NUM_BROKERS = 1;
public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(NUM_BROKERS);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
index 75bf3a1c3b..30de1d26cb 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
@@ -71,6 +71,8 @@ import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
import java.time.Duration;
import java.util.Collection;
import java.util.Iterator;
@@ -102,6 +104,8 @@ import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
public class NamedTopologyIntegrationTest {
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
private static final Duration STARTUP_TIMEOUT = Duration.ofSeconds(45);
public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(1);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
index f6f045dea9..2e709af0b1 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
@@ -63,11 +63,14 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Category(IntegrationTest.class)
public class OptimizedKTableIntegrationTest {
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
private static final Logger LOG =
LoggerFactory.getLogger(OptimizedKTableIntegrationTest.class);
private static final int NUM_BROKERS = 1;
private static int port = 0;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/PositionRestartIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/PositionRestartIntegrationTest.java
index db1f86eb35..1c40b1ea6f 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/PositionRestartIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/PositionRestartIntegrationTest.java
@@ -67,8 +67,10 @@ import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
@@ -100,7 +102,8 @@ import static org.hamcrest.Matchers.is;
@Category({IntegrationTest.class})
@RunWith(value = Parameterized.class)
public class PositionRestartIntegrationTest {
-
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
private static final Logger LOG =
LoggerFactory.getLogger(PositionRestartIntegrationTest.class);
private static final long SEED = new Random().nextLong();
private static final int NUM_BROKERS = 1;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
index 37d8743521..26720a0021 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
@@ -41,8 +41,10 @@ import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
import java.io.IOException;
import java.time.Duration;
@@ -55,7 +57,8 @@ import java.util.Set;
@Category({IntegrationTest.class})
public class PurgeRepartitionTopicIntegrationTest {
-
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
private static final int NUM_BROKERS = 1;
private static final String INPUT_TOPIC = "input-stream";
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 52a896668e..82409af50f 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -69,6 +69,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -122,6 +123,8 @@ import static org.junit.Assert.assertThrows;
@Category({IntegrationTest.class})
@SuppressWarnings("deprecation")
public class QueryableStateIntegrationTest {
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
private static final Logger log =
LoggerFactory.getLogger(QueryableStateIntegrationTest.class);
private static final long DEFAULT_TIMEOUT_MS = 120 * 1000;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java
index f4afbe8012..54736c44a2 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java
@@ -40,6 +40,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
import java.io.IOException;
import java.time.Duration;
@@ -61,6 +62,8 @@ import static org.junit.Assert.assertTrue;
@Category({IntegrationTest.class})
public class RackAwarenessIntegrationTest {
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
private static final int NUM_BROKERS = 1;
private static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(NUM_BROKERS);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/RangeQueryIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/RangeQueryIntegrationTest.java
index 1c22bfca37..c67696b2ca 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/RangeQueryIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/RangeQueryIntegrationTest.java
@@ -47,6 +47,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -68,6 +69,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
@RunWith(Parameterized.class)
@Category({IntegrationTest.class})
public class RangeQueryIntegrationTest {
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
private static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(1);
private static final Properties STREAMS_CONFIG = new Properties();
private static final String APP_ID = "range-query-integration-test";
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index 99cb358b6e..1a22c90fcf 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -51,9 +51,11 @@ import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
import java.io.IOException;
import java.time.Duration;
@@ -81,6 +83,8 @@ import static org.hamcrest.Matchers.greaterThan;
*/
@Category({IntegrationTest.class})
public class RegexSourceIntegrationTest {
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
private static final int NUM_BROKERS = 1;
public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(NUM_BROKERS);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index 5c236e6ba8..204a66d148 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -32,8 +32,10 @@ import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
import java.io.BufferedWriter;
import java.io.File;
@@ -55,7 +57,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
*/
@Category({IntegrationTest.class})
public class ResetIntegrationTest extends AbstractResetIntegrationTest {
-
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
private static final String NON_EXISTING_TOPIC = "nonExistingTopic";
public static final EmbeddedKafkaCluster CLUSTER;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java
index c3e7c283d0..7fe905ae7d 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java
@@ -41,6 +41,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -65,6 +66,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
@RunWith(Parameterized.class)
@Category({IntegrationTest.class})
public class ResetPartitionTimeIntegrationTest {
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
private static final int NUM_BROKERS = 1;
private static final Properties BROKER_CONFIG;
static {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index b5aad95a0d..b7523a99e5 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -75,6 +75,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
@@ -90,6 +91,8 @@ import static org.junit.Assert.assertTrue;
@Category({IntegrationTest.class})
public class RestoreIntegrationTest {
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
private static final int NUM_BROKERS = 1;
public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(NUM_BROKERS);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
index 725b38612a..2d667839fa 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
@@ -47,6 +47,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
@@ -71,7 +72,8 @@ import static org.hamcrest.Matchers.notNullValue;
@RunWith(Parameterized.class)
@SuppressWarnings("deprecation")
public class RocksDBMetricsIntegrationTest {
-
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
private static final int NUM_BROKERS = 3;
public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(NUM_BROKERS);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java
index 10683d0968..395ef6efd6 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java
@@ -63,6 +63,8 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
+
+import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
@@ -80,6 +82,8 @@ import static org.hamcrest.core.Is.is;
@Category({IntegrationTest.class})
@RunWith(Parameterized.class)
public class SlidingWindowedKStreamIntegrationTest {
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
private static final int NUM_BROKERS = 1;
public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(NUM_BROKERS,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
index 22d773595a..f76caf573b 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
@@ -27,8 +27,10 @@ import org.apache.kafka.test.IntegrationTest;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
import java.io.IOException;
import java.time.Duration;
@@ -42,6 +44,8 @@ import static
org.apache.kafka.streams.tests.SmokeTestDriver.verify;
@Category(IntegrationTest.class)
public class SmokeTestDriverIntegrationTest {
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(3);
@BeforeClass
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
index 5b4e005086..d59a716004 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
@@ -41,6 +41,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
import java.io.IOException;
import java.util.Properties;
@@ -50,7 +51,8 @@ import static
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.sa
@Category({IntegrationTest.class})
public class StandbyTaskCreationIntegrationTest {
-
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
private static final int NUM_BROKERS = 1;
public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(NUM_BROKERS);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
index 059211ffff..2ac2c2bdde 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
@@ -48,6 +48,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -74,7 +75,8 @@ import static org.junit.Assert.assertTrue;
@RunWith(Parameterized.class)
@Category(IntegrationTest.class)
public class StandbyTaskEOSIntegrationTest {
-
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
private final static long REBALANCE_TIMEOUT =
Duration.ofMinutes(2L).toMillis();
private final static int KEY_0 = 0;
private final static int KEY_1 = 1;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/StateDirectoryIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/StateDirectoryIntegrationTest.java
index 9c34fba6f6..38651cfd34 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/StateDirectoryIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/StateDirectoryIntegrationTest.java
@@ -47,6 +47,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
@@ -56,6 +57,8 @@ import static org.junit.Assert.assertTrue;
@Category(IntegrationTest.class)
public class StateDirectoryIntegrationTest {
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(3);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/StateRestorationIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/StateRestorationIntegrationTest.java
index d890a30ae8..4cedf82bfc 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/StateRestorationIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/StateRestorationIntegrationTest.java
@@ -36,8 +36,10 @@ import org.apache.kafka.test.TestUtils;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
import java.io.IOException;
import java.util.Arrays;
@@ -47,6 +49,9 @@ import java.util.Properties;
@Category({IntegrationTest.class})
public class StateRestorationIntegrationTest {
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
+
private final StreamsBuilder builder = new StreamsBuilder();
private static final String APPLICATION_ID = "restoration-test-app";
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
index f8cb79cf56..5f65cf2725 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
@@ -49,6 +49,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -82,6 +83,8 @@ import static org.junit.Assert.assertTrue;
@Category({IntegrationTest.class})
public class StoreQueryIntegrationTest {
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
private static final Logger LOG =
LoggerFactory.getLogger(StoreQueryIntegrationTest.class);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
index 9d2bd1e221..49818e71a4 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
@@ -25,8 +25,10 @@ import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockMapper;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -43,6 +45,8 @@ import static java.time.Duration.ofSeconds;
@Category({IntegrationTest.class})
@RunWith(value = Parameterized.class)
public class StreamStreamJoinIntegrationTest extends
AbstractJoinIntegrationTest {
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
private KStream<Long, String> leftStream;
private KStream<Long, String> rightStream;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
index 0f7e8aa95f..37d5fc2a72 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
@@ -23,8 +23,10 @@ import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.IntegrationTest;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -38,6 +40,8 @@ import java.util.List;
@Category({IntegrationTest.class})
@RunWith(value = Parameterized.class)
public class StreamTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
private KStream<Long, String> leftStream;
private KTable<Long, String> rightTable;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java
index 8d023b8028..38dc3a6ffc 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java
@@ -46,6 +46,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -67,6 +68,8 @@ import static org.junit.Assert.assertTrue;
@RunWith(value = Parameterized.class)
@Category({IntegrationTest.class})
public class StreamTableJoinTopologyOptimizationIntegrationTest {
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
private static final int NUM_BROKERS = 1;
public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(NUM_BROKERS);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
index 0f42d3546f..c81ddcfa74 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
@@ -49,6 +49,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
import java.io.IOException;
import java.time.Duration;
@@ -76,6 +77,8 @@ import static org.junit.Assert.fail;
@Category(IntegrationTest.class)
@SuppressWarnings("deprecation") //Need to call the old handler, will remove
those calls when the old handler is removed
public class StreamsUncaughtExceptionHandlerIntegrationTest {
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(1, new Properties(), 0L, 0L);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUpgradeTestIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUpgradeTestIntegrationTest.java
index 4285530958..6a9453e992 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUpgradeTestIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUpgradeTestIntegrationTest.java
@@ -24,8 +24,10 @@ import org.apache.kafka.streams.tests.StreamsUpgradeTest;
import org.apache.kafka.test.IntegrationTest;
import org.junit.AfterClass;
import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
import java.io.IOException;
import java.time.Duration;
@@ -44,6 +46,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
@Category(IntegrationTest.class)
public class StreamsUpgradeTestIntegrationTest {
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(3);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
index 7ce428b794..1dc6e6a607 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
@@ -50,6 +50,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
@@ -85,6 +86,8 @@ import static org.hamcrest.Matchers.equalTo;
@RunWith(Parameterized.class)
@Category({IntegrationTest.class})
public class SuppressionDurabilityIntegrationTest {
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(
3,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
index 71ef0e3690..35eee4764d 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
@@ -47,8 +47,10 @@ import org.apache.kafka.test.TestUtils;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
import java.io.IOException;
import java.util.Collections;
@@ -81,6 +83,8 @@ import static org.hamcrest.Matchers.empty;
@Category(IntegrationTest.class)
public class SuppressionIntegrationTest {
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(
1,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
index 579ed190a4..aaa0f462e8 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
@@ -26,8 +26,10 @@ import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.IntegrationTest;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -41,6 +43,8 @@ import java.util.List;
@Category({IntegrationTest.class})
@RunWith(value = Parameterized.class)
public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest
{
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
private KTable<Long, String> leftTable;
private KTable<Long, String> rightTable;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java
index 5ff6cb6ba7..a706d74d66 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java
@@ -37,6 +37,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
import java.io.IOException;
import java.lang.reflect.Field;
@@ -56,6 +57,8 @@ import static org.hamcrest.Matchers.sameInstance;
@Category(IntegrationTest.class)
public class TaskAssignorIntegrationTest {
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(1);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
index 2aec4edf70..791ee58ff7 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
@@ -40,6 +40,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
import java.io.IOException;
import java.time.Duration;
@@ -60,6 +61,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
@Category(IntegrationTest.class)
public class TaskMetadataIntegrationTest {
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(1, new Properties(), 0L, 0L);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java
index 4f72a0a2f8..9abc2c9500 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java
@@ -64,6 +64,8 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
+
+import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
@@ -83,6 +85,9 @@ import static org.junit.Assert.assertThrows;
@Category({IntegrationTest.class})
@RunWith(Parameterized.class)
public class TimeWindowedKStreamIntegrationTest {
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
+
private static final int NUM_BROKERS = 1;
public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(NUM_BROKERS,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/internals/ApiUtilsTest.java
b/streams/src/test/java/org/apache/kafka/streams/internals/ApiUtilsTest.java
index 6e4cdef46a..33fcd8ede2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/internals/ApiUtilsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/internals/ApiUtilsTest.java
@@ -31,7 +31,6 @@ import static org.junit.Assert.fail;
public class ApiUtilsTest {
-
// This is the maximum limit that Duration accepts but fails when it
converts to milliseconds.
private static final long MAX_ACCEPTABLE_DAYS_FOR_DURATION =
106751991167300L;
// This is the maximum limit that Duration accepts and converts to
milliseconds with out fail.
diff --git
a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
index e0c45d0c1c..5336b0e788 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
@@ -29,8 +29,12 @@ import
org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
+import org.junit.Rule;
+import org.junit.rules.Timeout;
public class SmokeTestUtil {
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
final static int END = Integer.MAX_VALUE;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/tests/SystemTestUtilTest.java
b/streams/src/test/java/org/apache/kafka/streams/tests/SystemTestUtilTest.java
index a2a26a3fa8..5f847a683f 100644
---
a/streams/src/test/java/org/apache/kafka/streams/tests/SystemTestUtilTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/tests/SystemTestUtilTest.java
@@ -18,7 +18,9 @@
package org.apache.kafka.streams.tests;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.Timeout;
import java.util.HashMap;
import java.util.Map;
@@ -28,6 +30,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
public class SystemTestUtilTest {
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
private final Map<String, String> expectedParsedMap = new TreeMap<>();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
b/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
index d4f7841000..dc3cf65b2c 100644
---
a/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
@@ -29,7 +29,9 @@ import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.Timeout;
import java.time.Duration;
import java.time.Instant;
@@ -44,6 +46,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class StreamsResetterTest {
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
private static final String TOPIC = "topic1";
private final StreamsResetter streamsResetter = new StreamsResetter();
diff --git
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
index cf22ecece4..b38b0c3a94 100644
---
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
+++
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
@@ -53,8 +53,8 @@ import scala.jdk.CollectionConverters._
* Test suite that verifies that the topology built by the Java and Scala APIs
match.
*/
//noinspection ScalaDeprecation
+@Timeout(600)
class TopologyTest {
-
private val inputTopic = "input-topic"
private val userClicksTopic = "user-clicks-topic"
private val userRegionsTopic = "user-regions-topic"