mjsax commented on code in PR #13340: URL: https://github.com/apache/kafka/pull/13340#discussion_r1142848310
########## streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java: ########## @@ -334,18 +399,65 @@ private final int produceSourceData(final long timestamp, } /** - * Test-only processor for inserting records into a versioned store while also tracking - * them separately in-memory, and performing checks to validate expected store contents. - * Forwards the number of failed checks downstream for consumption. + * @param topic topic to produce to + * @param dataTracker map of key -> timestamp -> value for tracking data which is produced to + * the topic. This method will add the produced data into this in-memory + * tracker in addition to producing to the topic, in order to keep the two + * in sync. + * @param timestamp timestamp to produce with + * @param keyValues key-value pairs to produce + * + * @return number of records produced + */ + @SuppressWarnings("varargs") + @SafeVarargs + private final int produceDataToTopic(final String topic, + final DataTracker dataTracker, + final long timestamp, + final KeyValue<Integer, String>... keyValues) { + produceDataToTopic(topic, timestamp, keyValues); + + for (final KeyValue<Integer, String> keyValue : keyValues) { + dataTracker.add(keyValue.key, timestamp, keyValue.value); + } + + return keyValues.length; + } + + /** + * Test-only processor for validating expected contents of a versioned store, and forwards + * the number of failed checks downstream for consumption. Callers specify whether the + * processor should also be responsible for inserting records into the store (while also + * tracking them separately in-memory for use in validation). */ private static class VersionedStoreContentCheckerProcessor implements Processor<Integer, String, Integer, Integer> { private ProcessorContext<Integer, Integer> context; private VersionedKeyValueStore<Integer, String> store; + // whether or not the processor should write records to the store as they arrive. 
+ // must be false for global stores. Review Comment: Now that I understand how the test actually works, it makes sense. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org