mjsax commented on code in PR #13340:
URL: https://github.com/apache/kafka/pull/13340#discussion_r1142848103


##########
streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java:
##########
@@ -302,7 +319,54 @@ public void shouldAllowCustomIQv2ForCustomStoreImplementations() {
                 .withPartitions(Collections.singleton(0));
         final StateQueryResult<String> result =
             IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
-        assertThat("success", equalTo(result.getOnlyPartitionResult().getResult()));
+        assertThat(result.getOnlyPartitionResult().getResult(), equalTo("success"));
+    }
+
+    @Test
+    public void shouldCreateGlobalTable() throws Exception {
+        // produce data to global store topic and track in-memory for processor to verify
+        final DataTracker data = new DataTracker();
+        produceDataToTopic(globalTableTopic, data, baseTimestamp, KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, null));
+        produceDataToTopic(globalTableTopic, data, baseTimestamp + 5, KeyValue.pair(1, "a5"), KeyValue.pair(2, null), KeyValue.pair(3, "c5"));
+        produceDataToTopic(globalTableTopic, data, baseTimestamp + 2, KeyValue.pair(1, "a2"), KeyValue.pair(2, "b2"), KeyValue.pair(3, null)); // out-of-order data
+
+        // build topology and start app
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+        streamsBuilder
+            .globalTable(
+                globalTableTopic,
+                Consumed.with(Serdes.Integer(), Serdes.String()),
+                Materialized
+                    .<Integer, String>as(new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, HISTORY_RETENTION))
+                    .withKeySerde(Serdes.Integer())
+                    .withValueSerde(Serdes.String())
+            );
+        streamsBuilder
+            .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String()))
+            .process(() -> new VersionedStoreContentCheckerProcessor(false, data))
+            .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer()));
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
+        kafkaStreams.start();
+
+        // produce source data to trigger store verifications in processor
+        int numRecordsProduced = produceDataToTopic(inputStream, baseTimestamp + 8, KeyValue.pair(1, "a8"), KeyValue.pair(2, "b8"), KeyValue.pair(3, "c8"));
+
+        // wait for output and verify
+        final List<KeyValue<Integer, Integer>> receivedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+            TestUtils.consumerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerDeserializer.class,
+                IntegerDeserializer.class),
+            outputStream,
+            numRecordsProduced);
+
+        for (final KeyValue<Integer, Integer> receivedRecord : receivedRecords) {
+            // verify zero failed checks for each record
+            assertThat(receivedRecord.value, equalTo(0));

Review Comment:
   I was referring to this comment: 
https://github.com/apache/kafka/pull/13340#discussion_r1128550162
   
   > and we will not be able to write to a global store from the processor
   
   If you specify a global store, we pass in the "global processor" that is able to write into the store (in fact, it has to, in order to maintain the global store). Thus, we can easily track what goes into the store "on the side" in an in-memory data structure, similar to what we do for a regular processor that maintains the store.
   
   > This test already has a processor which inspects/validates the contents of 
the global store. Have I misunderstood?
   
   I think I did not understand how the test works -- now I see that you use a regular processor to read the global state store and verify its contents. So I guess my comment is void (I was basically proposing to add this via a "global processor").
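   The "on the side" tracking the comment describes can be sketched without any Kafka dependencies: the processor that maintains the store records every write in a parallel in-memory structure, and verification simply compares the two. Everything below (`VersionedStoreSketch`, the nested-map layout) is a hypothetical illustration of that idea, not the PR's `DataTracker` or Kafka's global-store API.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   public class VersionedStoreSketch {
       // stand-in for a versioned store: key -> (timestamp -> value)
       static final Map<Integer, Map<Long, String>> store = new HashMap<>();
       // tracker maintained "on the side" by the same processor
       static final Map<Integer, Map<Long, String>> tracker = new HashMap<>();

       // the "global processor": must write into the store to maintain it,
       // and can record the identical write in the tracker at the same time
       static void process(final int key, final String value, final long timestamp) {
           store.computeIfAbsent(key, k -> new HashMap<>()).put(timestamp, value);
           tracker.computeIfAbsent(key, k -> new HashMap<>()).put(timestamp, value);
       }

       // verification step: number of mismatches between store and tracker (0 = success)
       static int failedChecks() {
           return store.equals(tracker) ? 0 : 1;
       }

       public static void main(final String[] args) {
           process(1, "a0", 0L);
           process(1, "a5", 5L);
           process(1, "a2", 2L); // out-of-order write, retained by a versioned store
           System.out.println(failedChecks()); // expect 0
       }
   }
   ```

   The same comparison works whether the reads happen in a "global processor" or, as in this test, in a regular processor that only reads the global store.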






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
