vcrfxia commented on code in PR #13340:
URL: https://github.com/apache/kafka/pull/13340#discussion_r1143766313


##########
streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java:
##########
@@ -302,7 +319,54 @@ public void shouldAllowCustomIQv2ForCustomStoreImplementations() {
                 .withPartitions(Collections.singleton(0));
         final StateQueryResult<String> result =
             IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
-        assertThat("success", equalTo(result.getOnlyPartitionResult().getResult()));
+        assertThat(result.getOnlyPartitionResult().getResult(), equalTo("success"));
+    }
+
+    @Test
+    public void shouldCreateGlobalTable() throws Exception {
+        // produce data to global store topic and track in-memory for processor to verify
+        final DataTracker data = new DataTracker();
+        produceDataToTopic(globalTableTopic, data, baseTimestamp, KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, null));
+        produceDataToTopic(globalTableTopic, data, baseTimestamp + 5, KeyValue.pair(1, "a5"), KeyValue.pair(2, null), KeyValue.pair(3, "c5"));
+        produceDataToTopic(globalTableTopic, data, baseTimestamp + 2, KeyValue.pair(1, "a2"), KeyValue.pair(2, "b2"), KeyValue.pair(3, null)); // out-of-order data
+
+        // build topology and start app
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+        streamsBuilder
+            .globalTable(
+                globalTableTopic,
+                Consumed.with(Serdes.Integer(), Serdes.String()),
+                Materialized
+                    .<Integer, String>as(new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, HISTORY_RETENTION))
+                    .withKeySerde(Serdes.Integer())
+                    .withValueSerde(Serdes.String())
+            );
+        streamsBuilder
+            .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String()))
+            .process(() -> new VersionedStoreContentCheckerProcessor(false, data))
+            .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer()));
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
+        kafkaStreams.start();
+
+        // produce source data to trigger store verifications in processor
+        final int numRecordsProduced = produceDataToTopic(inputStream, baseTimestamp + 8, KeyValue.pair(1, "a8"), KeyValue.pair(2, "b8"), KeyValue.pair(3, "c8"));
+
+        // wait for output and verify
+        final List<KeyValue<Integer, Integer>> receivedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+            TestUtils.consumerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerDeserializer.class,
+                IntegerDeserializer.class),
+            outputStream,
+            numRecordsProduced);
+
+        for (final KeyValue<Integer, Integer> receivedRecord : receivedRecords) {
+            // verify zero failed checks for each record
+            assertThat(receivedRecord.value, equalTo(0));
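The test above writes out-of-order records into a versioned store. As a simplified, self-contained model of the timestamped-lookup semantics the test relies on (hypothetical names; this is not Kafka's actual `RocksDbVersionedKeyValueBytesStoreSupplier` implementation), each key keeps all (timestamp, value) versions, and a lookup returns the latest version at or before a given timestamp:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified in-memory sketch of a versioned key-value store.
public class SimpleVersionedStore {
    private static final class Version {
        final long timestamp;
        final String value; // null models a tombstone
        Version(final long timestamp, final String value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    private final Map<Integer, List<Version>> versions = new HashMap<>();

    // Inserts may arrive out of order; every version is simply recorded.
    public void put(final int key, final String value, final long timestamp) {
        versions.computeIfAbsent(key, k -> new ArrayList<>()).add(new Version(timestamp, value));
    }

    // Returns the value of the latest version with timestamp <= asOfTimestamp,
    // or null if no such version exists (or the latest one is a tombstone).
    public String get(final int key, final long asOfTimestamp) {
        Version best = null;
        for (final Version v : versions.getOrDefault(key, Collections.emptyList())) {
            if (v.timestamp <= asOfTimestamp && (best == null || v.timestamp > best.timestamp)) {
                best = v;
            }
        }
        return best == null ? null : best.value;
    }
}
```

Under this model, after puts for key 1 at timestamps 0 ("a0") and 5 ("a5") followed by an out-of-order put at timestamp 2 ("a2"), a lookup as of timestamp 3 returns "a2" while a lookup as of timestamp 10 returns "a5".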

Review Comment:
   > I was referring to this comment: 
https://github.com/apache/kafka/pull/13340#discussion_r1128550162
   
   I see. My previous comment was about whether we could have a single processor write to the store and also read from it, in order to verify its contents after writing. While it is possible to write to a global store from a processor (via `addGlobalStore()`), that processor cannot also forward verification results downstream, since its `process()` method always returns `void`. So no matter what, we need to separate the verification logic from the logic that writes to the store. That's what I was trying to say earlier, albeit in an abbreviated and confusing way.
   
   Sounds like we're on the same page now regarding what the processor which 
reads from the store (and writes verification results downstream) is doing, so 
we should be good 👍 
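
   The split described above can be sketched in plain Java (all names here are hypothetical stand-ins, not the actual test classes or the Kafka Streams API): one component only writes to the shared store, mirroring a global-store processor whose `process()` returns `void` and therefore cannot forward, while a separate component reads the store, counts failed checks, and would forward that count downstream.

```java
import java.util.HashMap;
import java.util.Map;

public class SplitVerificationSketch {
    // Hypothetical stand-in for the shared store: key -> latest value.
    static final Map<Integer, String> STORE = new HashMap<>();

    // Mirrors a global-store processor: it may update the store,
    // but it returns void, so it cannot forward results downstream.
    static void globalStoreUpdate(final int key, final String value) {
        if (value == null) {
            STORE.remove(key); // null models a tombstone
        } else {
            STORE.put(key, value);
        }
    }

    // Separate verification processor: reads the store and returns the
    // number of failed checks, which a real processor would forward downstream.
    static int verify(final Map<Integer, String> expected) {
        int failedChecks = 0;
        for (final Map.Entry<Integer, String> e : expected.entrySet()) {
            final String actual = STORE.get(e.getKey());
            if (actual == null || !actual.equals(e.getValue())) {
                failedChecks++;
            }
        }
        return failedChecks;
    }
}
```

   In the actual test, the role of `verify` is played by `VersionedStoreContentCheckerProcessor`, which forwards the failed-check count to the output topic where the test asserts it is zero.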



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
