Copilot commented on code in PR #20541:
URL: https://github.com/apache/kafka/pull/20541#discussion_r2354603503
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java:
##########
@@ -103,19 +115,21 @@ public static void before() throws Exception {
producer.send(new ProducerRecord<>(INPUT_TOPIC_NAME, 0,
RECORD_TIMESTAMPS[2], RECORD_KEY, RECORD_VALUES[2])).get();
producer.send(new ProducerRecord<>(INPUT_TOPIC_NAME, 0,
RECORD_TIMESTAMPS[3], RECORD_KEY, RECORD_VALUES[3])).get();
}
- INPUT_POSITION.withComponent(INPUT_TOPIC_NAME, 0, 3);
+ inputPosition = Position.emptyPosition();
+ inputPosition = inputPosition.withComponent(INPUT_TOPIC_NAME, 0, 3);
}
- @BeforeEach
- public void beforeTest() {
+ private void setup(final String groupProtocol) {
+ this.groupProtocol = groupProtocol;
final StreamsBuilder builder = new StreamsBuilder();
builder.table(INPUT_TOPIC_NAME,
Materialized.as(Stores.persistentVersionedKeyValueStore(STORE_NAME,
HISTORY_RETENTION, SEGMENT_INTERVAL)));
final Properties configs = new Properties();
- configs.put(StreamsConfig.APPLICATION_ID_CONFIG, "app");
+ configs.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" +
System.nanoTime() + "-" + groupProtocol);
Review Comment:
Using System.nanoTime() for APPLICATION_ID_CONFIG is not recommended as it
can result in non-deterministic behavior and potential collisions. Consider
using a more deterministic approach like combining test name with group
protocol.
```suggestion
configs.put(StreamsConfig.APPLICATION_ID_CONFIG,
getClass().getSimpleName() + "-" + groupProtocol);
```
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java:
##########
@@ -423,10 +441,13 @@ public String metricsScope() {
);
// Discard the basic streams and replace with test-specific topology
- kafkaStreams.close();
+ if (kafkaStreams != null) {
+ kafkaStreams.close();
+ }
final String safeTestName = safeUniqueTestName(testInfo);
kafkaStreams = new KafkaStreams(builder.build(),
streamsConfiguration(safeTestName));
kafkaStreams.cleanUp();
+ this.groupProtocol = groupProtocol;
Review Comment:
The groupProtocol is being set after kafkaStreams is created and cleaned up,
but the streamsConfiguration method that uses this.groupProtocol is called
during kafkaStreams creation on line 448. This means the groupProtocol will be
null during configuration, causing incorrect behavior.
```suggestion
this.groupProtocol = groupProtocol;
kafkaStreams = new KafkaStreams(builder.build(),
streamsConfiguration(safeTestName));
kafkaStreams.cleanUp();
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]