mjsax commented on code in PR #14626: URL: https://github.com/apache/kafka/pull/14626#discussion_r1416208410
########## streams/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java: ########## @@ -153,14 +192,189 @@ private void shouldHandleVersionedKeyQuery(final Integer key, final VersionedRecord<Integer> result1 = queryResult.getResult(); assertThat(result1.value(), is(expectedValue)); assertThat(result1.timestamp(), is(expectedTimestamp)); + assertThat(result1.validTo(), is(expectedValidToTime)); assertThat(queryResult.getExecutionInfo(), is(empty())); } - private void shouldVerifyGetNull(final Integer key, final Instant queryTimestamp) { + private void shouldVerifyGetNullForVersionedKeyQuery(final Integer key, final Instant queryTimestamp) { VersionedKeyQuery<Integer, Integer> query = VersionedKeyQuery.withKey(key); query = query.asOf(queryTimestamp); final StateQueryRequest<VersionedRecord<Integer>> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION)); final StateQueryResult<VersionedRecord<Integer>> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); assertThat(result.getOnlyPartitionResult(), nullValue()); } + + private void shouldHandleMultiVersionedKeyQuery(final Optional<Instant> fromTime, final Optional<Instant> toTime, + final ResultOrder order, final int expectedArrayLowerBound, final int expectedArrayUpperBound) { + // define query + MultiVersionedKeyQuery<Integer, Integer> query = MultiVersionedKeyQuery.withKey(RECORD_KEY); + if (fromTime.isPresent()) { + query = query.fromTime(fromTime.get()); + } + if (toTime.isPresent()) { + query = query.toTime(toTime.get()); + } + if (order.equals(ResultOrder.ASCENDING)) { + query = query.withAscendingTimestamps(); + } + + // send request and get the results + final StateQueryRequest<VersionedRecordIterator<Integer>> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION)); + final StateQueryResult<VersionedRecordIterator<Integer>> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); + final QueryResult<VersionedRecordIterator<Integer>> queryResult = result.getOnlyPartitionResult(); + if (queryResult.isFailure()) { + throw new AssertionError(queryResult.toString()); + } + assertThat(queryResult.isSuccess(), is(true)); + + assertThrows(IllegalArgumentException.class, queryResult::getFailureReason); + assertThrows(IllegalArgumentException.class, queryResult::getFailureMessage); + + // verify results + final Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResults = result.getPartitionResults(); + for (final Entry<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResultsEntry : partitionResults.entrySet()) { + try (final VersionedRecordIterator<Integer> iterator = partitionResultsEntry.getValue().getResult()) { + int i = order.equals(ResultOrder.ASCENDING) ? 0 : expectedArrayUpperBound; + int iteratorSize = 0; + while (iterator.hasNext()) { + final VersionedRecord<Integer> record = iterator.next(); + final Long timestamp = record.timestamp(); + final Optional<Long> validTo = record.validTo(); + final Integer value = record.value(); + + final Optional<Long> expectedValidTo = i < expectedArrayUpperBound ? Optional.of(RECORD_TIMESTAMPS[i + 1]) : Optional.empty(); + assertThat(value, is(RECORD_VALUES[i])); + assertThat(timestamp, is(RECORD_TIMESTAMPS[i])); + assertThat(validTo, is(expectedValidTo)); + assertThat(queryResult.getExecutionInfo(), is(empty())); + i = order.equals(ResultOrder.ASCENDING) ? i + 1 : i - 1; + iteratorSize++; + } + // The number of returned records by query is equal to expected number of records + assertThat(iteratorSize, equalTo(expectedArrayUpperBound - expectedArrayLowerBound + 1)); + } + } + } + + private void shouldVerifyGetNullForMultiVersionedKeyQuery(final Integer key, final Optional<Instant> fromTime, final Optional<Instant> toTime) { + // define query + MultiVersionedKeyQuery<Integer, Integer> query = MultiVersionedKeyQuery.withKey(key); + if (fromTime.isPresent()) { + query = query.fromTime(fromTime.get()); + } + if (toTime.isPresent()) { + query = query.toTime(toTime.get()); + } + + // send query and receive results + final StateQueryRequest<VersionedRecordIterator<Integer>> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION)); + final StateQueryResult<VersionedRecordIterator<Integer>> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); + final QueryResult<VersionedRecordIterator<Integer>> queryResult = result.getOnlyPartitionResult(); + + if (queryResult.isFailure()) { + throw new AssertionError(queryResult.toString()); + } + assertThat(queryResult.isSuccess(), is(true)); + + assertThrows(IllegalArgumentException.class, queryResult::getFailureReason); + assertThrows(IllegalArgumentException.class, queryResult::getFailureMessage); + + // verify results + final Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResults = result.getPartitionResults(); + for (final Entry<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResultsEntry : partitionResults.entrySet()) { + try (final VersionedRecordIterator<Integer> iterator = partitionResultsEntry.getValue().getResult()) { + assertFalse(iterator.hasNext()); + } + } + } + + private void shouldHandleRaceCondition() { + final MultiVersionedKeyQuery<Integer, Integer> query = MultiVersionedKeyQuery.withKey(RECORD_KEY); + + final StateQueryRequest<VersionedRecordIterator<Integer>> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION)); + final StateQueryResult<VersionedRecordIterator<Integer>> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); + final QueryResult<VersionedRecordIterator<Integer>> queryResult = result.getOnlyPartitionResult(); + if (queryResult.isFailure()) { + throw new AssertionError(queryResult.toString()); + } + assertThat(queryResult.isSuccess(), is(true)); + + assertThrows(IllegalArgumentException.class, queryResult::getFailureReason); + assertThrows(IllegalArgumentException.class, queryResult::getFailureMessage); + + // verify results in two steps + final Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResults = result.getPartitionResults(); + for (final Entry<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResultsEntry : partitionResults.entrySet()) { + try (final VersionedRecordIterator<Integer> iterator = partitionResultsEntry.getValue().getResult()) { + int i = LAST_INDEX; + int iteratorSize = 0; + + // step 1: + while (iterator.hasNext()) { + final VersionedRecord<Integer> record = iterator.next(); + final Long timestamp = record.timestamp(); + final Optional<Long> validTo = record.validTo(); + final Integer value = record.value(); + + final Optional<Long> expectedValidTo = i < LAST_INDEX ? Optional.of(RECORD_TIMESTAMPS[i + 1]) : Optional.empty(); + assertThat(value, is(RECORD_VALUES[i])); + assertThat(timestamp, is(RECORD_TIMESTAMPS[i])); + assertThat(validTo, is(expectedValidTo)); + assertThat(queryResult.getExecutionInfo(), is(empty())); + i--; + iteratorSize++; + if (i == 2) + break; + } + + // update the value of the oldest record + updateRecordValue(); + + // step 2: continue reading records from through the already opened iterator + while (iterator.hasNext()) { + final VersionedRecord<Integer> record = iterator.next(); + final Long timestamp = record.timestamp(); + final Optional<Long> validTo = record.validTo(); + final Integer value = record.value(); + + final Optional<Long> expectedValidTo = i < LAST_INDEX ? Optional.of(RECORD_TIMESTAMPS[i + 1]) : Optional.empty(); + assertThat(value, is(RECORD_VALUES[i])); + assertThat(timestamp, is(RECORD_TIMESTAMPS[i])); + assertThat(validTo, is(expectedValidTo)); + assertThat(queryResult.getExecutionInfo(), is(empty())); + i--; + iteratorSize++; + } + + // The number of returned records by query is equal to expected number of records + assertThat(iteratorSize, equalTo(RECORD_NUMBER)); + } + } + } + + private void updateRecordValue() { + // update the record value at RECORD_TIMESTAMPS[0] + final Properties producerProps = new Properties(); + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); + try (final KafkaProducer<Integer, Integer> producer = new KafkaProducer<>(producerProps)) { + producer.send(new ProducerRecord<>(INPUT_TOPIC_NAME, 0, RECORD_TIMESTAMPS[0], RECORD_KEY, 999999)); Review Comment: Ah. Thanks. Might be worth to add a comment. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org