vcrfxia commented on code in PR #13340:
URL: https://github.com/apache/kafka/pull/13340#discussion_r1136261694


##########
streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java:
##########
@@ -302,7 +319,54 @@ public void shouldAllowCustomIQv2ForCustomStoreImplementations() {
                 .withPartitions(Collections.singleton(0));
         final StateQueryResult<String> result =
             IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
-        assertThat("success", equalTo(result.getOnlyPartitionResult().getResult()));
+        assertThat(result.getOnlyPartitionResult().getResult(), equalTo("success"));
+    }
+
+    @Test
+    public void shouldCreateGlobalTable() throws Exception {
+        // produce data to global store topic and track in-memory for processor to verify
+        final DataTracker data = new DataTracker();
+        produceDataToTopic(globalTableTopic, data, baseTimestamp, KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, null));
+        produceDataToTopic(globalTableTopic, data, baseTimestamp + 5, KeyValue.pair(1, "a5"), KeyValue.pair(2, null), KeyValue.pair(3, "c5"));
+        produceDataToTopic(globalTableTopic, data, baseTimestamp + 2, KeyValue.pair(1, "a2"), KeyValue.pair(2, "b2"), KeyValue.pair(3, null)); // out-of-order data
+
+        // build topology and start app
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+        streamsBuilder
+            .globalTable(
+                globalTableTopic,
+                Consumed.with(Serdes.Integer(), Serdes.String()),
+                Materialized
+                    .<Integer, String>as(new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, HISTORY_RETENTION))
+                    .withKeySerde(Serdes.Integer())
+                    .withValueSerde(Serdes.String())
+            );
+        streamsBuilder
+            .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String()))
+            .process(() -> new VersionedStoreContentCheckerProcessor(false, data))
+            .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer()));
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
+        kafkaStreams.start();
+
+        // produce source data to trigger store verifications in processor
+        int numRecordsProduced = produceDataToTopic(inputStream, baseTimestamp + 8, KeyValue.pair(1, "a8"), KeyValue.pair(2, "b8"), KeyValue.pair(3, "c8"));
+
+        // wait for output and verify
+        final List<KeyValue<Integer, Integer>> receivedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+            TestUtils.consumerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerDeserializer.class,
+                IntegerDeserializer.class),
+            outputStream,
+            numRecordsProduced);
+
+        for (final KeyValue<Integer, Integer> receivedRecord : receivedRecords) {
+            // verify zero failed checks for each record
+            assertThat(receivedRecord.value, equalTo(0));

Review Comment:
   > In your original comment, you say we cannot get the data from global-ktable because we cannot inject a Processor
   
   Hm, not sure which comment this refers to. Maybe the comment I made earlier was about not being able to issue IQ against versioned stores? Ideally these tests would use IQ to check the contents of the stores directly, but because versioned stores don't support IQ (yet), the tests instead inspect the store contents with a processor. The processor writes the number of failed checks to an output stream, and the test validates that the output stream contains only zeros (indicating no errors).
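   
   To make that concrete, here is a minimal sketch of how such a checker processor can be structured. This is illustrative only, not the PR's actual VersionedStoreContentCheckerProcessor: the constructor shape, the map-based expected-value tracker, and the store-name parameter are assumptions.
   
```java
import java.util.Map;
import java.util.Objects;

import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.VersionedKeyValueStore;
import org.apache.kafka.streams.state.VersionedRecord;

// Hypothetical checker: compares the versioned store's latest values against expected
// values tracked in memory, and forwards the number of failed checks downstream so the
// test can assert that every forwarded count is zero.
public class StoreContentCheckerProcessor implements Processor<Integer, String, Integer, Integer> {

    private final String storeName;              // e.g. the test's STORE_NAME constant
    private final Map<Integer, String> expected; // expected latest value per key (assumed tracker shape)

    private ProcessorContext<Integer, Integer> context;
    private VersionedKeyValueStore<Integer, String> store;

    StoreContentCheckerProcessor(final String storeName, final Map<Integer, String> expected) {
        this.storeName = storeName;
        this.expected = expected;
    }

    @Override
    public void init(final ProcessorContext<Integer, Integer> context) {
        this.context = context;
        this.store = context.getStateStore(storeName);
    }

    @Override
    public void process(final Record<Integer, String> record) {
        int failedChecks = 0;
        for (final Map.Entry<Integer, String> entry : expected.entrySet()) {
            final VersionedRecord<String> latest = store.get(entry.getKey());
            final String actual = latest == null ? null : latest.value();
            if (!Objects.equals(entry.getValue(), actual)) {
                failedChecks++;
            }
        }
        // emit the failure count; the test consumes these and asserts every value equals 0
        context.forward(record.withValue(failedChecks));
    }
}
```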
   
   > we could use addGlobalStore instead of globalTable to add a Processor.
   
   This test already has a processor which inspects/validates the contents of 
the global store. Have I misunderstood?
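   
   For comparison, here is a rough sketch of what the addGlobalStore variant could look like. This is purely illustrative and not code from this PR; in particular it assumes that Stores.versionedKeyValueStoreBuilder accepts the same bytes-store supplier used above, and that a simple pass-through processor is enough to maintain the store.
   
```java
// Illustrative only: addGlobalStore requires supplying the processor that maintains the
// global store, whereas globalTable() wires that up automatically.
streamsBuilder.addGlobalStore(
    Stores.versionedKeyValueStoreBuilder(   // assumed factory for a versioned StoreBuilder
        new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, HISTORY_RETENTION),
        Serdes.Integer(),
        Serdes.String()),
    globalTableTopic,
    Consumed.with(Serdes.Integer(), Serdes.String()),
    () -> new Processor<Integer, String, Void, Void>() {
        private VersionedKeyValueStore<Integer, String> store;

        @Override
        public void init(final ProcessorContext<Void, Void> context) {
            store = context.getStateStore(STORE_NAME);
        }

        @Override
        public void process(final Record<Integer, String> record) {
            // keep the global store up to date; any additional verification could live here
            store.put(record.key(), record.value(), record.timestamp());
        }
    });
```
   
   (The trade-off: globalTable() maintains the store automatically, while addGlobalStore puts that responsibility on the supplied processor.)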


