vcrfxia commented on code in PR #13340:
URL: https://github.com/apache/kafka/pull/13340#discussion_r1136261694
##########
streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java:
##########
@@ -302,7 +319,54 @@ public void shouldAllowCustomIQv2ForCustomStoreImplementations() {
.withPartitions(Collections.singleton(0));
final StateQueryResult<String> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
- assertThat("success", equalTo(result.getOnlyPartitionResult().getResult()));
+ assertThat(result.getOnlyPartitionResult().getResult(), equalTo("success"));
+ }
+
+ @Test
+ public void shouldCreateGlobalTable() throws Exception {
+ // produce data to global store topic and track in-memory for processor to verify
+ final DataTracker data = new DataTracker();
+ produceDataToTopic(globalTableTopic, data, baseTimestamp, KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, null));
+ produceDataToTopic(globalTableTopic, data, baseTimestamp + 5, KeyValue.pair(1, "a5"), KeyValue.pair(2, null), KeyValue.pair(3, "c5"));
+ produceDataToTopic(globalTableTopic, data, baseTimestamp + 2, KeyValue.pair(1, "a2"), KeyValue.pair(2, "b2"), KeyValue.pair(3, null)); // out-of-order data
+
+ // build topology and start app
+ final StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+ streamsBuilder
+ .globalTable(
+ globalTableTopic,
+ Consumed.with(Serdes.Integer(), Serdes.String()),
+ Materialized
+ .<Integer, String>as(new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, HISTORY_RETENTION))
+ .withKeySerde(Serdes.Integer())
+ .withValueSerde(Serdes.String())
+ );
+ streamsBuilder
+ .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String()))
+ .process(() -> new VersionedStoreContentCheckerProcessor(false, data))
+ .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer()));
+
+ final Properties props = props();
+ kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
+ kafkaStreams.start();
+
+ // produce source data to trigger store verifications in processor
+ int numRecordsProduced = produceDataToTopic(inputStream, baseTimestamp + 8, KeyValue.pair(1, "a8"), KeyValue.pair(2, "b8"), KeyValue.pair(3, "c8"));
+
+ // wait for output and verify
+ final List<KeyValue<Integer, Integer>> receivedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+ TestUtils.consumerConfig(
+ CLUSTER.bootstrapServers(),
+ IntegerDeserializer.class,
+ IntegerDeserializer.class),
+ outputStream,
+ numRecordsProduced);
+
+ for (final KeyValue<Integer, Integer> receivedRecord : receivedRecords) {
+ // verify zero failed checks for each record
+ assertThat(receivedRecord.value, equalTo(0));
Review Comment:
> In your original comment, you say we cannot get the data from
> global-ktable because we cannot inject a Processor

Hm, not sure which comment this refers to. Maybe the comment I made earlier
was about not being able to issue IQ against versioned stores? Ideally, these
tests would use IQ to check the contents of the stores directly, but versioned
stores don't support that (yet), so the tests inspect the store contents with a
processor instead. The processor writes the number of failed checks to an
output stream, and the test validates that the output stream contains only
zeros (indicating no errors).

> we could use addGlobalStore instead of globalTable to add a Processor.

This test already has a processor which inspects/validates the contents of
the global store. Have I misunderstood?
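
For illustration only, here is a dependency-free sketch of that verification
pattern: a checker compares the store contents against the data tracked at
produce time, emits a failed-check count per trigger record, and the test
asserts that every count is zero. All names (`FailedCheckSketch`,
`countFailedChecks`, `process`) are hypothetical, and plain maps stand in for
the versioned store and for `DataTracker`; this is not the code in the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Dependency-free sketch: every name here is hypothetical, not Kafka Streams API.
final class FailedCheckSketch {

    // Stand-in for the checker processor: counts keys whose stored value
    // differs from the value the test tracked when producing the data.
    static int countFailedChecks(final Map<Integer, String> store,
                                 final Map<Integer, String> expected) {
        int failedChecks = 0;
        for (final Map.Entry<Integer, String> entry : expected.entrySet()) {
            // A null expected value models a tombstone; a missing key also reads as null.
            if (!Objects.equals(store.get(entry.getKey()), entry.getValue())) {
                failedChecks++;
            }
        }
        return failedChecks;
    }

    // Stand-in for the output stream: one failed-check count per trigger record.
    static List<Integer> process(final List<Integer> triggerKeys,
                                 final Map<Integer, String> store,
                                 final Map<Integer, String> expected) {
        final List<Integer> output = new ArrayList<>();
        for (int i = 0; i < triggerKeys.size(); i++) {
            output.add(countFailedChecks(store, expected));
        }
        return output;
    }

    public static void main(final String[] args) {
        // Latest values per key from the test data above (timestamp + 5).
        final Map<Integer, String> expected = new HashMap<>();
        expected.put(1, "a5");
        expected.put(2, null); // tombstone
        expected.put(3, "c5");
        final Map<Integer, String> store = new HashMap<>(expected);

        // The test-side assertion: every emitted count must be zero.
        for (final int failedChecks : process(Arrays.asList(1, 2, 3), store, expected)) {
            if (failedChecks != 0) {
                throw new AssertionError("store contents did not match tracked data");
            }
        }
        System.out.println("all checks passed");
    }
}
```

Emitting a count rather than asserting inside the checker keeps the failure
visible to the test thread, which only sees the output records.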