This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 04b4a8f5717 KAFKA-19705: Enable streams rebalance protocol in IQv2 integration test (#20541)
04b4a8f5717 is described below

commit 04b4a8f5717c7433e1178b22846ee2415b50020e
Author: Jinhe Zhang <[email protected]>
AuthorDate: Thu Sep 18 03:41:52 2025 -0400

    KAFKA-19705: Enable streams rebalance protocol in IQv2 integration test (#20541)
    
    Update the IQv2 integration tests to run with both the classic and streams group protocols
    
    Reviewers: Lucas Brutschy <[email protected]>
---
 .../streams/integration/IQv2IntegrationTest.java   | 75 +++++++++++++++-------
 .../integration/IQv2StoreIntegrationTest.java      | 18 ++++--
 .../IQv2VersionedStoreIntegrationTest.java         | 67 +++++++++++++------
 3 files changed, 112 insertions(+), 48 deletions(-)
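
For readers skimming the patch, the one pattern repeated across all three files is the conversion of plain JUnit @Test methods into parameterized tests that run once per group protocol. A minimal, self-contained sketch of that pattern (the class name and method body below are illustrative, not part of the patch; the provider shape and the "{1}" display name mirror it):

    import java.util.stream.Stream;

    import org.junit.jupiter.params.ParameterizedTest;
    import org.junit.jupiter.params.provider.Arguments;
    import org.junit.jupiter.params.provider.MethodSource;

    class GroupProtocolParameterizationSketch {

        // Same provider shape as the patch: the protocol value plus a readable label.
        private static Stream<Arguments> groupProtocolParameters() {
            return Stream.of(
                Arguments.of("classic", "CLASSIC protocol"),
                Arguments.of("streams", "STREAMS protocol")
            );
        }

        @ParameterizedTest(name = "{1}") // "{1}" renders the label as the test name
        @MethodSource("groupProtocolParameters")
        void runsOncePerProtocol(final String groupProtocol, final String testName) {
            // A real test would call setup(groupProtocol, testInfo) here, as the
            // patched tests do, then exercise the store under that protocol.
        }
    }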

diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
index dcd711a35c5..20fc7f47236 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
@@ -57,11 +57,12 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.IOException;
 import java.lang.reflect.Field;
@@ -76,6 +77,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.Stream;
 
 import static java.util.Collections.singleton;
 import static org.apache.kafka.streams.query.StateQueryRequest.inStore;
@@ -99,6 +101,7 @@ public class IQv2IntegrationTest {
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
 
     private KafkaStreams kafkaStreams;
+    private String groupProtocol;
 
     @BeforeAll
     public static void before()
@@ -149,8 +152,8 @@ public class IQv2IntegrationTest {
         ));
     }
 
-    @BeforeEach
-    public void beforeTest(final TestInfo testInfo) {
+    private void setup(final String groupProtocol, final TestInfo testInfo) {
+        this.groupProtocol = groupProtocol;
         final StreamsBuilder builder = new StreamsBuilder();
 
         builder.table(
@@ -159,7 +162,6 @@ public class IQv2IntegrationTest {
             Materialized.as(STORE_NAME)
         );
 
-
         final String safeTestName = safeUniqueTestName(testInfo);
         kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration(safeTestName));
         kafkaStreams.cleanUp();
@@ -167,8 +169,10 @@ public class IQv2IntegrationTest {
 
     @AfterEach
     public void afterTest() {
-        kafkaStreams.close(Duration.ofSeconds(60));
-        kafkaStreams.cleanUp();
+        if (kafkaStreams != null) {
+            kafkaStreams.close(Duration.ofSeconds(60));
+            kafkaStreams.cleanUp();
+        }
     }
 
     @AfterAll
@@ -176,8 +180,10 @@ public class IQv2IntegrationTest {
         CLUSTER.stop();
     }
 
-    @Test
-    public void shouldFailUnknownStore() {
+    @ParameterizedTest(name = "{1}")
+    @MethodSource("groupProtocolParameters")
+    public void shouldFailUnknownStore(final String groupProtocol, final String testName, final TestInfo testInfo) {
+        setup(groupProtocol, testInfo);
         final KeyQuery<Integer, ValueAndTimestamp<Integer>> query = KeyQuery.withKey(1);
         final StateQueryRequest<ValueAndTimestamp<Integer>> request =
             inStore("unknown-store").withQuery(query);
@@ -185,8 +191,10 @@ public class IQv2IntegrationTest {
         assertThrows(UnknownStateStoreException.class, () -> kafkaStreams.query(request));
     }
 
-    @Test
-    public void shouldFailNotStarted() {
+    @ParameterizedTest(name = "{1}")
+    @MethodSource("groupProtocolParameters")
+    public void shouldFailNotStarted(final String groupProtocol, final String testName, final TestInfo testInfo) {
+        setup(groupProtocol, testInfo);
         final KeyQuery<Integer, ValueAndTimestamp<Integer>> query = KeyQuery.withKey(1);
         final StateQueryRequest<ValueAndTimestamp<Integer>> request =
             inStore(STORE_NAME).withQuery(query);
@@ -194,8 +202,10 @@ public class IQv2IntegrationTest {
         assertThrows(StreamsNotStartedException.class, () -> kafkaStreams.query(request));
     }
 
-    @Test
-    public void shouldFailStopped() {
+    @ParameterizedTest(name = "{1}")
+    @MethodSource("groupProtocolParameters")
+    public void shouldFailStopped(final String groupProtocol, final String testName, final TestInfo testInfo) {
+        setup(groupProtocol, testInfo);
         final KeyQuery<Integer, ValueAndTimestamp<Integer>> query = KeyQuery.withKey(1);
         final StateQueryRequest<ValueAndTimestamp<Integer>> request =
             inStore(STORE_NAME).withQuery(query);
@@ -205,9 +215,11 @@ public class IQv2IntegrationTest {
         assertThrows(StreamsStoppedException.class, () -> kafkaStreams.query(request));
     }
 
-    @Test
-    public void shouldRejectNonRunningActive()
+    @ParameterizedTest(name = "{1}")
+    @MethodSource("groupProtocolParameters")
+    public void shouldRejectNonRunningActive(final String groupProtocol, final String testName, final TestInfo testInfo)
         throws NoSuchFieldException, IllegalAccessException {
+        setup(groupProtocol, testInfo);
         final KeyQuery<Integer, ValueAndTimestamp<Integer>> query = KeyQuery.withKey(1);
         final StateQueryRequest<ValueAndTimestamp<Integer>> request =
             inStore(STORE_NAME).withQuery(query).requireActive();
@@ -261,8 +273,10 @@ public class IQv2IntegrationTest {
         }
     }
 
-    @Test
-    public void shouldFetchFromPartition() {
+    @ParameterizedTest(name = "{1}")
+    @MethodSource("groupProtocolParameters")
+    public void shouldFetchFromPartition(final String groupProtocol, final String testName, final TestInfo testInfo) {
+        setup(groupProtocol, testInfo);
         final KeyQuery<Integer, ValueAndTimestamp<Integer>> query = KeyQuery.withKey(1);
         final int partition = 1;
         final Set<Integer> partitions = singleton(partition);
@@ -276,8 +290,10 @@ public class IQv2IntegrationTest {
         assertThat(result.getPartitionResults().keySet(), equalTo(partitions));
     }
 
-    @Test
-    public void shouldFetchExplicitlyFromAllPartitions() {
+    @ParameterizedTest(name = "{1}")
+    @MethodSource("groupProtocolParameters")
+    public void shouldFetchExplicitlyFromAllPartitions(final String groupProtocol, final String testName, final TestInfo testInfo) {
+        setup(groupProtocol, testInfo);
         final KeyQuery<Integer, ValueAndTimestamp<Integer>> query = KeyQuery.withKey(1);
         final Set<Integer> partitions = Set.of(0, 1);
         final StateQueryRequest<ValueAndTimestamp<Integer>> request =
@@ -290,8 +306,10 @@ public class IQv2IntegrationTest {
         assertThat(result.getPartitionResults().keySet(), equalTo(partitions));
     }
 
-    @Test
-    public void shouldNotRequireQueryHandler(final TestInfo testInfo) {
+    @ParameterizedTest(name = "{1}")
+    @MethodSource("groupProtocolParameters")
+    public void shouldNotRequireQueryHandler(final String groupProtocol, final String testName, final TestInfo testInfo) {
+        setup(groupProtocol, testInfo);
         final KeyQuery<Integer, ValueAndTimestamp<Integer>> query = KeyQuery.withKey(1);
         final int partition = 1;
         final Set<Integer> partitions = singleton(partition);
@@ -423,8 +441,11 @@ public class IQv2IntegrationTest {
         );
 
         // Discard the basic streams and replace with test-specific topology
-        kafkaStreams.close();
+        if (kafkaStreams != null) {
+            kafkaStreams.close();
+        }
         final String safeTestName = safeUniqueTestName(testInfo);
+        this.groupProtocol = groupProtocol;
         kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration(safeTestName));
         kafkaStreams.cleanUp();
 
@@ -446,7 +467,7 @@ public class IQv2IntegrationTest {
     private Properties streamsConfiguration(final String safeTestName) {
         final Properties config = new Properties();
         config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
-        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
+        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName + "-" + groupProtocol);
         config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + (++port));
         config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
@@ -458,6 +479,14 @@ public class IQv2IntegrationTest {
         config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
         config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
         config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
+        config.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol);
         return config;
     }
+
+    private static Stream<Arguments> groupProtocolParameters() {
+        return Stream.of(
+            Arguments.of("classic", "CLASSIC protocol"),
+            Arguments.of("streams", "STREAMS protocol")
+        );
+    }
 }
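
Two details of the configuration change above are easy to miss: the protocol is suffixed onto the application ID so the classic and streams runs of the same test method never share a group ID, and the protocol itself is wired in through StreamsConfig.GROUP_PROTOCOL_CONFIG. A condensed sketch of just that wiring (every other property from the patched streamsConfiguration() is elided):

    import java.util.Properties;

    import org.apache.kafka.streams.StreamsConfig;

    final class ProtocolConfigSketch {

        // Only the two protocol-related settings are shown; bootstrap servers,
        // serdes, state dir, etc. are elided.
        static Properties streamsConfiguration(final String safeTestName, final String groupProtocol) {
            final Properties config = new Properties();
            // Distinct application (and thus group) ID per protocol run.
            config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName + "-" + groupProtocol);
            // "classic" or "streams", as supplied by the parameterized test.
            config.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol);
            return config;
        }
    }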
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
index 963183fd665..649799c7976 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
@@ -360,7 +360,9 @@ public class IQv2StoreIntegrationTest {
             for (final boolean logEnabled : Arrays.asList(true, false)) {
                 for (final StoresToTest toTest : StoresToTest.values()) {
                     for (final String kind : Arrays.asList("DSL", "PAPI")) {
-                        values.add(Arguments.of(cacheEnabled, logEnabled, toTest.name(), kind));
+                        for (final String groupProtocol : Arrays.asList("classic", "streams")) {
+                            values.add(Arguments.of(cacheEnabled, logEnabled, toTest.name(), kind, groupProtocol));
+                        }
                     }
                 }
             }
@@ -426,13 +428,14 @@ public class IQv2StoreIntegrationTest {
         ));
     }
 
-    public void setup(final boolean cache, final boolean log, final StoresToTest storeToTest, final String kind) {
+    public void setup(final boolean cache, final boolean log, final StoresToTest storeToTest, final String kind, final String groupProtocol) {
         final StoreSupplier<?> supplier = storeToTest.supplier();
         final Properties streamsConfig = streamsConfiguration(
             cache,
             log,
             storeToTest.name(),
-            kind
+            kind,
+            groupProtocol
         );
 
         final StreamsBuilder builder = new StreamsBuilder();
@@ -765,8 +768,8 @@ public class IQv2StoreIntegrationTest {
 
     @ParameterizedTest
     @MethodSource("data")
-    public void verifyStore(final boolean cache, final boolean log, final StoresToTest storeToTest, final String kind) {
-        setup(cache, log, storeToTest, kind);
+    public void verifyStore(final boolean cache, final boolean log, final StoresToTest storeToTest, final String kind, final String groupProtocol) {
+        setup(cache, log, storeToTest, kind, groupProtocol);
         try {
             if (storeToTest.global()) {
                 // See KAFKA-13523
@@ -2030,10 +2033,10 @@ public class IQv2StoreIntegrationTest {
     }
 
     private static Properties streamsConfiguration(final boolean cache, final boolean log,
-                                                   final String supplier, final String kind) {
+                                                   final String supplier, final String kind, final String groupProtocol) {
         final String safeTestName =
             IQv2StoreIntegrationTest.class.getName() + "-" + cache + "-" + log + "-" + supplier
-                + "-" + kind + "-" + RANDOM.nextInt();
+                + "-" + kind + "-" + groupProtocol + "-" + RANDOM.nextInt();
         final Properties config = new Properties();
         config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
         config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
@@ -2048,6 +2051,7 @@ public class IQv2StoreIntegrationTest {
         config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
         config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
         config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
+        config.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol);
         return config;
     }
 }
\ No newline at end of file
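
In IQv2StoreIntegrationTest the protocol is not a separate provider but a new innermost loop in the existing data() matrix, so every cache/log/store/kind combination now runs under both protocols, doubling the parameter space. A trimmed sketch of that provider (StoresToTest here is a two-value stand-in for the test's real enum):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    import org.junit.jupiter.params.provider.Arguments;

    final class DataProviderSketch {

        // Stand-in for the test's StoresToTest enum.
        enum StoresToTest { KV, TIMESTAMPED_KV }

        // Mirrors the patched data() provider: one Arguments tuple per
        // combination, with the new groupProtocol loop innermost.
        static List<Arguments> data() {
            final List<Arguments> values = new ArrayList<>();
            for (final boolean cacheEnabled : Arrays.asList(true, false)) {
                for (final boolean logEnabled : Arrays.asList(true, false)) {
                    for (final StoresToTest toTest : StoresToTest.values()) {
                        for (final String kind : Arrays.asList("DSL", "PAPI")) {
                            for (final String groupProtocol : Arrays.asList("classic", "streams")) {
                                values.add(Arguments.of(cacheEnabled, logEnabled, toTest.name(), kind, groupProtocol));
                            }
                        }
                    }
                }
            }
            return values;
        }
    }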
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java
index 26a5aed5a52..bb1f3c20a87 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java
@@ -47,7 +47,10 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.time.Duration;
 import java.time.Instant;
@@ -57,7 +60,9 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.stream.Stream;
 
+import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.empty;
@@ -83,16 +88,25 @@ public class IQv2VersionedStoreIntegrationTest {
     private static final Long[] RECORD_TIMESTAMPS = {BASE_TIMESTAMP_LONG, BASE_TIMESTAMP_LONG + 10, BASE_TIMESTAMP_LONG + 20, BASE_TIMESTAMP_LONG + 30};
     private static final int RECORD_NUMBER = RECORD_VALUES.length;
     private static final int LAST_INDEX = RECORD_NUMBER - 1;
-    private static final Position INPUT_POSITION = Position.emptyPosition();
+    private Position inputPosition;
 
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, Utils.mkProperties(Collections.singletonMap("auto.create.topics.enable", "true")));
 
     private KafkaStreams kafkaStreams;
+    private String groupProtocol;
 
     @BeforeAll
-    public static void before() throws Exception {
+    public static void beforeAll() throws Exception {
         CLUSTER.start();
-
+    }
+    
+    @BeforeEach
+    public void beforeEach() throws Exception {
+        // Delete and recreate the topic to ensure clean state for each test
+        CLUSTER.deleteTopic(INPUT_TOPIC_NAME);
+        CLUSTER.createTopic(INPUT_TOPIC_NAME, 1, 1);
+        
+        // Set up fresh test data
         final Properties producerProps = new Properties();
         producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
@@ -103,19 +117,21 @@ public class IQv2VersionedStoreIntegrationTest {
             producer.send(new ProducerRecord<>(INPUT_TOPIC_NAME, 0,  RECORD_TIMESTAMPS[2], RECORD_KEY, RECORD_VALUES[2])).get();
             producer.send(new ProducerRecord<>(INPUT_TOPIC_NAME, 0,  RECORD_TIMESTAMPS[3], RECORD_KEY, RECORD_VALUES[3])).get();
         }
-        INPUT_POSITION.withComponent(INPUT_TOPIC_NAME, 0, 3);
+        inputPosition = Position.emptyPosition().withComponent(INPUT_TOPIC_NAME, 0, 3);
     }
 
-    @BeforeEach
-    public void beforeTest() {
+    private void setup(final String groupProtocol, final TestInfo testInfo) {
+        this.groupProtocol = groupProtocol;
         final StreamsBuilder builder = new StreamsBuilder();
         builder.table(INPUT_TOPIC_NAME,
             Materialized.as(Stores.persistentVersionedKeyValueStore(STORE_NAME, HISTORY_RETENTION, SEGMENT_INTERVAL)));
         final Properties configs = new Properties();
-        configs.put(StreamsConfig.APPLICATION_ID_CONFIG, "app");
+        final String safeTestName = safeUniqueTestName(testInfo);
+        configs.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
         configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         configs.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class.getName());
         configs.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class.getName());
+        configs.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol);
         kafkaStreams = IntegrationTestUtils.getStartedStreams(configs, builder, true);
     }
 
@@ -132,8 +148,19 @@ public class IQv2VersionedStoreIntegrationTest {
         CLUSTER.stop();
     }
 
-    @Test
-    public void verifyStore() {
+    private static Stream<Arguments> groupProtocolParameters() {
+        return Stream.of(
+            Arguments.of("classic", "CLASSIC protocol"),
+            Arguments.of("streams", "STREAMS protocol")
+        );
+    }
+
+    @ParameterizedTest(name = "{1}")
+    @MethodSource("groupProtocolParameters")
+    public void verifyStore(final String groupProtocol, final String testName, final TestInfo testInfo) throws Exception {
+        // Set up streams
+        setup(groupProtocol, testInfo);
+        
         /* Test Versioned Key Queries */
         // retrieve the latest value
         shouldHandleVersionedKeyQuery(Optional.empty(), RECORD_VALUES[3], RECORD_TIMESTAMPS[3], Optional.empty());
@@ -255,7 +282,10 @@ public class IQv2VersionedStoreIntegrationTest {
     private void shouldHandleRaceCondition() {
         final MultiVersionedKeyQuery<Integer, Integer> query = defineQuery(RECORD_KEY, Optional.empty(), Optional.empty(), ResultOrder.ANY);
 
-        final Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResults = sendRequestAndReceiveResults(query, kafkaStreams);
+        // For race condition test, we don't use position bounds since we're testing concurrent updates
+        final StateQueryRequest<VersionedRecordIterator<Integer>> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query);
+        final StateQueryResult<VersionedRecordIterator<Integer>> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+        final Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResults = result.getPartitionResults();
 
         // verify results in two steps
         for (final Entry<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResultsEntry : partitionResults.entrySet()) {
@@ -327,14 +357,14 @@ public class IQv2VersionedStoreIntegrationTest {
         return query;
     }
 
-    private static Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> sendRequestAndReceiveResults(final MultiVersionedKeyQuery<Integer, Integer> query, final KafkaStreams kafkaStreams) {
-        final StateQueryRequest<VersionedRecordIterator<Integer>> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION));
+    private Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> sendRequestAndReceiveResults(final MultiVersionedKeyQuery<Integer, Integer> query, final KafkaStreams kafkaStreams) {
+        final StateQueryRequest<VersionedRecordIterator<Integer>> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(inputPosition));
         final StateQueryResult<VersionedRecordIterator<Integer>> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
         return result.getPartitionResults();
     }
 
-    private static QueryResult<VersionedRecord<Integer>> sendRequestAndReceiveResults(final VersionedKeyQuery<Integer, Integer> query, final KafkaStreams kafkaStreams) {
-        final StateQueryRequest<VersionedRecord<Integer>> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION));
+    private QueryResult<VersionedRecord<Integer>> sendRequestAndReceiveResults(final VersionedKeyQuery<Integer, Integer> query, final KafkaStreams kafkaStreams) {
+        final StateQueryRequest<VersionedRecord<Integer>> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(inputPosition));
         final StateQueryResult<VersionedRecord<Integer>> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
         return result.getOnlyPartitionResult();
     }
@@ -352,7 +382,7 @@ public class IQv2VersionedStoreIntegrationTest {
     /**
      * This method inserts a new value (999999) for the key in the oldest timestamp (RECORD_TIMESTAMPS[0]).
      */
-    private static void updateRecordValue() {
+    private void updateRecordValue() {
         // update the record value at RECORD_TIMESTAMPS[0]
         final Properties producerProps = new Properties();
         producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
@@ -361,8 +391,9 @@ public class IQv2VersionedStoreIntegrationTest {
         try (final KafkaProducer<Integer, Integer> producer = new KafkaProducer<>(producerProps)) {
             producer.send(new ProducerRecord<>(INPUT_TOPIC_NAME, 0, RECORD_TIMESTAMPS[0], RECORD_KEY, 999999));
         }
-        INPUT_POSITION.withComponent(INPUT_TOPIC_NAME, 0, 4);
-        assertThat(INPUT_POSITION, equalTo(Position.emptyPosition().withComponent(INPUT_TOPIC_NAME, 0, 4)));
+
+        inputPosition = inputPosition.withComponent(INPUT_TOPIC_NAME, 0, 4);
+        assertThat(inputPosition, equalTo(Position.emptyPosition().withComponent(INPUT_TOPIC_NAME, 0, 4)));
 
         // make sure that the new value is picked up by the store
         final Properties consumerProps = new Properties();

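
Finally, the versioned-store test replaces the mutable static INPUT_POSITION with a per-test inputPosition field rebuilt in beforeEach(), so offset bookkeeping from one parameterized run cannot leak into the next. A minimal sketch of how such a position bound is built and attached to an IQv2 request (topic name, store name, and the simplified Integer result type are illustrative, not the test's actual types):

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.query.KeyQuery;
    import org.apache.kafka.streams.query.Position;
    import org.apache.kafka.streams.query.PositionBound;
    import org.apache.kafka.streams.query.StateQueryRequest;
    import org.apache.kafka.streams.query.StateQueryResult;

    final class PositionBoundSketch {

        // "input-topic" and "store" are placeholders for the test's constants.
        static StateQueryResult<Integer> queryAtOffset(final KafkaStreams streams) {
            // Rebuilt per test, as in the patched beforeEach(): partition 0 of
            // the input topic must be processed up to offset 3.
            final Position inputPosition =
                Position.emptyPosition().withComponent("input-topic", 0, 3);

            final StateQueryRequest<Integer> request = StateQueryRequest
                .inStore("store")
                .withQuery(KeyQuery.<Integer, Integer>withKey(1))
                .withPositionBound(PositionBound.at(inputPosition));

            // query() does not wait for the bound to be reached; the patched
            // test retries via IntegrationTestUtils.iqv2WaitForResult.
            return streams.query(request);
        }
    }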