mjsax commented on code in PR #13340:
URL: https://github.com/apache/kafka/pull/13340#discussion_r1142848310


##########
streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java:
##########
@@ -334,18 +399,65 @@ private final int produceSourceData(final long timestamp,
     }
 
     /**
-     * Test-only processor for inserting records into a versioned store while 
also tracking
-     * them separately in-memory, and performing checks to validate expected 
store contents.
-     * Forwards the number of failed checks downstream for consumption.
+     * @param topic       topic to produce to
+     * @param dataTracker map of key -> timestamp -> value for tracking data 
which is produced to
+     *                    the topic. This method will add the produced data 
into this in-memory
+     *                    tracker in addition to producing to the topic, in 
order to keep the two
+     *                    in sync.
+     * @param timestamp   timestamp to produce with
+     * @param keyValues   key-value pairs to produce
+     *
+     * @return number of records produced
+     */
+    @SuppressWarnings("varargs")
+    @SafeVarargs
+    private final int produceDataToTopic(final String topic,
+                                         final DataTracker dataTracker,
+                                         final long timestamp,
+                                         final KeyValue<Integer, String>... 
keyValues) {
+        produceDataToTopic(topic, timestamp, keyValues);
+
+        for (final KeyValue<Integer, String> keyValue : keyValues) {
+            dataTracker.add(keyValue.key, timestamp, keyValue.value);
+        }
+
+        return keyValues.length;
+    }
+
+    /**
+     * Test-only processor for validating expected contents of a versioned 
store, and forwards
+     * the number of failed checks downstream for consumption. Callers specify 
whether the
+     * processor should also be responsible for inserting records into the 
store (while also
+     * tracking them separately in-memory for use in validation).
      */
     private static class VersionedStoreContentCheckerProcessor implements 
Processor<Integer, String, Integer, Integer> {
 
         private ProcessorContext<Integer, Integer> context;
         private VersionedKeyValueStore<Integer, String> store;
 
+        // whether or not the processor should write records to the store as 
they arrive.
+        // must be false for global stores.

Review Comment:
   Know that I understand how the test actually works, it makes sense.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to