This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b3eed510235 KAFKA-19660: JoinWithIncompleteMetadataIntegrationTest
fails in isolated run of one parameter (#20483)
b3eed510235 is described below
commit b3eed510235d49a0cc8eb322cdfb8486dfc16748
Author: Jinhe Zhang <[email protected]>
AuthorDate: Mon Sep 8 11:02:11 2025 -0400
KAFKA-19660: JoinWithIncompleteMetadataIntegrationTest fails in isolated
run of one parameter (#20483)
The original test timeout when using new protocol, because it use
`ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG` as the exception's timeout,
which is 300s. Also the test for new protocol and old protocol use the
same group ID, so the failure will be hidden.
What I do:
1. Set the timeout as 5 secs so it can be captured within 10s
2. Use new appId for new protocol
Reviewers: Lucas Brutschy <[email protected]>
---
.../integration/JoinWithIncompleteMetadataIntegrationTest.java | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java
index 04f35dcfce4..291401c82fd 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java
@@ -57,6 +57,7 @@ public class JoinWithIncompleteMetadataIntegrationTest {
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.Long().getClass());
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
COMMIT_INTERVAL);
+
STREAMS_CONFIG.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
MISSING_TOPIC_DETECTION_TIMEOUT_MS);
}
@AfterAll
@@ -66,6 +67,7 @@ public class JoinWithIncompleteMetadataIntegrationTest {
private static final String APP_ID =
"join-incomplete-metadata-integration-test";
private static final Long COMMIT_INTERVAL = 100L;
+ private static final int MISSING_TOPIC_DETECTION_TIMEOUT_MS = 5000;
static final Properties STREAMS_CONFIG = new Properties();
static final String INPUT_TOPIC_RIGHT = "inputTopicRight";
static final String NON_EXISTENT_INPUT_TOPIC_LEFT =
"inputTopicLeft-not-exist";
@@ -93,7 +95,8 @@ public class JoinWithIncompleteMetadataIntegrationTest {
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testShouldAutoShutdownOnJoinWithIncompleteMetadata(final
boolean useNewProtocol) throws InterruptedException {
- STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
+ final String appId = APP_ID + "-" + (useNewProtocol ? "new" : "old");
+ STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers());
if (useNewProtocol) {