vcrfxia commented on code in PR #13340:
URL: https://github.com/apache/kafka/pull/13340#discussion_r1143766313
##########
streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java:
##########
@@ -302,7 +319,54 @@ public void shouldAllowCustomIQv2ForCustomStoreImplementations() {
             .withPartitions(Collections.singleton(0));
         final StateQueryResult<String> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
-        assertThat("success", equalTo(result.getOnlyPartitionResult().getResult()));
+        assertThat(result.getOnlyPartitionResult().getResult(), equalTo("success"));
+    }
+
+    @Test
+    public void shouldCreateGlobalTable() throws Exception {
+        // produce data to global store topic and track in-memory for processor to verify
+        final DataTracker data = new DataTracker();
+        produceDataToTopic(globalTableTopic, data, baseTimestamp, KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, null));
+        produceDataToTopic(globalTableTopic, data, baseTimestamp + 5, KeyValue.pair(1, "a5"), KeyValue.pair(2, null), KeyValue.pair(3, "c5"));
+        produceDataToTopic(globalTableTopic, data, baseTimestamp + 2, KeyValue.pair(1, "a2"), KeyValue.pair(2, "b2"), KeyValue.pair(3, null)); // out-of-order data
+
+        // build topology and start app
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+        streamsBuilder
+            .globalTable(
+                globalTableTopic,
+                Consumed.with(Serdes.Integer(), Serdes.String()),
+                Materialized
+                    .<Integer, String>as(new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, HISTORY_RETENTION))
+                    .withKeySerde(Serdes.Integer())
+                    .withValueSerde(Serdes.String())
+            );
+        streamsBuilder
+            .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String()))
+            .process(() -> new VersionedStoreContentCheckerProcessor(false, data))
+            .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer()));
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
+        kafkaStreams.start();
+
+        // produce source data to trigger store verifications in processor
+        int numRecordsProduced = produceDataToTopic(inputStream, baseTimestamp + 8, KeyValue.pair(1, "a8"), KeyValue.pair(2, "b8"), KeyValue.pair(3, "c8"));
+
+        // wait for output and verify
+        final List<KeyValue<Integer, Integer>> receivedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+            TestUtils.consumerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerDeserializer.class,
+                IntegerDeserializer.class),
+            outputStream,
+            numRecordsProduced);
+
+        for (final KeyValue<Integer, Integer> receivedRecord : receivedRecords) {
+            // verify zero failed checks for each record
+            assertThat(receivedRecord.value, equalTo(0));

Review Comment:
   > I was referring to this comment: https://github.com/apache/kafka/pull/13340#discussion_r1128550162

   I see. My previous comment was about whether we can have a single processor write to the store and also read from it in order to verify its contents after writing. While it is possible to write to a global store from a processor (via `addGlobalStore()`), it is not possible to have that processor also write verification results downstream (the signature of that processor always returns `void` from its `process()` method). So no matter what we need to separate the verification logic from the logic that writes to the store. That's what I was trying to say earlier, but in an abbreviated/confusing way.

   Sounds like we're on the same page now regarding what the processor which reads from the store (and writes verification results downstream) is doing, so we should be good 👍
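
The separation described in the review comment can be illustrated with a small plain-Java model. This is only a sketch under stated assumptions: `GlobalStoreSketch`, `StoreUpdater`, and `StoreChecker` are hypothetical stand-ins and not Kafka Streams classes, the store is an ordinary `HashMap`, and versioning and out-of-order timestamps are not modeled. It shows why a writer whose `process()` returns `void` cannot report results, so a second, read-only processor does the verification and emits the failed-check count.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Simplified model only: every type here is a hypothetical stand-in,
// not a Kafka Streams class.
final class GlobalStoreSketch {

    // Stand-in for a global-store update processor: process() returns void,
    // so it has no channel for sending verification results downstream.
    interface StoreUpdater {
        void process(Integer key, String value);
    }

    // Stand-in for a regular stream processor: it only reads the store and
    // returns the number of failed checks, which would be written downstream.
    interface StoreChecker {
        int process(Integer key);
    }

    static List<Integer> runChecks() {
        final Map<Integer, String> store = new HashMap<>();

        // Writer side: a null value is treated as a delete (tombstone).
        final StoreUpdater updater = (key, value) -> {
            if (value == null) {
                store.remove(key);
            } else {
                store.put(key, value);
            }
        };

        // Expected latest value per key, tracked outside the store.
        final Map<Integer, String> expected = new HashMap<>();
        expected.put(1, "a5");
        expected.put(3, "c5");

        // Reader side: compares store content with the tracked expectation.
        final StoreChecker checker = key ->
            Objects.equals(store.get(key), expected.get(key)) ? 0 : 1;

        // Populate the store through the writer only.
        updater.process(1, "a0");
        updater.process(2, "b0");
        updater.process(3, null);
        updater.process(1, "a5");
        updater.process(2, null);
        updater.process(3, "c5");

        // Trigger one verification per key through the reader only.
        final List<Integer> failedChecks = new ArrayList<>();
        for (final int key : new int[] {1, 2, 3}) {
            failedChecks.add(checker.process(key));
        }
        return failedChecks;
    }

    public static void main(final String[] args) {
        System.out.println(runChecks()); // prints [0, 0, 0]
    }
}
```

In the test from the diff, the same split appears as `globalTable(...)` populating the store while `VersionedStoreContentCheckerProcessor` reads it and writes the number of failed checks to the output topic.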