mjsax commented on code in PR #14626: URL: https://github.com/apache/kafka/pull/14626#discussion_r1416297822
########## streams/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java: ########## @@ -88,7 +87,7 @@ public class IQv2VersionedStoreIntegrationTest { public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, Utils.mkProperties(Collections.singletonMap("auto.create.topics.enable", "true"))); - private KafkaStreams kafkaStreams; + private static KafkaStreams kafkaStreams; Review Comment: Not sure if it's ok to make this one `static`? It's setup inside `@Before` which is not static and we run test in parallel... I think we need to rather pass it as a parameter into the newly added `static` methods that use it. ########## streams/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java: ########## @@ -168,16 +167,13 @@ private void shouldHandleVersionedKeyQuery(final Optional<Instant> queryTimestam final Long expectedTimestamp, final Optional<Long> expectedValidToTime) { - VersionedKeyQuery<Integer, Integer> query = VersionedKeyQuery.withKey(RECORD_KEY); - if (queryTimestamp.isPresent()) { - query = query.asOf(queryTimestamp.get()); - } - - final StateQueryRequest<VersionedRecord<Integer>> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION)); - final StateQueryResult<VersionedRecord<Integer>> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); + // define query Review Comment: Comment can be removed. We call `defineQuery()` -- very obvious what is happening (what also means, it's well structured and easy to understand code) ########## streams/src/main/java/org/apache/kafka/streams/query/MultiVersionedKeyQuery.java: ########## @@ -57,7 +57,7 @@ private MultiVersionedKeyQuery(final K key, final Optional<Instant> fromTime, fi * @param key The specified key by the query * @param <K> The type of the key * @param <V> The type of the value that will be retrieved - * @throws NullPointerException if @param key is null + * @throws NullPointerException if @code key is null Review Comment: This need to be in `{}` -> `{@code key}` ########## streams/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java: ########## @@ -168,16 +167,13 @@ private void shouldHandleVersionedKeyQuery(final Optional<Instant> queryTimestam final Long expectedTimestamp, final Optional<Long> expectedValidToTime) { - VersionedKeyQuery<Integer, Integer> query = VersionedKeyQuery.withKey(RECORD_KEY); - if (queryTimestamp.isPresent()) { - query = query.asOf(queryTimestamp.get()); - } - - final StateQueryRequest<VersionedRecord<Integer>> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION)); - final StateQueryResult<VersionedRecord<Integer>> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); + // define query + final VersionedKeyQuery<Integer, Integer> query = defineQuery(RECORD_KEY, queryTimestamp); - final QueryResult<VersionedRecord<Integer>> queryResult = result.getOnlyPartitionResult(); + // send request and receive results Review Comment: as above (more of the same below) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org