This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7908a4838b4 Fix long running RangeQueryIntegrationTest. (#17933)
7908a4838b4 is described below

commit 7908a4838b4705d3c1ebeaf079dde686734b0b8f
Author: Kaushik Raina <[email protected]>
AuthorDate: Mon Nov 25 22:12:02 2024 +0530

    Fix long running RangeQueryIntegrationTest. (#17933)
    
    Noticed that RangeQueryIntegrationTest was taking approximately 20–30 minutes to run.
    A deep dive into the logs showed errors during consumer
rebalancing, and the test was stuck in a loop.
    It appears that, because the same application.id was reused across tests,
the Kafka Streams application failed to track its state correctly across rebalances.
    
    Reviewers: Bill Bejeck <[email protected]>
---
 .../kafka/streams/integration/RangeQueryIntegrationTest.java   | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RangeQueryIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RangeQueryIntegrationTest.java
index 6842c7718e1..5f45f3d7212 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RangeQueryIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RangeQueryIntegrationTest.java
@@ -42,6 +42,7 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
@@ -59,6 +60,7 @@ import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.stream.Stream;
 
+import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -68,7 +70,6 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 public class RangeQueryIntegrationTest {
     private static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(1);
     private static final Properties STREAMS_CONFIG = new Properties();
-    private static final String APP_ID = "range-query-integration-test";
     private static final Long COMMIT_INTERVAL = 100L;
     private static String inputStream;
     private static final String TABLE_NAME = "mytable";
@@ -136,7 +137,6 @@ public class RangeQueryIntegrationTest {
         STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
         STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
         STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 
COMMIT_INTERVAL);
-        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
         STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath());
     }
 
@@ -158,16 +158,16 @@ public class RangeQueryIntegrationTest {
 
     @ParameterizedTest
     @MethodSource("data")
-    public void testStoreConfig(final StoreType storeType, final boolean 
enableLogging, final boolean enableCaching, final boolean forward) throws 
Exception {
+    public void testStoreConfig(final StoreType storeType, final boolean 
enableLogging, final boolean enableCaching, final boolean forward, final 
TestInfo testInfo) throws Exception {
+        final String appID = safeUniqueTestName(testInfo);
         final StreamsBuilder builder = new StreamsBuilder();
         final Materialized<String, String, KeyValueStore<Bytes, byte[]>> 
stateStoreConfig = getStoreConfig(storeType, enableLogging, enableCaching);
         builder.table(inputStream, stateStoreConfig);
-
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
         try (final KafkaStreams kafkaStreams = new 
KafkaStreams(builder.build(), STREAMS_CONFIG)) {
             
IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
 
             writeInputData();
-
             final ReadOnlyKeyValueStore<String, String> stateStore = 
IntegrationTestUtils.getStore(1000_000L, TABLE_NAME, kafkaStreams, 
QueryableStoreTypes.keyValueStore());
 
             // wait for the store to populate

Reply via email to