aliehsaeedii commented on code in PR #14626:
URL: https://github.com/apache/kafka/pull/14626#discussion_r1415599211


##########
streams/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java:
##########
@@ -153,14 +192,189 @@ private void shouldHandleVersionedKeyQuery(final Integer 
key,
         final VersionedRecord<Integer> result1 = queryResult.getResult();
         assertThat(result1.value(), is(expectedValue));
         assertThat(result1.timestamp(), is(expectedTimestamp));
+        assertThat(result1.validTo(), is(expectedValidToTime));
         assertThat(queryResult.getExecutionInfo(), is(empty()));
     }
 
-    private void shouldVerifyGetNull(final Integer key, final Instant 
queryTimestamp) {
+    private void shouldVerifyGetNullForVersionedKeyQuery(final Integer key, 
final Instant queryTimestamp) {
         VersionedKeyQuery<Integer, Integer> query = 
VersionedKeyQuery.withKey(key);
         query = query.asOf(queryTimestamp);
         final StateQueryRequest<VersionedRecord<Integer>> request = 
StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION));
         final StateQueryResult<VersionedRecord<Integer>> result = 
IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
         assertThat(result.getOnlyPartitionResult(), nullValue());
     }
+
+    private void shouldHandleMultiVersionedKeyQuery(final Optional<Instant> 
fromTime, final Optional<Instant> toTime,
+                                                    final ResultOrder order, 
final int expectedArrayLowerBound, final int expectedArrayUpperBound) {
+        // define query
+        MultiVersionedKeyQuery<Integer, Integer> query = 
MultiVersionedKeyQuery.withKey(RECORD_KEY);
+        if (fromTime.isPresent()) {
+            query = query.fromTime(fromTime.get());
+        }
+        if (toTime.isPresent()) {
+            query = query.toTime(toTime.get());
+        }
+        if (order.equals(ResultOrder.ASCENDING)) {
+            query = query.withAscendingTimestamps();
+        }
+
+        // send request and get the results
+        final StateQueryRequest<VersionedRecordIterator<Integer>> request = 
StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION));
+        final StateQueryResult<VersionedRecordIterator<Integer>> result = 
IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+        final QueryResult<VersionedRecordIterator<Integer>> queryResult = 
result.getOnlyPartitionResult();
+        if (queryResult.isFailure()) {
+            throw new AssertionError(queryResult.toString());
+        }
+        assertThat(queryResult.isSuccess(), is(true));
+
+        assertThrows(IllegalArgumentException.class, 
queryResult::getFailureReason);
+        assertThrows(IllegalArgumentException.class, 
queryResult::getFailureMessage);
+
+        // verify results
+        final Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> 
partitionResults = result.getPartitionResults();
+        for (final Entry<Integer, 
QueryResult<VersionedRecordIterator<Integer>>> partitionResultsEntry : 
partitionResults.entrySet()) {
+            try (final VersionedRecordIterator<Integer> iterator = 
partitionResultsEntry.getValue().getResult()) {
+                int i = order.equals(ResultOrder.ASCENDING) ? 0 : 
expectedArrayUpperBound;
+                int iteratorSize = 0;
+                while (iterator.hasNext()) {
+                    final VersionedRecord<Integer> record = iterator.next();
+                    final Long timestamp = record.timestamp();
+                    final Optional<Long> validTo = record.validTo();
+                    final Integer value = record.value();
+
+                    final Optional<Long> expectedValidTo = i < 
expectedArrayUpperBound ? Optional.of(RECORD_TIMESTAMPS[i + 1]) : 
Optional.empty();
+                    assertThat(value, is(RECORD_VALUES[i]));
+                    assertThat(timestamp, is(RECORD_TIMESTAMPS[i]));
+                    assertThat(validTo, is(expectedValidTo));
+                    assertThat(queryResult.getExecutionInfo(), is(empty()));
+                    i = order.equals(ResultOrder.ASCENDING) ? i + 1 : i - 1;
+                    iteratorSize++;
+                }
+                // The number of returned records by query is equal to 
expected number of records
+                assertThat(iteratorSize, equalTo(expectedArrayUpperBound - 
expectedArrayLowerBound + 1));
+            }
+        }
+    }
+
+    private void shouldVerifyGetNullForMultiVersionedKeyQuery(final Integer 
key, final Optional<Instant> fromTime, final Optional<Instant> toTime) {
+        // define query
+        MultiVersionedKeyQuery<Integer, Integer> query = 
MultiVersionedKeyQuery.withKey(key);
+        if (fromTime.isPresent()) {
+            query = query.fromTime(fromTime.get());
+        }
+        if (toTime.isPresent()) {
+            query = query.toTime(toTime.get());
+        }
+
+        // send query and receive results
+        final StateQueryRequest<VersionedRecordIterator<Integer>> request = 
StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION));
+        final StateQueryResult<VersionedRecordIterator<Integer>> result = 
IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+        final QueryResult<VersionedRecordIterator<Integer>> queryResult = 
result.getOnlyPartitionResult();
+
+        if (queryResult.isFailure()) {
+            throw new AssertionError(queryResult.toString());
+        }
+        assertThat(queryResult.isSuccess(), is(true));
+
+        assertThrows(IllegalArgumentException.class, 
queryResult::getFailureReason);
+        assertThrows(IllegalArgumentException.class, 
queryResult::getFailureMessage);
+
+        // verify results
+        final Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> 
partitionResults = result.getPartitionResults();
+        for (final Entry<Integer, 
QueryResult<VersionedRecordIterator<Integer>>> partitionResultsEntry : 
partitionResults.entrySet()) {
+            try (final VersionedRecordIterator<Integer> iterator = 
partitionResultsEntry.getValue().getResult()) {
+                assertFalse(iterator.hasNext());
+            }
+        }
+    }
+
+    private void shouldHandleRaceCondition() {
+        final MultiVersionedKeyQuery<Integer, Integer> query = 
MultiVersionedKeyQuery.withKey(RECORD_KEY);
+
+        final StateQueryRequest<VersionedRecordIterator<Integer>> request = 
StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION));
+        final StateQueryResult<VersionedRecordIterator<Integer>> result = 
IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+        final QueryResult<VersionedRecordIterator<Integer>> queryResult = 
result.getOnlyPartitionResult();
+        if (queryResult.isFailure()) {
+            throw new AssertionError(queryResult.toString());
+        }
+        assertThat(queryResult.isSuccess(), is(true));
+
+        assertThrows(IllegalArgumentException.class, 
queryResult::getFailureReason);
+        assertThrows(IllegalArgumentException.class, 
queryResult::getFailureMessage);
+
+        // verify results in two steps
+        final Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> 
partitionResults = result.getPartitionResults();
+        for (final Entry<Integer, 
QueryResult<VersionedRecordIterator<Integer>>> partitionResultsEntry : 
partitionResults.entrySet()) {
+            try (final VersionedRecordIterator<Integer> iterator = 
partitionResultsEntry.getValue().getResult()) {
+                int i = LAST_INDEX;
+                int iteratorSize = 0;
+
+                // step 1:
+                while (iterator.hasNext()) {
+                    final VersionedRecord<Integer> record = iterator.next();
+                    final Long timestamp = record.timestamp();
+                    final Optional<Long> validTo = record.validTo();
+                    final Integer value = record.value();
+
+                    final Optional<Long> expectedValidTo = i < LAST_INDEX ? 
Optional.of(RECORD_TIMESTAMPS[i + 1]) : Optional.empty();
+                    assertThat(value, is(RECORD_VALUES[i]));
+                    assertThat(timestamp, is(RECORD_TIMESTAMPS[i]));
+                    assertThat(validTo, is(expectedValidTo));
+                    assertThat(queryResult.getExecutionInfo(), is(empty()));
+                    i--;
+                    iteratorSize++;
+                    if (i == 2)
+                        break;
+                }
+
+                // update the value of the oldest record
+                updateRecordValue();
+
+                // step 2: continue reading records from through the already 
opened iterator
+                while (iterator.hasNext()) {
+                    final VersionedRecord<Integer> record = iterator.next();
+                    final Long timestamp = record.timestamp();
+                    final Optional<Long> validTo = record.validTo();
+                    final Integer value = record.value();
+
+                    final Optional<Long> expectedValidTo = i < LAST_INDEX ? 
Optional.of(RECORD_TIMESTAMPS[i + 1]) : Optional.empty();
+                    assertThat(value, is(RECORD_VALUES[i]));
+                    assertThat(timestamp, is(RECORD_TIMESTAMPS[i]));
+                    assertThat(validTo, is(expectedValidTo));
+                    assertThat(queryResult.getExecutionInfo(), is(empty()));
+                    i--;
+                    iteratorSize++;
+                }
+
+                // The number of returned records by query is equal to 
expected number of records
+                assertThat(iteratorSize, equalTo(RECORD_NUMBER));
+            }
+        }
+    }
+
+    private void updateRecordValue() {
+        // update the record value at RECORD_TIMESTAMPS[0]
+        final Properties producerProps = new Properties();
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
IntegerSerializer.class);
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
IntegerSerializer.class);
+        try (final KafkaProducer<Integer, Integer> producer = new 
KafkaProducer<>(producerProps)) {
+            producer.send(new ProducerRecord<>(INPUT_TOPIC_NAME, 0, 
RECORD_TIMESTAMPS[0], RECORD_KEY, 999999));

Review Comment:
   > Why do we re-use `RECORD_TIMESTAMPS[0]` here? Not sure if I can follow? -- 
Won't this overwrite the first/oldest value which was already returned in the 
first `while` loop before `updateRecordValue` is executed?
   
   Yes, it does (it overwrites the oldest value). That 's why, when not using 
snapshots, the test will fail because it expects the new value (`999999` 
instead of the old value `RECORD_TVALUES[0])



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to