mjsax commented on code in PR #14626:
URL: https://github.com/apache/kafka/pull/14626#discussion_r1416297822


##########
streams/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java:
##########
@@ -88,7 +87,7 @@ public class IQv2VersionedStoreIntegrationTest {
 
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, Utils.mkProperties(Collections.singletonMap("auto.create.topics.enable", "true")));
 
-    private KafkaStreams kafkaStreams;
+    private static KafkaStreams kafkaStreams;

Review Comment:
   Not sure if it's ok to make this one `static`? It's set up inside `@Before`, 
which is not static, and we run tests in parallel...
   
   I think we should rather pass it as a parameter into the newly added 
`static` methods that use it.
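
   To illustrate what I mean, here is a minimal, self-contained sketch. It does not use the real test class: `FakeStreams` is a hypothetical stand-in for the per-test `KafkaStreams` instance, and `sendRequestAndReceiveResults` is an assumed helper name, not necessarily the one in this PR. The point is only that the field stays non-static and the `static` helper receives the instance as an argument.

```java
// Hypothetical sketch: none of these names come from the Kafka code base.
class StaticHelperSketch {

    // Stand-in for the per-test KafkaStreams instance (assumption, not a Kafka class).
    static final class FakeStreams {
        private final String id;

        FakeStreams(final String id) {
            this.id = id;
        }

        String query(final String key) {
            return id + ":" + key;
        }
    }

    // Stays an instance field, so each test object owns its own instance,
    // just as a non-static @Before method would set it up.
    private final FakeStreams streams;

    StaticHelperSketch(final String id) {
        this.streams = new FakeStreams(id);
    }

    // The static helper takes the instance as a parameter
    // instead of reading it from a shared static field.
    static String sendRequestAndReceiveResults(final FakeStreams streams, final String key) {
        return streams.query(key);
    }

    String runQuery(final String key) {
        return sendRequestAndReceiveResults(streams, key);
    }

    public static void main(final String[] args) {
        // Two "tests" with separate instances do not interfere with each other.
        System.out.println(new StaticHelperSketch("test-a").runQuery("k"));
        System.out.println(new StaticHelperSketch("test-b").runQuery("k"));
    }
}
```

   With this shape, no state is shared between test instances, so it should not matter whether the tests run in parallel.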



##########
streams/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java:
##########
@@ -168,16 +167,13 @@ private void shouldHandleVersionedKeyQuery(final Optional<Instant> queryTimestam
                                                final Long expectedTimestamp,
                                                final Optional<Long> expectedValidToTime) {
 
-        VersionedKeyQuery<Integer, Integer> query = VersionedKeyQuery.withKey(RECORD_KEY);
-        if (queryTimestamp.isPresent()) {
-            query = query.asOf(queryTimestamp.get());
-        }
-
-        final StateQueryRequest<VersionedRecord<Integer>> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION));
-        final StateQueryResult<VersionedRecord<Integer>> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+        // define query

Review Comment:
   Comment can be removed. We call `defineQuery()`, so it's very obvious what is 
happening (which also means the code is well structured and easy to understand).



##########
streams/src/main/java/org/apache/kafka/streams/query/MultiVersionedKeyQuery.java:
##########
@@ -57,7 +57,7 @@ private MultiVersionedKeyQuery(final K key, final Optional<Instant> fromTime, fi
    * @param key The specified key by the query
    * @param <K> The type of the key
    * @param <V> The type of the value that will be retrieved
-   * @throws NullPointerException if @param key is null
+   * @throws NullPointerException if @code key is null

Review Comment:
   This needs to be in `{}` -> `{@code key}`
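
   For reference, Javadoc inline tags are written as `{@tag text}`; a bare `@code` in the middle of a description is not rendered as code. A minimal sketch of the corrected comment on a stand-in class (`KeyQuerySketch` is hypothetical and not the real `MultiVersionedKeyQuery`; the `requireNonNull` check is an assumption about how the null check is done):

```java
import java.util.Objects;

// Hypothetical stand-in class, only here to show the Javadoc syntax.
final class KeyQuerySketch<K> {

    private final K key;

    private KeyQuerySketch(final K key) {
        this.key = key;
    }

    /**
     * Creates a query for the given key.
     *
     * @param key The specified key by the query
     * @param <K> The type of the key
     * @throws NullPointerException if {@code key} is null
     */
    static <K> KeyQuerySketch<K> withKey(final K key) {
        // Assumed null check; it throws the NullPointerException documented above.
        Objects.requireNonNull(key, "key cannot be null");
        return new KeyQuerySketch<>(key);
    }

    K key() {
        return key;
    }
}
```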



##########
streams/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java:
##########
@@ -168,16 +167,13 @@ private void shouldHandleVersionedKeyQuery(final Optional<Instant> queryTimestam
                                                final Long expectedTimestamp,
                                                final Optional<Long> expectedValidToTime) {
 
-        VersionedKeyQuery<Integer, Integer> query = VersionedKeyQuery.withKey(RECORD_KEY);
-        if (queryTimestamp.isPresent()) {
-            query = query.asOf(queryTimestamp.get());
-        }
-
-        final StateQueryRequest<VersionedRecord<Integer>> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION));
-        final StateQueryResult<VersionedRecord<Integer>> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+        // define query
+        final VersionedKeyQuery<Integer, Integer> query = defineQuery(RECORD_KEY, queryTimestamp);
 
-        final QueryResult<VersionedRecord<Integer>> queryResult = result.getOnlyPartitionResult();
+        // send request and receive results

Review Comment:
   as above (more of the same below)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
