This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7908a4838b4 Fix long running RangeQueryIntegrationTest. (#17933)
7908a4838b4 is described below
commit 7908a4838b4705d3c1ebeaf079dde686734b0b8f
Author: Kaushik Raina <[email protected]>
AuthorDate: Mon Nov 25 22:12:02 2024 +0530
Fix long running RangeQueryIntegrationTest. (#17933)
Noticed that RangeQueryIntegrationTest takes approximately 20-30 minutes to run.
A deep dive into the logs showed consumer rebalancing errors, and the
test was stuck in a loop.
It seems that, because the same application.id is shared across tests, the Kafka Streams
application fails to track its state correctly across rebalances.
Reviewers: Bill Bejeck <[email protected]>
---
.../kafka/streams/integration/RangeQueryIntegrationTest.java | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RangeQueryIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RangeQueryIntegrationTest.java
index 6842c7718e1..5f45f3d7212 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RangeQueryIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RangeQueryIntegrationTest.java
@@ -42,6 +42,7 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
@@ -59,6 +60,7 @@ import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Stream;
+import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -68,7 +70,6 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
public class RangeQueryIntegrationTest {
private static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(1);
private static final Properties STREAMS_CONFIG = new Properties();
- private static final String APP_ID = "range-query-integration-test";
private static final Long COMMIT_INTERVAL = 100L;
private static String inputStream;
private static final String TABLE_NAME = "mytable";
@@ -136,7 +137,6 @@ public class RangeQueryIntegrationTest {
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
COMMIT_INTERVAL);
- STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG,
TestUtils.tempDirectory().getPath());
}
@@ -158,16 +158,16 @@ public class RangeQueryIntegrationTest {
@ParameterizedTest
@MethodSource("data")
- public void testStoreConfig(final StoreType storeType, final boolean
enableLogging, final boolean enableCaching, final boolean forward) throws
Exception {
+ public void testStoreConfig(final StoreType storeType, final boolean
enableLogging, final boolean enableCaching, final boolean forward, final
TestInfo testInfo) throws Exception {
+ final String appID = safeUniqueTestName(testInfo);
final StreamsBuilder builder = new StreamsBuilder();
final Materialized<String, String, KeyValueStore<Bytes, byte[]>>
stateStoreConfig = getStoreConfig(storeType, enableLogging, enableCaching);
builder.table(inputStream, stateStoreConfig);
-
+ STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
try (final KafkaStreams kafkaStreams = new
KafkaStreams(builder.build(), STREAMS_CONFIG)) {
IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
writeInputData();
-
final ReadOnlyKeyValueStore<String, String> stateStore =
IntegrationTestUtils.getStore(1000_000L, TABLE_NAME, kafkaStreams,
QueryableStoreTypes.keyValueStore());
// wait for the store to populate