This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 04b4a8f5717 KAFKA-19705: Enable streams rebalance protocol in IQv2
integration test (#20541)
04b4a8f5717 is described below
commit 04b4a8f5717c7433e1178b22846ee2415b50020e
Author: Jinhe Zhang <[email protected]>
AuthorDate: Thu Sep 18 03:41:52 2025 -0400
KAFKA-19705: Enable streams rebalance protocol in IQv2 integration test
(#20541)
Update the IQv2 integration tests so that each test runs under both the classic and the streams group protocol.
Reviewers: Lucas Brutschy <[email protected]>
---
.../streams/integration/IQv2IntegrationTest.java | 75 +++++++++++++++-------
.../integration/IQv2StoreIntegrationTest.java | 18 ++++--
.../IQv2VersionedStoreIntegrationTest.java | 67 +++++++++++++------
3 files changed, 112 insertions(+), 48 deletions(-)
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
index dcd711a35c5..20fc7f47236 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
@@ -57,11 +57,12 @@ import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import java.io.IOException;
import java.lang.reflect.Field;
@@ -76,6 +77,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.stream.Stream;
import static java.util.Collections.singleton;
import static org.apache.kafka.streams.query.StateQueryRequest.inStore;
@@ -99,6 +101,7 @@ public class IQv2IntegrationTest {
public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(NUM_BROKERS);
private KafkaStreams kafkaStreams;
+ private String groupProtocol;
@BeforeAll
public static void before()
@@ -149,8 +152,8 @@ public class IQv2IntegrationTest {
));
}
- @BeforeEach
- public void beforeTest(final TestInfo testInfo) {
+ private void setup(final String groupProtocol, final TestInfo testInfo) {
+ this.groupProtocol = groupProtocol;
final StreamsBuilder builder = new StreamsBuilder();
builder.table(
@@ -159,7 +162,6 @@ public class IQv2IntegrationTest {
Materialized.as(STORE_NAME)
);
-
final String safeTestName = safeUniqueTestName(testInfo);
kafkaStreams = new KafkaStreams(builder.build(),
streamsConfiguration(safeTestName));
kafkaStreams.cleanUp();
@@ -167,8 +169,10 @@ public class IQv2IntegrationTest {
@AfterEach
public void afterTest() {
- kafkaStreams.close(Duration.ofSeconds(60));
- kafkaStreams.cleanUp();
+ if (kafkaStreams != null) {
+ kafkaStreams.close(Duration.ofSeconds(60));
+ kafkaStreams.cleanUp();
+ }
}
@AfterAll
@@ -176,8 +180,10 @@ public class IQv2IntegrationTest {
CLUSTER.stop();
}
- @Test
- public void shouldFailUnknownStore() {
+ @ParameterizedTest(name = "{1}")
+ @MethodSource("groupProtocolParameters")
+ public void shouldFailUnknownStore(final String groupProtocol, final
String testName, final TestInfo testInfo) {
+ setup(groupProtocol, testInfo);
final KeyQuery<Integer, ValueAndTimestamp<Integer>> query =
KeyQuery.withKey(1);
final StateQueryRequest<ValueAndTimestamp<Integer>> request =
inStore("unknown-store").withQuery(query);
@@ -185,8 +191,10 @@ public class IQv2IntegrationTest {
assertThrows(UnknownStateStoreException.class, () ->
kafkaStreams.query(request));
}
- @Test
- public void shouldFailNotStarted() {
+ @ParameterizedTest(name = "{1}")
+ @MethodSource("groupProtocolParameters")
+ public void shouldFailNotStarted(final String groupProtocol, final String
testName, final TestInfo testInfo) {
+ setup(groupProtocol, testInfo);
final KeyQuery<Integer, ValueAndTimestamp<Integer>> query =
KeyQuery.withKey(1);
final StateQueryRequest<ValueAndTimestamp<Integer>> request =
inStore(STORE_NAME).withQuery(query);
@@ -194,8 +202,10 @@ public class IQv2IntegrationTest {
assertThrows(StreamsNotStartedException.class, () ->
kafkaStreams.query(request));
}
- @Test
- public void shouldFailStopped() {
+ @ParameterizedTest(name = "{1}")
+ @MethodSource("groupProtocolParameters")
+ public void shouldFailStopped(final String groupProtocol, final String
testName, final TestInfo testInfo) {
+ setup(groupProtocol, testInfo);
final KeyQuery<Integer, ValueAndTimestamp<Integer>> query =
KeyQuery.withKey(1);
final StateQueryRequest<ValueAndTimestamp<Integer>> request =
inStore(STORE_NAME).withQuery(query);
@@ -205,9 +215,11 @@ public class IQv2IntegrationTest {
assertThrows(StreamsStoppedException.class, () ->
kafkaStreams.query(request));
}
- @Test
- public void shouldRejectNonRunningActive()
+ @ParameterizedTest(name = "{1}")
+ @MethodSource("groupProtocolParameters")
+ public void shouldRejectNonRunningActive(final String groupProtocol, final
String testName, final TestInfo testInfo)
throws NoSuchFieldException, IllegalAccessException {
+ setup(groupProtocol, testInfo);
final KeyQuery<Integer, ValueAndTimestamp<Integer>> query =
KeyQuery.withKey(1);
final StateQueryRequest<ValueAndTimestamp<Integer>> request =
inStore(STORE_NAME).withQuery(query).requireActive();
@@ -261,8 +273,10 @@ public class IQv2IntegrationTest {
}
}
- @Test
- public void shouldFetchFromPartition() {
+ @ParameterizedTest(name = "{1}")
+ @MethodSource("groupProtocolParameters")
+ public void shouldFetchFromPartition(final String groupProtocol, final
String testName, final TestInfo testInfo) {
+ setup(groupProtocol, testInfo);
final KeyQuery<Integer, ValueAndTimestamp<Integer>> query =
KeyQuery.withKey(1);
final int partition = 1;
final Set<Integer> partitions = singleton(partition);
@@ -276,8 +290,10 @@ public class IQv2IntegrationTest {
assertThat(result.getPartitionResults().keySet(), equalTo(partitions));
}
- @Test
- public void shouldFetchExplicitlyFromAllPartitions() {
+ @ParameterizedTest(name = "{1}")
+ @MethodSource("groupProtocolParameters")
+ public void shouldFetchExplicitlyFromAllPartitions(final String
groupProtocol, final String testName, final TestInfo testInfo) {
+ setup(groupProtocol, testInfo);
final KeyQuery<Integer, ValueAndTimestamp<Integer>> query =
KeyQuery.withKey(1);
final Set<Integer> partitions = Set.of(0, 1);
final StateQueryRequest<ValueAndTimestamp<Integer>> request =
@@ -290,8 +306,10 @@ public class IQv2IntegrationTest {
assertThat(result.getPartitionResults().keySet(), equalTo(partitions));
}
- @Test
- public void shouldNotRequireQueryHandler(final TestInfo testInfo) {
+ @ParameterizedTest(name = "{1}")
+ @MethodSource("groupProtocolParameters")
+ public void shouldNotRequireQueryHandler(final String groupProtocol, final
String testName, final TestInfo testInfo) {
+ setup(groupProtocol, testInfo);
final KeyQuery<Integer, ValueAndTimestamp<Integer>> query =
KeyQuery.withKey(1);
final int partition = 1;
final Set<Integer> partitions = singleton(partition);
@@ -423,8 +441,11 @@ public class IQv2IntegrationTest {
);
// Discard the basic streams and replace with test-specific topology
- kafkaStreams.close();
+ if (kafkaStreams != null) {
+ kafkaStreams.close();
+ }
final String safeTestName = safeUniqueTestName(testInfo);
+ this.groupProtocol = groupProtocol;
kafkaStreams = new KafkaStreams(builder.build(),
streamsConfiguration(safeTestName));
kafkaStreams.cleanUp();
@@ -446,7 +467,7 @@ public class IQv2IntegrationTest {
private Properties streamsConfiguration(final String safeTestName) {
final Properties config = new Properties();
config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG,
StreamsConfig.OPTIMIZE);
- config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
+ config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName
+ "-" + groupProtocol);
config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" +
(++port));
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers());
config.put(StreamsConfig.STATE_DIR_CONFIG,
TestUtils.tempDirectory().getPath());
@@ -458,6 +479,14 @@ public class IQv2IntegrationTest {
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
+ config.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol);
return config;
}
+
+ private static Stream<Arguments> groupProtocolParameters() {
+ return Stream.of(
+ Arguments.of("classic", "CLASSIC protocol"),
+ Arguments.of("streams", "STREAMS protocol")
+ );
+ }
}
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
index 963183fd665..649799c7976 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
@@ -360,7 +360,9 @@ public class IQv2StoreIntegrationTest {
for (final boolean logEnabled : Arrays.asList(true, false)) {
for (final StoresToTest toTest : StoresToTest.values()) {
for (final String kind : Arrays.asList("DSL", "PAPI")) {
- values.add(Arguments.of(cacheEnabled, logEnabled,
toTest.name(), kind));
+ for (final String groupProtocol :
Arrays.asList("classic", "streams")) {
+ values.add(Arguments.of(cacheEnabled, logEnabled,
toTest.name(), kind, groupProtocol));
+ }
}
}
}
@@ -426,13 +428,14 @@ public class IQv2StoreIntegrationTest {
));
}
- public void setup(final boolean cache, final boolean log, final
StoresToTest storeToTest, final String kind) {
+ public void setup(final boolean cache, final boolean log, final
StoresToTest storeToTest, final String kind, final String groupProtocol) {
final StoreSupplier<?> supplier = storeToTest.supplier();
final Properties streamsConfig = streamsConfiguration(
cache,
log,
storeToTest.name(),
- kind
+ kind,
+ groupProtocol
);
final StreamsBuilder builder = new StreamsBuilder();
@@ -765,8 +768,8 @@ public class IQv2StoreIntegrationTest {
@ParameterizedTest
@MethodSource("data")
- public void verifyStore(final boolean cache, final boolean log, final
StoresToTest storeToTest, final String kind) {
- setup(cache, log, storeToTest, kind);
+ public void verifyStore(final boolean cache, final boolean log, final
StoresToTest storeToTest, final String kind, final String groupProtocol) {
+ setup(cache, log, storeToTest, kind, groupProtocol);
try {
if (storeToTest.global()) {
// See KAFKA-13523
@@ -2030,10 +2033,10 @@ public class IQv2StoreIntegrationTest {
}
private static Properties streamsConfiguration(final boolean cache, final
boolean log,
- final String supplier,
final String kind) {
+ final String supplier,
final String kind, final String groupProtocol) {
final String safeTestName =
IQv2StoreIntegrationTest.class.getName() + "-" + cache + "-" + log
+ "-" + supplier
- + "-" + kind + "-" + RANDOM.nextInt();
+ + "-" + kind + "-" + groupProtocol + "-" + RANDOM.nextInt();
final Properties config = new Properties();
config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG,
StreamsConfig.OPTIMIZE);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
@@ -2048,6 +2051,7 @@ public class IQv2StoreIntegrationTest {
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
+ config.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol);
return config;
}
}
\ No newline at end of file
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java
index 26a5aed5a52..bb1f3c20a87 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java
@@ -47,7 +47,10 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import java.time.Duration;
import java.time.Instant;
@@ -57,7 +60,9 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Properties;
+import java.util.stream.Stream;
+import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
@@ -83,16 +88,25 @@ public class IQv2VersionedStoreIntegrationTest {
private static final Long[] RECORD_TIMESTAMPS = {BASE_TIMESTAMP_LONG,
BASE_TIMESTAMP_LONG + 10, BASE_TIMESTAMP_LONG + 20, BASE_TIMESTAMP_LONG + 30};
private static final int RECORD_NUMBER = RECORD_VALUES.length;
private static final int LAST_INDEX = RECORD_NUMBER - 1;
- private static final Position INPUT_POSITION = Position.emptyPosition();
+ private Position inputPosition;
public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(NUM_BROKERS,
Utils.mkProperties(Collections.singletonMap("auto.create.topics.enable",
"true")));
private KafkaStreams kafkaStreams;
+ private String groupProtocol;
@BeforeAll
- public static void before() throws Exception {
+ public static void beforeAll() throws Exception {
CLUSTER.start();
-
+ }
+
+ @BeforeEach
+ public void beforeEach() throws Exception {
+ // Delete and recreate the topic to ensure clean state for each test
+ CLUSTER.deleteTopic(INPUT_TOPIC_NAME);
+ CLUSTER.createTopic(INPUT_TOPIC_NAME, 1, 1);
+
+ // Set up fresh test data
final Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers());
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
IntegerSerializer.class);
@@ -103,19 +117,21 @@ public class IQv2VersionedStoreIntegrationTest {
producer.send(new ProducerRecord<>(INPUT_TOPIC_NAME, 0,
RECORD_TIMESTAMPS[2], RECORD_KEY, RECORD_VALUES[2])).get();
producer.send(new ProducerRecord<>(INPUT_TOPIC_NAME, 0,
RECORD_TIMESTAMPS[3], RECORD_KEY, RECORD_VALUES[3])).get();
}
- INPUT_POSITION.withComponent(INPUT_TOPIC_NAME, 0, 3);
+ inputPosition =
Position.emptyPosition().withComponent(INPUT_TOPIC_NAME, 0, 3);
}
- @BeforeEach
- public void beforeTest() {
+ private void setup(final String groupProtocol, final TestInfo testInfo) {
+ this.groupProtocol = groupProtocol;
final StreamsBuilder builder = new StreamsBuilder();
builder.table(INPUT_TOPIC_NAME,
Materialized.as(Stores.persistentVersionedKeyValueStore(STORE_NAME,
HISTORY_RETENTION, SEGMENT_INTERVAL)));
final Properties configs = new Properties();
- configs.put(StreamsConfig.APPLICATION_ID_CONFIG, "app");
+ final String safeTestName = safeUniqueTestName(testInfo);
+ configs.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" +
safeTestName);
configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers());
configs.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.IntegerSerde.class.getName());
configs.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.IntegerSerde.class.getName());
+ configs.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol);
kafkaStreams = IntegrationTestUtils.getStartedStreams(configs,
builder, true);
}
@@ -132,8 +148,19 @@ public class IQv2VersionedStoreIntegrationTest {
CLUSTER.stop();
}
- @Test
- public void verifyStore() {
+ private static Stream<Arguments> groupProtocolParameters() {
+ return Stream.of(
+ Arguments.of("classic", "CLASSIC protocol"),
+ Arguments.of("streams", "STREAMS protocol")
+ );
+ }
+
+ @ParameterizedTest(name = "{1}")
+ @MethodSource("groupProtocolParameters")
+ public void verifyStore(final String groupProtocol, final String testName,
final TestInfo testInfo) throws Exception {
+ // Set up streams
+ setup(groupProtocol, testInfo);
+
/* Test Versioned Key Queries */
// retrieve the latest value
shouldHandleVersionedKeyQuery(Optional.empty(), RECORD_VALUES[3],
RECORD_TIMESTAMPS[3], Optional.empty());
@@ -255,7 +282,10 @@ public class IQv2VersionedStoreIntegrationTest {
private void shouldHandleRaceCondition() {
final MultiVersionedKeyQuery<Integer, Integer> query =
defineQuery(RECORD_KEY, Optional.empty(), Optional.empty(), ResultOrder.ANY);
- final Map<Integer, QueryResult<VersionedRecordIterator<Integer>>>
partitionResults = sendRequestAndReceiveResults(query, kafkaStreams);
+ // For race condition test, we don't use position bounds since we're
testing concurrent updates
+ final StateQueryRequest<VersionedRecordIterator<Integer>> request =
StateQueryRequest.inStore(STORE_NAME).withQuery(query);
+ final StateQueryResult<VersionedRecordIterator<Integer>> result =
IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+ final Map<Integer, QueryResult<VersionedRecordIterator<Integer>>>
partitionResults = result.getPartitionResults();
// verify results in two steps
for (final Entry<Integer,
QueryResult<VersionedRecordIterator<Integer>>> partitionResultsEntry :
partitionResults.entrySet()) {
@@ -327,14 +357,14 @@ public class IQv2VersionedStoreIntegrationTest {
return query;
}
- private static Map<Integer, QueryResult<VersionedRecordIterator<Integer>>>
sendRequestAndReceiveResults(final MultiVersionedKeyQuery<Integer, Integer>
query, final KafkaStreams kafkaStreams) {
- final StateQueryRequest<VersionedRecordIterator<Integer>> request =
StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION));
+ private Map<Integer, QueryResult<VersionedRecordIterator<Integer>>>
sendRequestAndReceiveResults(final MultiVersionedKeyQuery<Integer, Integer>
query, final KafkaStreams kafkaStreams) {
+ final StateQueryRequest<VersionedRecordIterator<Integer>> request =
StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(inputPosition));
final StateQueryResult<VersionedRecordIterator<Integer>> result =
IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
return result.getPartitionResults();
}
- private static QueryResult<VersionedRecord<Integer>>
sendRequestAndReceiveResults(final VersionedKeyQuery<Integer, Integer> query,
final KafkaStreams kafkaStreams) {
- final StateQueryRequest<VersionedRecord<Integer>> request =
StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION));
+ private QueryResult<VersionedRecord<Integer>>
sendRequestAndReceiveResults(final VersionedKeyQuery<Integer, Integer> query,
final KafkaStreams kafkaStreams) {
+ final StateQueryRequest<VersionedRecord<Integer>> request =
StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(inputPosition));
final StateQueryResult<VersionedRecord<Integer>> result =
IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
return result.getOnlyPartitionResult();
}
@@ -352,7 +382,7 @@ public class IQv2VersionedStoreIntegrationTest {
/**
* This method inserts a new value (999999) for the key in the oldest
timestamp (RECORD_TIMESTAMPS[0]).
*/
- private static void updateRecordValue() {
+ private void updateRecordValue() {
// update the record value at RECORD_TIMESTAMPS[0]
final Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers());
@@ -361,8 +391,9 @@ public class IQv2VersionedStoreIntegrationTest {
try (final KafkaProducer<Integer, Integer> producer = new
KafkaProducer<>(producerProps)) {
producer.send(new ProducerRecord<>(INPUT_TOPIC_NAME, 0,
RECORD_TIMESTAMPS[0], RECORD_KEY, 999999));
}
- INPUT_POSITION.withComponent(INPUT_TOPIC_NAME, 0, 4);
- assertThat(INPUT_POSITION,
equalTo(Position.emptyPosition().withComponent(INPUT_TOPIC_NAME, 0, 4)));
+
+ inputPosition = inputPosition.withComponent(INPUT_TOPIC_NAME, 0, 4);
+ assertThat(inputPosition,
equalTo(Position.emptyPosition().withComponent(INPUT_TOPIC_NAME, 0, 4)));
// make sure that the new value is picked up by the store
final Properties consumerProps = new Properties();