vcrfxia commented on code in PR #13340: URL: https://github.com/apache/kafka/pull/13340#discussion_r1128550162
########## streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java: ########## @@ -0,0 +1,592 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.integration; + +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.lessThan; + +import java.io.IOException; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import 
org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.query.KeyQuery; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryConfig; +import org.apache.kafka.streams.query.QueryResult; +import org.apache.kafka.streams.query.RangeQuery; +import org.apache.kafka.streams.query.StateQueryRequest; +import org.apache.kafka.streams.query.StateQueryResult; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.VersionedBytesStoreSupplier; +import org.apache.kafka.streams.state.VersionedKeyValueStore; +import org.apache.kafka.streams.state.VersionedRecord; +import org.apache.kafka.streams.state.internals.RocksDbVersionedKeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.internals.VersionedKeyValueStoreBuilder; +import org.apache.kafka.streams.state.internals.VersionedKeyValueToBytesStoreAdapter; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category({IntegrationTest.class}) +public class VersionedKeyValueStoreIntegrationTest { + + private static 
final String STORE_NAME = "versioned-store"; + private static final long HISTORY_RETENTION = 3600_000L; + + private String inputStream; + private String outputStream; + private long baseTimestamp; + + private KafkaStreams kafkaStreams; + + private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); + + @Rule + public TestName testName = new TestName(); + + @BeforeClass + public static void before() throws IOException { + CLUSTER.start(); + } + + @AfterClass + public static void after() { + CLUSTER.stop(); + } + + @Before + public void beforeTest() throws InterruptedException { + final String uniqueTestName = safeUniqueTestName(getClass(), testName); + inputStream = "input-stream-" + uniqueTestName; + outputStream = "output-stream-" + uniqueTestName; + CLUSTER.createTopic(inputStream); + CLUSTER.createTopic(outputStream); + + baseTimestamp = CLUSTER.time.milliseconds(); + } + + @After + public void afterTest() { + if (kafkaStreams != null) { + kafkaStreams.close(Duration.ofSeconds(30L)); + kafkaStreams.cleanUp(); + } + } + + @Test + public void shouldPutGetAndDelete() throws Exception { + // build topology and start app + final StreamsBuilder streamsBuilder = new StreamsBuilder(); + + streamsBuilder + .addStateStore( + new VersionedKeyValueStoreBuilder<>( + new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, HISTORY_RETENTION), + Serdes.Integer(), + Serdes.String(), + Time.SYSTEM + ) + ) + .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String())) + .process(VersionedStoreContentCheckerProcessor::new, STORE_NAME) + .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer())); + + final Properties props = props(); + kafkaStreams = new KafkaStreams(streamsBuilder.build(), props); + kafkaStreams.start(); + + // produce source data + int numRecordsProduced = 0; + numRecordsProduced += produceSourceData(baseTimestamp, KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, null)); + numRecordsProduced += 
produceSourceData(baseTimestamp + 5, KeyValue.pair(1, "a5"), KeyValue.pair(2, null), KeyValue.pair(3, "c5")); + numRecordsProduced += produceSourceData(baseTimestamp + 2, KeyValue.pair(1, "a2"), KeyValue.pair(2, "b2"), KeyValue.pair(3, null)); // out-of-order data + numRecordsProduced += produceSourceData(baseTimestamp + 5, KeyValue.pair(1, "a5_new"), KeyValue.pair(2, "b5"), KeyValue.pair(3, null)); // replace existing records + numRecordsProduced += produceSourceData(baseTimestamp + 7, KeyValue.pair(1, "delete"), KeyValue.pair(2, "delete"), KeyValue.pair(3, "delete")); // delete + + // wait for output and verify + final List<KeyValue<Integer, Integer>> receivedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + TestUtils.consumerConfig( + CLUSTER.bootstrapServers(), + IntegerDeserializer.class, + IntegerDeserializer.class), + outputStream, + numRecordsProduced); + + for (final KeyValue<Integer, Integer> receivedRecord : receivedRecords) { + // verify zero failed checks for each record + assertThat(0, equalTo(receivedRecord.value)); + } + } + + @Test + public void shouldSetChangelogTopicProperties() throws Exception { + // build topology and start app + final StreamsBuilder streamsBuilder = new StreamsBuilder(); + + streamsBuilder + .addStateStore( + new VersionedKeyValueStoreBuilder<>( + new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, HISTORY_RETENTION), + Serdes.Integer(), + Serdes.String(), + Time.SYSTEM + ) + ) + .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String())) + .process(VersionedStoreCountProcessor::new, STORE_NAME) + .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer())); + + final Properties props = props(); + kafkaStreams = new KafkaStreams(streamsBuilder.build(), props); + kafkaStreams.start(); + + // produce record (and wait for result) to create changelog + produceSourceData(baseTimestamp, KeyValue.pair(0, "foo")); + + IntegrationTestUtils.waitUntilMinRecordsReceived( + 
TestUtils.consumerConfig( + CLUSTER.bootstrapServers(), + IntegerDeserializer.class, + IntegerDeserializer.class), + outputStream, + 1); + + // verify changelog topic properties + final String changelogTopic = "app-VersionedKeyValueStoreIntegrationTestshouldSetChangelogTopicProperties-versioned-store-changelog"; + final Properties changelogTopicConfig = CLUSTER.getLogConfig(changelogTopic); + assertThat(changelogTopicConfig.getProperty("cleanup.policy"), equalTo("compact")); + assertThat(changelogTopicConfig.getProperty("min.compaction.lag.ms"), equalTo(Long.toString(HISTORY_RETENTION + 24 * 60 * 60 * 1000L))); + } + + @Test + public void shouldRestore() throws Exception { + // build topology and start app + final StreamsBuilder streamsBuilder = new StreamsBuilder(); + + streamsBuilder + .addStateStore( + new VersionedKeyValueStoreBuilder<>( + new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, HISTORY_RETENTION), + Serdes.Integer(), + Serdes.String(), + Time.SYSTEM + ) + ) + .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String())) + .process(VersionedStoreCountProcessor::new, STORE_NAME) + .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer())); + + final Properties props = props(); + kafkaStreams = new KafkaStreams(streamsBuilder.build(), props); + kafkaStreams.start(); + + // produce source data + int initialRecordsProduced = 0; + initialRecordsProduced += produceSourceData(baseTimestamp, KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, null)); + initialRecordsProduced += produceSourceData(baseTimestamp + 5, KeyValue.pair(1, "a5"), KeyValue.pair(2, null), KeyValue.pair(3, "c5")); + initialRecordsProduced += produceSourceData(baseTimestamp + 2, KeyValue.pair(1, "a2"), KeyValue.pair(2, "b2"), KeyValue.pair(3, null)); // out-of-order data + initialRecordsProduced += produceSourceData(baseTimestamp + 5, KeyValue.pair(1, "a5_new"), KeyValue.pair(2, "b5"), KeyValue.pair(3, null)); // replace existing records + 
initialRecordsProduced += produceSourceData(baseTimestamp + 7, KeyValue.pair(1, "delete"), KeyValue.pair(2, "delete"), KeyValue.pair(3, "delete")); // delete + initialRecordsProduced += produceSourceData(baseTimestamp + 10, KeyValue.pair(1, "a10"), KeyValue.pair(2, "b10"), KeyValue.pair(3, "c10")); // new data so latest is not tombstone + + // wait for output + IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + TestUtils.consumerConfig( + CLUSTER.bootstrapServers(), + IntegerDeserializer.class, + IntegerDeserializer.class), + outputStream, + initialRecordsProduced); + + // wipe out state store to trigger restore process on restart + kafkaStreams.close(); + kafkaStreams.cleanUp(); + + // restart app + kafkaStreams = new KafkaStreams(streamsBuilder.build(), props); + kafkaStreams.start(); + + // produce additional records + final int additionalRecordsProduced = produceSourceData(baseTimestamp + 12, KeyValue.pair(1, "a12"), KeyValue.pair(2, "b12"), KeyValue.pair(3, "c12")); + + // wait for output and verify + final List<KeyValue<Integer, Integer>> receivedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + TestUtils.consumerConfig( + CLUSTER.bootstrapServers(), + IntegerDeserializer.class, + IntegerDeserializer.class), + outputStream, + initialRecordsProduced + additionalRecordsProduced); + + for (int i = 1; i <= additionalRecordsProduced; i++) { + final KeyValue<Integer, Integer> receivedRecord = receivedRecords.get(receivedRecords.size() - i); + // verify more than one record version found, which confirms that restore took place + assertThat(1, lessThan(receivedRecord.value)); Review Comment: On second thought, I will make this change because I'd like to add a test for showing that versioned stores are eligible as global stores as well, and we will not be able to write to a global store from the processor so we'll have to pass in a separate dataset to verify in this case anyway. 
The code will be a good deal more complex, but it will provide the stricter verifications that we prefer. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org