Copilot commented on code in PR #20541:
URL: https://github.com/apache/kafka/pull/20541#discussion_r2354603503


##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java:
##########
@@ -103,19 +115,21 @@ public static void before() throws Exception {
             producer.send(new ProducerRecord<>(INPUT_TOPIC_NAME, 0,  RECORD_TIMESTAMPS[2], RECORD_KEY, RECORD_VALUES[2])).get();
             producer.send(new ProducerRecord<>(INPUT_TOPIC_NAME, 0,  RECORD_TIMESTAMPS[3], RECORD_KEY, RECORD_VALUES[3])).get();
         }
-        INPUT_POSITION.withComponent(INPUT_TOPIC_NAME, 0, 3);
+        inputPosition = Position.emptyPosition();
+        inputPosition = inputPosition.withComponent(INPUT_TOPIC_NAME, 0, 3);
     }
 
-    @BeforeEach
-    public void beforeTest() {
+    private void setup(final String groupProtocol) {
+        this.groupProtocol = groupProtocol;
         final StreamsBuilder builder = new StreamsBuilder();
         builder.table(INPUT_TOPIC_NAME,
             Materialized.as(Stores.persistentVersionedKeyValueStore(STORE_NAME, HISTORY_RETENTION, SEGMENT_INTERVAL)));
         final Properties configs = new Properties();
-        configs.put(StreamsConfig.APPLICATION_ID_CONFIG, "app");
+        configs.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + System.nanoTime() + "-" + groupProtocol);

Review Comment:
   Using `System.nanoTime()` in `APPLICATION_ID_CONFIG` gives the application a
different ID on every run, which makes failures harder to reproduce, and it does
not strictly guarantee uniqueness. Consider a deterministic ID instead, for
example the test class name combined with the group protocol.
   ```suggestion
           configs.put(StreamsConfig.APPLICATION_ID_CONFIG, getClass().getSimpleName() + "-" + groupProtocol);
   ```
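
One possible variant, sketched under assumptions: if several test methods in the same class share a group protocol, the class name alone may not be enough to keep their application IDs apart. The helper below also folds in the test method name and replaces characters outside the set Kafka accepts in topic names (ASCII letters, digits, `.`, `_`, `-`), since the application ID is used as a prefix for internal topics. `AppIds` and `appId` are hypothetical names and not part of the Kafka test utilities.

```java
// Hypothetical helper for illustration; not part of the Kafka code base.
final class AppIds {
    private AppIds() { }

    // Builds "<class>-<method>-<protocol>" and replaces every character
    // outside [A-Za-z0-9._-] with '_' so the result is a legal topic prefix.
    static String appId(final Class<?> testClass,
                        final String methodName,
                        final String groupProtocol) {
        final String raw = testClass.getSimpleName() + "-" + methodName + "-" + groupProtocol;
        return raw.replaceAll("[^A-Za-z0-9._-]", "_");
    }

    public static void main(final String[] args) {
        // Same inputs always give the same ID, unlike System.nanoTime().
        System.out.println(appId(String.class, "shouldQuery[1]", "streams"));
    }
}
```

In a JUnit 5 test the method name could come from an injected `TestInfo`, which the other test in this PR already uses via `safeUniqueTestName(testInfo)`.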



##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java:
##########
@@ -423,10 +441,13 @@ public String metricsScope() {
         );
 
         // Discard the basic streams and replace with test-specific topology
-        kafkaStreams.close();
+        if (kafkaStreams != null) {
+            kafkaStreams.close();
+        }
         final String safeTestName = safeUniqueTestName(testInfo);
         kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration(safeTestName));
         kafkaStreams.cleanUp();
+        this.groupProtocol = groupProtocol;

Review Comment:
   `this.groupProtocol` is assigned only after `kafkaStreams` has been created
and cleaned up, but `streamsConfiguration`, which reads `this.groupProtocol`, is
called while constructing `kafkaStreams` on line 448. The field is therefore
still unset (null, unless something assigned it earlier) when the configuration
is built, so the requested protocol is not applied. Assign the field before
building the configuration.
   ```suggestion
           this.groupProtocol = groupProtocol;
           kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration(safeTestName));
           kafkaStreams.cleanUp();
   ```
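
The ordering problem can be reproduced in isolation. The sketch below is a stand-in, not the real test class: `OrderingSketch`, `configuration()`, and the literal `"group.protocol"` key are illustrative assumptions. It only shows that a method reading a field sees whatever value the field holds at call time.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for the test class; all names here are illustrative.
final class OrderingSketch {
    private String groupProtocol;

    // Reads the field when called, as streamsConfiguration(...) is described as doing.
    private Map<String, String> configuration() {
        final Map<String, String> config = new HashMap<>();
        config.put("group.protocol", groupProtocol);
        return config;
    }

    // Order in the PR as submitted: configuration is built first.
    Map<String, String> assignAfter(final String protocol) {
        final Map<String, String> config = configuration(); // field is still null here
        this.groupProtocol = protocol;
        return config;
    }

    // Order in the suggestion: field is assigned first.
    Map<String, String> assignBefore(final String protocol) {
        this.groupProtocol = protocol;
        return configuration();
    }
}
```

On a fresh instance, `assignAfter("streams")` returns a map whose `"group.protocol"` entry is null, while `assignBefore("streams")` returns one that holds `"streams"`.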



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
