vcrfxia commented on code in PR #13340:
URL: https://github.com/apache/kafka/pull/13340#discussion_r1127290672


##########
streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java:
##########
@@ -0,0 +1,592 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.lessThan;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.query.KeyQuery;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RangeQuery;
+import org.apache.kafka.streams.query.StateQueryRequest;
+import org.apache.kafka.streams.query.StateQueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.VersionedBytesStoreSupplier;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+import org.apache.kafka.streams.state.internals.RocksDbVersionedKeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.internals.VersionedKeyValueStoreBuilder;
+import org.apache.kafka.streams.state.internals.VersionedKeyValueToBytesStoreAdapter;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({IntegrationTest.class})
+public class VersionedKeyValueStoreIntegrationTest {
+
+    private static final String STORE_NAME = "versioned-store";
+    private static final long HISTORY_RETENTION = 3600_000L;
+
+    private String inputStream;
+    private String outputStream;
+    private long baseTimestamp;
+
+    private KafkaStreams kafkaStreams;
+
+    private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+
+    @Rule
+    public TestName testName = new TestName();
+
+    @BeforeClass
+    public static void before() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void after() {
+        CLUSTER.stop();
+    }
+
+    @Before
+    public void beforeTest() throws InterruptedException {
+        final String uniqueTestName = safeUniqueTestName(getClass(), testName);
+        inputStream = "input-stream-" + uniqueTestName;
+        outputStream = "output-stream-" + uniqueTestName;
+        CLUSTER.createTopic(inputStream);
+        CLUSTER.createTopic(outputStream);
+
+        baseTimestamp = CLUSTER.time.milliseconds();
+    }
+
+    @After
+    public void afterTest() {
+        if (kafkaStreams != null) {
+            kafkaStreams.close(Duration.ofSeconds(30L));
+            kafkaStreams.cleanUp();
+        }
+    }
+
+    @Test
+    public void shouldPutGetAndDelete() throws Exception {
+        // build topology and start app
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+        streamsBuilder
+            .addStateStore(
+                new VersionedKeyValueStoreBuilder<>(
+                    new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, HISTORY_RETENTION),
+                    Serdes.Integer(),
+                    Serdes.String(),
+                    Time.SYSTEM
+                )
+            )
+            .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String()))
+            .process(VersionedStoreContentCheckerProcessor::new, STORE_NAME)
+            .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer()));
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
+        kafkaStreams.start();
+
+        // produce source data
+        int numRecordsProduced = 0;
+        numRecordsProduced += produceSourceData(baseTimestamp, KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, null));
+        numRecordsProduced += produceSourceData(baseTimestamp + 5, KeyValue.pair(1, "a5"), KeyValue.pair(2, null), KeyValue.pair(3, "c5"));
+        numRecordsProduced += produceSourceData(baseTimestamp + 2, KeyValue.pair(1, "a2"), KeyValue.pair(2, "b2"), KeyValue.pair(3, null)); // out-of-order data
+        numRecordsProduced += produceSourceData(baseTimestamp + 5, KeyValue.pair(1, "a5_new"), KeyValue.pair(2, "b5"), KeyValue.pair(3, null)); // replace existing records
+        numRecordsProduced += produceSourceData(baseTimestamp + 7, KeyValue.pair(1, "delete"), KeyValue.pair(2, "delete"), KeyValue.pair(3, "delete")); // delete
+
+        // wait for output and verify
+        final List<KeyValue<Integer, Integer>> receivedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+            TestUtils.consumerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerDeserializer.class,
+                IntegerDeserializer.class),
+            outputStream,
+            numRecordsProduced);
+
+        for (final KeyValue<Integer, Integer> receivedRecord : receivedRecords) {
+            // verify zero failed checks for each record
+            assertThat(0, equalTo(receivedRecord.value));
+        }
+    }
+
+    @Test
+    public void shouldSetChangelogTopicProperties() throws Exception {
+        // build topology and start app
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+        streamsBuilder
+            .addStateStore(
+                new VersionedKeyValueStoreBuilder<>(
+                    new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, HISTORY_RETENTION),
+                    Serdes.Integer(),
+                    Serdes.String(),
+                    Time.SYSTEM
+                )
+            )
+            .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String()))
+            .process(VersionedStoreCountProcessor::new, STORE_NAME)
+            .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer()));
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
+        kafkaStreams.start();
+
+        // produce record (and wait for result) to create changelog
+        produceSourceData(baseTimestamp, KeyValue.pair(0, "foo"));
+
+        IntegrationTestUtils.waitUntilMinRecordsReceived(
+            TestUtils.consumerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerDeserializer.class,
+                IntegerDeserializer.class),
+            outputStream,
+            1);
+
+        // verify changelog topic properties
+        final String changelogTopic = "app-VersionedKeyValueStoreIntegrationTestshouldSetChangelogTopicProperties-versioned-store-changelog";
+        final Properties changelogTopicConfig = CLUSTER.getLogConfig(changelogTopic);
+        assertThat(changelogTopicConfig.getProperty("cleanup.policy"), equalTo("compact"));
+        assertThat(changelogTopicConfig.getProperty("min.compaction.lag.ms"), equalTo(Long.toString(HISTORY_RETENTION + 24 * 60 * 60 * 1000L)));
+    }
+
+    @Test
+    public void shouldRestore() throws Exception {
+        // build topology and start app
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+        streamsBuilder
+            .addStateStore(
+                new VersionedKeyValueStoreBuilder<>(
+                    new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, HISTORY_RETENTION),
+                    Serdes.Integer(),
+                    Serdes.String(),
+                    Time.SYSTEM
+                )
+            )
+            .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String()))
+            .process(VersionedStoreCountProcessor::new, STORE_NAME)
+            .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer()));
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
+        kafkaStreams.start();
+
+        // produce source data
+        int initialRecordsProduced = 0;
+        initialRecordsProduced += produceSourceData(baseTimestamp, KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, null));
+        initialRecordsProduced += produceSourceData(baseTimestamp + 5, KeyValue.pair(1, "a5"), KeyValue.pair(2, null), KeyValue.pair(3, "c5"));
+        initialRecordsProduced += produceSourceData(baseTimestamp + 2, KeyValue.pair(1, "a2"), KeyValue.pair(2, "b2"), KeyValue.pair(3, null)); // out-of-order data
+        initialRecordsProduced += produceSourceData(baseTimestamp + 5, KeyValue.pair(1, "a5_new"), KeyValue.pair(2, "b5"), KeyValue.pair(3, null)); // replace existing records
+        initialRecordsProduced += produceSourceData(baseTimestamp + 7, KeyValue.pair(1, "delete"), KeyValue.pair(2, "delete"), KeyValue.pair(3, "delete")); // delete
+        initialRecordsProduced += produceSourceData(baseTimestamp + 10, KeyValue.pair(1, "a10"), KeyValue.pair(2, "b10"), KeyValue.pair(3, "c10")); // new data so latest is not tombstone
+
+        // wait for output
+        IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+            TestUtils.consumerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerDeserializer.class,
+                IntegerDeserializer.class),
+            outputStream,
+            initialRecordsProduced);
+
+        // wipe out state store to trigger restore process on restart
+        kafkaStreams.close();
+        kafkaStreams.cleanUp();
+
+        // restart app
+        kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
+        kafkaStreams.start();
+
+        // produce additional records
+        final int additionalRecordsProduced = produceSourceData(baseTimestamp + 12, KeyValue.pair(1, "a12"), KeyValue.pair(2, "b12"), KeyValue.pair(3, "c12"));
+
+        // wait for output and verify
+        final List<KeyValue<Integer, Integer>> receivedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+            TestUtils.consumerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerDeserializer.class,
+                IntegerDeserializer.class),
+            outputStream,
+            initialRecordsProduced + additionalRecordsProduced);
+
+        for (int i = 1; i <= additionalRecordsProduced; i++) {
+            final KeyValue<Integer, Integer> receivedRecord = receivedRecords.get(receivedRecords.size() - i);
+            // verify more than one record version found, which confirms that restore took place
+            assertThat(1, lessThan(receivedRecord.value));
+        }
+    }
+
+    @Test
+    public void shouldAllowCustomIQv2ForCustomStoreImplementations() {
+        // build topology and start app
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+        streamsBuilder
+            .addStateStore(
+                new VersionedKeyValueStoreBuilder<>(
+                    new CustomIQv2VersionedStoreSupplier(),
+                    Serdes.Integer(),
+                    Serdes.String(),
+                    Time.SYSTEM
+                )
+            )
+            .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String()))
+            .process(VersionedStoreCountProcessor::new, STORE_NAME)
+            .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer()));
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
+        kafkaStreams.start();
+
+        // issue IQv2 query and verify result
+        final StateQueryRequest<String> request =
+            StateQueryRequest.inStore(STORE_NAME)
+                .withQuery(new TestQuery())
+                .withPartitions(Collections.singleton(0));
+        final StateQueryResult<String> result =
+            IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+        assertThat("success", equalTo(result.getOnlyPartitionResult().getResult()));
+    }
+
+    private Properties props() {
+        final Properties streamsConfiguration = new Properties();
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        return streamsConfiguration;
+    }
+
+    /**
+     * @return number of records produced
+     */
+    @SuppressWarnings("varargs")
+    @SafeVarargs
+    private final int produceSourceData(final long timestamp,
+                                        final KeyValue<Integer, String>... keyValues) {
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+            inputStream,
+            Arrays.asList(keyValues),
+            TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+                IntegerSerializer.class,
+                StringSerializer.class),
+            timestamp);
+        return keyValues.length;
+    }
+
+    /**
+     * Test-only processor for inserting records into a versioned store while also tracking
+     * them separately in-memory, and performing checks to validate expected store contents.
+     * Forwards the number of failed checks downstream for consumption.
+     */
+    private static class VersionedStoreContentCheckerProcessor implements Processor<Integer, String, Integer, Integer> {
+
+        private ProcessorContext<Integer, Integer> context;
+        private VersionedKeyValueStore<Integer, String> store;
+
+        // in-memory copy of seen data, to validate for testing purposes.
+        // maps from key -> timestamp -> value
+        private final Map<Integer, Map<Long, String>> data = new HashMap<>();
+
+        @Override
+        public void init(final ProcessorContext<Integer, Integer> context) {
+            this.context = context;
+            store = context.getStateStore(STORE_NAME);
+        }
+
+        @Override
+        public void process(final Record<Integer, String> record) {
+            // add record to store. special value "delete" is interpreted as a delete() call,
+            // in contrast to null value, which is a tombstone inserted via put()
+            if ("delete".equals(record.value())) {
+                store.delete(record.key(), record.timestamp());
+                addToSeenData(record.key(), record.timestamp(), null);
+            } else {
+                store.put(record.key(), record.value(), record.timestamp());
+                addToSeenData(record.key(), record.timestamp(), record.value());
+            }
+
+            // check expected contents of store, and signal completion by writing
+            // number of failures to downstream
+            final int failedChecks = checkStoreContents();
+            context.forward(record.withValue(failedChecks));
+        }
+
+        private void addToSeenData(final Integer key, final long timestamp, final String value) {
+            if (data.containsKey(key)) {
+                data.get(key).put(timestamp, value);
+            } else {
+                final Map<Long, String> timestampsAndValues = new HashMap<>();
+                timestampsAndValues.put(timestamp, value);
+                data.put(key, timestampsAndValues);
+            }
+        }
+
+        /**
+         * @return number of failed checks
+         */
+        private int checkStoreContents() {
+            int failedChecks = 0;
+            for (final Map.Entry<Integer, Map<Long, String>> keyWithTimestampsAndValues : data.entrySet()) {
+                final Integer key = keyWithTimestampsAndValues.getKey();
+                final Map<Long, String> timestampsAndValues = keyWithTimestampsAndValues.getValue();
+
+                // track largest timestamp seen for key
+                long maxTimestamp = -1L;
+                String expectedValueForMaxTimestamp = null;
+
+                for (final Map.Entry<Long, String> timestampAndValue : timestampsAndValues.entrySet()) {
+                    final Long timestamp = timestampAndValue.getKey();

Review Comment:
   Seems we could go either way on this one: in the context of `contentsMatch(...)` this is `expectedTimestamp`, but in the context of `store.get(...)` it's just `timestamp`. Went ahead and made the update 👍
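
   For reference, the call site in `checkStoreContents()` where the same local serves both purposes:

   ```java
   // "timestamp" from the store's point of view ...
   final VersionedRecord<String> versionedRecord = store.get(key, timestamp);
   // ... and "expectedTimestamp" from the check's point of view
   if (!contentsMatch(versionedRecord, expectedValue, timestamp)) {
       failedChecks++;
   }
   ```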



##########
streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java:
##########
@@ -0,0 +1,592 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.lessThan;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.query.KeyQuery;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RangeQuery;
+import org.apache.kafka.streams.query.StateQueryRequest;
+import org.apache.kafka.streams.query.StateQueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.VersionedBytesStoreSupplier;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+import org.apache.kafka.streams.state.internals.RocksDbVersionedKeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.internals.VersionedKeyValueStoreBuilder;
+import org.apache.kafka.streams.state.internals.VersionedKeyValueToBytesStoreAdapter;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({IntegrationTest.class})
+public class VersionedKeyValueStoreIntegrationTest {
+
+    private static final String STORE_NAME = "versioned-store";
+    private static final long HISTORY_RETENTION = 3600_000L;
+
+    private String inputStream;
+    private String outputStream;
+    private long baseTimestamp;
+
+    private KafkaStreams kafkaStreams;
+
+    private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+
+    @Rule
+    public TestName testName = new TestName();
+
+    @BeforeClass
+    public static void before() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void after() {
+        CLUSTER.stop();
+    }
+
+    @Before
+    public void beforeTest() throws InterruptedException {
+        final String uniqueTestName = safeUniqueTestName(getClass(), testName);
+        inputStream = "input-stream-" + uniqueTestName;
+        outputStream = "output-stream-" + uniqueTestName;
+        CLUSTER.createTopic(inputStream);
+        CLUSTER.createTopic(outputStream);
+
+        baseTimestamp = CLUSTER.time.milliseconds();
+    }
+
+    @After
+    public void afterTest() {
+        if (kafkaStreams != null) {
+            kafkaStreams.close(Duration.ofSeconds(30L));
+            kafkaStreams.cleanUp();
+        }
+    }
+
+    @Test
+    public void shouldPutGetAndDelete() throws Exception {
+        // build topology and start app
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+        streamsBuilder
+            .addStateStore(
+                new VersionedKeyValueStoreBuilder<>(
+                    new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, HISTORY_RETENTION),
+                    Serdes.Integer(),
+                    Serdes.String(),
+                    Time.SYSTEM
+                )
+            )
+            .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String()))
+            .process(VersionedStoreContentCheckerProcessor::new, STORE_NAME)
+            .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer()));
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
+        kafkaStreams.start();
+
+        // produce source data
+        int numRecordsProduced = 0;
+        numRecordsProduced += produceSourceData(baseTimestamp, KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, null));
+        numRecordsProduced += produceSourceData(baseTimestamp + 5, KeyValue.pair(1, "a5"), KeyValue.pair(2, null), KeyValue.pair(3, "c5"));
+        numRecordsProduced += produceSourceData(baseTimestamp + 2, KeyValue.pair(1, "a2"), KeyValue.pair(2, "b2"), KeyValue.pair(3, null)); // out-of-order data
+        numRecordsProduced += produceSourceData(baseTimestamp + 5, KeyValue.pair(1, "a5_new"), KeyValue.pair(2, "b5"), KeyValue.pair(3, null)); // replace existing records
+        numRecordsProduced += produceSourceData(baseTimestamp + 7, KeyValue.pair(1, "delete"), KeyValue.pair(2, "delete"), KeyValue.pair(3, "delete")); // delete
+
+        // wait for output and verify
+        final List<KeyValue<Integer, Integer>> receivedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+            TestUtils.consumerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerDeserializer.class,
+                IntegerDeserializer.class),
+            outputStream,
+            numRecordsProduced);
+
+        for (final KeyValue<Integer, Integer> receivedRecord : receivedRecords) {
+            // verify zero failed checks for each record
+            assertThat(0, equalTo(receivedRecord.value));
+        }
+    }
+
+    @Test
+    public void shouldSetChangelogTopicProperties() throws Exception {
+        // build topology and start app
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+        streamsBuilder
+            .addStateStore(
+                new VersionedKeyValueStoreBuilder<>(
+                    new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, HISTORY_RETENTION),
+                    Serdes.Integer(),
+                    Serdes.String(),
+                    Time.SYSTEM
+                )
+            )
+            .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String()))
+            .process(VersionedStoreCountProcessor::new, STORE_NAME)
+            .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer()));
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
+        kafkaStreams.start();
+
+        // produce record (and wait for result) to create changelog
+        produceSourceData(baseTimestamp, KeyValue.pair(0, "foo"));
+
+        IntegrationTestUtils.waitUntilMinRecordsReceived(
+            TestUtils.consumerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerDeserializer.class,
+                IntegerDeserializer.class),
+            outputStream,
+            1);
+
+        // verify changelog topic properties
+        final String changelogTopic = "app-VersionedKeyValueStoreIntegrationTestshouldSetChangelogTopicProperties-versioned-store-changelog";
+        final Properties changelogTopicConfig = CLUSTER.getLogConfig(changelogTopic);
+        assertThat(changelogTopicConfig.getProperty("cleanup.policy"), equalTo("compact"));
+        assertThat(changelogTopicConfig.getProperty("min.compaction.lag.ms"), equalTo(Long.toString(HISTORY_RETENTION + 24 * 60 * 60 * 1000L)));
+    }
+
+    @Test
+    public void shouldRestore() throws Exception {
+        // build topology and start app
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+        streamsBuilder
+            .addStateStore(
+                new VersionedKeyValueStoreBuilder<>(
+                    new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, HISTORY_RETENTION),
+                    Serdes.Integer(),
+                    Serdes.String(),
+                    Time.SYSTEM
+                )
+            )
+            .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String()))
+            .process(VersionedStoreCountProcessor::new, STORE_NAME)
+            .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer()));
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
+        kafkaStreams.start();
+
+        // produce source data
+        int initialRecordsProduced = 0;
+        initialRecordsProduced += produceSourceData(baseTimestamp, KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, null));
+        initialRecordsProduced += produceSourceData(baseTimestamp + 5, KeyValue.pair(1, "a5"), KeyValue.pair(2, null), KeyValue.pair(3, "c5"));
+        initialRecordsProduced += produceSourceData(baseTimestamp + 2, KeyValue.pair(1, "a2"), KeyValue.pair(2, "b2"), KeyValue.pair(3, null)); // out-of-order data
+        initialRecordsProduced += produceSourceData(baseTimestamp + 5, KeyValue.pair(1, "a5_new"), KeyValue.pair(2, "b5"), KeyValue.pair(3, null)); // replace existing records
+        initialRecordsProduced += produceSourceData(baseTimestamp + 7, KeyValue.pair(1, "delete"), KeyValue.pair(2, "delete"), KeyValue.pair(3, "delete")); // delete
+        initialRecordsProduced += produceSourceData(baseTimestamp + 10, KeyValue.pair(1, "a10"), KeyValue.pair(2, "b10"), KeyValue.pair(3, "c10")); // new data so latest is not tombstone
+
+        // wait for output
+        IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+            TestUtils.consumerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerDeserializer.class,
+                IntegerDeserializer.class),
+            outputStream,
+            initialRecordsProduced);
+
+        // wipe out state store to trigger restore process on restart
+        kafkaStreams.close();
+        kafkaStreams.cleanUp();
+
+        // restart app
+        kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
+        kafkaStreams.start();
+
+        // produce additional records
+        final int additionalRecordsProduced = produceSourceData(baseTimestamp + 12, KeyValue.pair(1, "a12"), KeyValue.pair(2, "b12"), KeyValue.pair(3, "c12"));
+
+        // wait for output and verify
+        final List<KeyValue<Integer, Integer>> receivedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+            TestUtils.consumerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerDeserializer.class,
+                IntegerDeserializer.class),
+            outputStream,
+            initialRecordsProduced + additionalRecordsProduced);
+
+        for (int i = 1; i <= additionalRecordsProduced; i++) {
+            final KeyValue<Integer, Integer> receivedRecord = receivedRecords.get(receivedRecords.size() - i);
+            // verify more than one record version found, which confirms that restore took place
+            assertThat(1, lessThan(receivedRecord.value));

Review Comment:
   I considered that when writing this test, but the code for doing so would be rather complex -- we'd have to track all the added records, similar to what's done in `VersionedStoreContentCheckerProcessor`, except we can't do it in the processor anymore because the processor is recreated after the restart. So we'd have to track this data separately (in the body of the test itself) and then pass it back into the processor after the restart in order for the processor to perform the checks.

   If you think it's worth the additional complexity, I can make the update.

   (As an aside, the reason these tests are already as complicated as they are is that versioned stores don't support interactive queries right now. Otherwise we could just query the store directly from outside the processor and wouldn't need to rely on these roundabout checks.)
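
   For concreteness, here is a rough sketch of what that could look like -- the constructor and wiring below are hypothetical, not code from this PR. The test body would own the expected-data map and hand it to every processor instance, so the map survives the restart even though the processor does not:

   ```java
   // hypothetical sketch -- the map-accepting constructor is an assumption, not part of this PR
   final Map<Integer, Map<Long, String>> expectedData = new HashMap<>();

   streamsBuilder
       // (state store registration as in the existing tests)
       .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String()))
       // hand the shared map to each (re)created processor instance, so checks run after
       // the restart still know about the records produced before it
       .process(() -> new VersionedStoreContentCheckerProcessor(expectedData), STORE_NAME)
       .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer()));
   ```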



##########
streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java:
##########
@@ -0,0 +1,592 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.lessThan;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.query.KeyQuery;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RangeQuery;
+import org.apache.kafka.streams.query.StateQueryRequest;
+import org.apache.kafka.streams.query.StateQueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.VersionedBytesStoreSupplier;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+import org.apache.kafka.streams.state.internals.RocksDbVersionedKeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.internals.VersionedKeyValueStoreBuilder;
+import org.apache.kafka.streams.state.internals.VersionedKeyValueToBytesStoreAdapter;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({IntegrationTest.class})
+public class VersionedKeyValueStoreIntegrationTest {
+
+    private static final String STORE_NAME = "versioned-store";
+    private static final long HISTORY_RETENTION = 3600_000L;
+
+    private String inputStream;
+    private String outputStream;
+    private long baseTimestamp;
+
+    private KafkaStreams kafkaStreams;
+
+    private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+
+    @Rule
+    public TestName testName = new TestName();
+
+    @BeforeClass
+    public static void before() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void after() {
+        CLUSTER.stop();
+    }
+
+    @Before
+    public void beforeTest() throws InterruptedException {
+        final String uniqueTestName = safeUniqueTestName(getClass(), testName);
+        inputStream = "input-stream-" + uniqueTestName;
+        outputStream = "output-stream-" + uniqueTestName;
+        CLUSTER.createTopic(inputStream);
+        CLUSTER.createTopic(outputStream);
+
+        baseTimestamp = CLUSTER.time.milliseconds();
+    }
+
+    @After
+    public void afterTest() {
+        if (kafkaStreams != null) {
+            kafkaStreams.close(Duration.ofSeconds(30L));
+            kafkaStreams.cleanUp();
+        }
+    }
+
+    @Test
+    public void shouldPutGetAndDelete() throws Exception {
+        // build topology and start app
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+        streamsBuilder
+            .addStateStore(
+                new VersionedKeyValueStoreBuilder<>(
+                    new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, HISTORY_RETENTION),
+                    Serdes.Integer(),
+                    Serdes.String(),
+                    Time.SYSTEM
+                )
+            )
+            .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String()))
+            .process(VersionedStoreContentCheckerProcessor::new, STORE_NAME)
+            .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer()));
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
+        kafkaStreams.start();
+
+        // produce source data
+        int numRecordsProduced = 0;
+        numRecordsProduced += produceSourceData(baseTimestamp, KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, null));
+        numRecordsProduced += produceSourceData(baseTimestamp + 5, KeyValue.pair(1, "a5"), KeyValue.pair(2, null), KeyValue.pair(3, "c5"));
+        numRecordsProduced += produceSourceData(baseTimestamp + 2, KeyValue.pair(1, "a2"), KeyValue.pair(2, "b2"), KeyValue.pair(3, null)); // out-of-order data
+        numRecordsProduced += produceSourceData(baseTimestamp + 5, KeyValue.pair(1, "a5_new"), KeyValue.pair(2, "b5"), KeyValue.pair(3, null)); // replace existing records
+        numRecordsProduced += produceSourceData(baseTimestamp + 7, KeyValue.pair(1, "delete"), KeyValue.pair(2, "delete"), KeyValue.pair(3, "delete")); // delete
+
+        // wait for output and verify
+        final List<KeyValue<Integer, Integer>> receivedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+            TestUtils.consumerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerDeserializer.class,
+                IntegerDeserializer.class),
+            outputStream,
+            numRecordsProduced);
+
+        for (final KeyValue<Integer, Integer> receivedRecord : receivedRecords) {
+            // verify zero failed checks for each record
+            assertThat(0, equalTo(receivedRecord.value));
+        }
+    }
+
+    @Test
+    public void shouldSetChangelogTopicProperties() throws Exception {
+        // build topology and start app
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+        streamsBuilder
+            .addStateStore(
+                new VersionedKeyValueStoreBuilder<>(
+                    new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, HISTORY_RETENTION),
+                    Serdes.Integer(),
+                    Serdes.String(),
+                    Time.SYSTEM
+                )
+            )
+            .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String()))
+            .process(VersionedStoreCountProcessor::new, STORE_NAME)
+            .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer()));
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
+        kafkaStreams.start();
+
+        // produce record (and wait for result) to create changelog
+        produceSourceData(baseTimestamp, KeyValue.pair(0, "foo"));
+
+        IntegrationTestUtils.waitUntilMinRecordsReceived(
+            TestUtils.consumerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerDeserializer.class,
+                IntegerDeserializer.class),
+            outputStream,
+            1);
+
+        // verify changelog topic properties
+        final String changelogTopic = "app-VersionedKeyValueStoreIntegrationTestshouldSetChangelogTopicProperties-versioned-store-changelog";
+        final Properties changelogTopicConfig = CLUSTER.getLogConfig(changelogTopic);
+        assertThat(changelogTopicConfig.getProperty("cleanup.policy"), equalTo("compact"));
+        assertThat(changelogTopicConfig.getProperty("min.compaction.lag.ms"), equalTo(Long.toString(HISTORY_RETENTION + 24 * 60 * 60 * 1000L)));
+    }
+
+    @Test
+    public void shouldRestore() throws Exception {
+        // build topology and start app
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+        streamsBuilder
+            .addStateStore(
+                new VersionedKeyValueStoreBuilder<>(
+                    new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, HISTORY_RETENTION),
+                    Serdes.Integer(),
+                    Serdes.String(),
+                    Time.SYSTEM
+                )
+            )
+            .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String()))
+            .process(VersionedStoreCountProcessor::new, STORE_NAME)
+            .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer()));
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
+        kafkaStreams.start();
+
+        // produce source data
+        int initialRecordsProduced = 0;
+        initialRecordsProduced += produceSourceData(baseTimestamp, KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, null));
+        initialRecordsProduced += produceSourceData(baseTimestamp + 5, KeyValue.pair(1, "a5"), KeyValue.pair(2, null), KeyValue.pair(3, "c5"));
+        initialRecordsProduced += produceSourceData(baseTimestamp + 2, KeyValue.pair(1, "a2"), KeyValue.pair(2, "b2"), KeyValue.pair(3, null)); // out-of-order data
+        initialRecordsProduced += produceSourceData(baseTimestamp + 5, KeyValue.pair(1, "a5_new"), KeyValue.pair(2, "b5"), KeyValue.pair(3, null)); // replace existing records
+        initialRecordsProduced += produceSourceData(baseTimestamp + 7, KeyValue.pair(1, "delete"), KeyValue.pair(2, "delete"), KeyValue.pair(3, "delete")); // delete
+        initialRecordsProduced += produceSourceData(baseTimestamp + 10, KeyValue.pair(1, "a10"), KeyValue.pair(2, "b10"), KeyValue.pair(3, "c10")); // new data so latest is not tombstone
+
+        // wait for output
+        IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+            TestUtils.consumerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerDeserializer.class,
+                IntegerDeserializer.class),
+            outputStream,
+            initialRecordsProduced);
+
+        // wipe out state store to trigger restore process on restart
+        kafkaStreams.close();
+        kafkaStreams.cleanUp();
+
+        // restart app
+        kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
+        kafkaStreams.start();
+
+        // produce additional records
+        final int additionalRecordsProduced = produceSourceData(baseTimestamp + 12, KeyValue.pair(1, "a12"), KeyValue.pair(2, "b12"), KeyValue.pair(3, "c12"));
+
+        // wait for output and verify
+        final List<KeyValue<Integer, Integer>> receivedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+            TestUtils.consumerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerDeserializer.class,
+                IntegerDeserializer.class),
+            outputStream,
+            initialRecordsProduced + additionalRecordsProduced);
+
+        for (int i = 1; i <= additionalRecordsProduced; i++) {
+            final KeyValue<Integer, Integer> receivedRecord = receivedRecords.get(receivedRecords.size() - i);
+            // verify more than one record version found, which confirms that restore took place
+            assertThat(1, lessThan(receivedRecord.value));
+        }
+    }
+
+    @Test
+    public void shouldAllowCustomIQv2ForCustomStoreImplementations() {
+        // build topology and start app
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+        streamsBuilder
+            .addStateStore(
+                new VersionedKeyValueStoreBuilder<>(
+                    new CustomIQv2VersionedStoreSupplier(),
+                    Serdes.Integer(),
+                    Serdes.String(),
+                    Time.SYSTEM
+                )
+            )
+            .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String()))
+            .process(VersionedStoreCountProcessor::new, STORE_NAME)
+            .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer()));
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
+        kafkaStreams.start();
+
+        // issue IQv2 query and verify result
+        final StateQueryRequest<String> request =
+            StateQueryRequest.inStore(STORE_NAME)
+                .withQuery(new TestQuery())
+                .withPartitions(Collections.singleton(0));
+        final StateQueryResult<String> result =
+            IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+        assertThat("success", equalTo(result.getOnlyPartitionResult().getResult()));
+    }
+
+    private Properties props() {
+        final Properties streamsConfiguration = new Properties();
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        return streamsConfiguration;
+    }
+
+    /**
+     * @return number of records produced
+     */
+    @SuppressWarnings("varargs")
+    @SafeVarargs
+    private final int produceSourceData(final long timestamp,
+                                        final KeyValue<Integer, String>... keyValues) {
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+            inputStream,
+            Arrays.asList(keyValues),
+            TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+                IntegerSerializer.class,
+                StringSerializer.class),
+            timestamp);
+        return keyValues.length;
+    }
+
+    /**
+     * Test-only processor for inserting records into a versioned store while also tracking
+     * them separately in-memory, and performing checks to validate expected store contents.
+     * Forwards the number of failed checks downstream for consumption.
+     */
+    private static class VersionedStoreContentCheckerProcessor implements Processor<Integer, String, Integer, Integer> {
+
+        private ProcessorContext<Integer, Integer> context;
+        private VersionedKeyValueStore<Integer, String> store;
+
+        // in-memory copy of seen data, to validate for testing purposes.
+        // maps from key -> timestamp -> value
+        private final Map<Integer, Map<Long, String>> data = new HashMap<>();
+
+        @Override
+        public void init(final ProcessorContext<Integer, Integer> context) {
+            this.context = context;
+            store = context.getStateStore(STORE_NAME);
+        }
+
+        @Override
+        public void process(final Record<Integer, String> record) {
+            // add record to store. special value "delete" is interpreted as a delete() call,
+            // in contrast to null value, which is a tombstone inserted via put()
+            if ("delete".equals(record.value())) {
+                store.delete(record.key(), record.timestamp());
+                addToSeenData(record.key(), record.timestamp(), null);
+            } else {
+                store.put(record.key(), record.value(), record.timestamp());
+                addToSeenData(record.key(), record.timestamp(), record.value());
+            }
+
+            // check expected contents of store, and signal completion by writing
+            // number of failures to downstream
+            final int failedChecks = checkStoreContents();
+            context.forward(record.withValue(failedChecks));
+        }
+
+        private void addToSeenData(final Integer key, final long timestamp, final String value) {
+            if (data.containsKey(key)) {
+                data.get(key).put(timestamp, value);
+            } else {
+                final Map<Long, String> timestampsAndValues = new HashMap<>();
+                timestampsAndValues.put(timestamp, value);
+                data.put(key, timestampsAndValues);
+            }
+        }
+
+        /**
+         * @return number of failed checks
+         */
+        private int checkStoreContents() {
+            int failedChecks = 0;
+            for (final Map.Entry<Integer, Map<Long, String>> keyWithTimestampsAndValues : data.entrySet()) {
+                final Integer key = keyWithTimestampsAndValues.getKey();
+                final Map<Long, String> timestampsAndValues = keyWithTimestampsAndValues.getValue();
+
+                // track largest timestamp seen for key
+                long maxTimestamp = -1L;
+                String expectedValueForMaxTimestamp = null;
+
+                for (final Map.Entry<Long, String> timestampAndValue : timestampsAndValues.entrySet()) {
+                    final Long timestamp = timestampAndValue.getKey();
+                    final String expectedValue = timestampAndValue.getValue();
+
+                    if (timestamp > maxTimestamp) {
+                        maxTimestamp = timestamp;
+                        expectedValueForMaxTimestamp = expectedValue;
+                    }
+
+                    // validate timestamped get on store
+                    final VersionedRecord<String> versionedRecord = store.get(key, timestamp);
+                    if (!contentsMatch(versionedRecord, expectedValue, timestamp)) {
+                        failedChecks++;
+                    }
+                }
+
+                // validate get latest on store
+                final VersionedRecord<String> versionedRecord = store.get(key);
+                if (!contentsMatch(versionedRecord, expectedValueForMaxTimestamp, maxTimestamp)) {
+                    failedChecks++;
+                }
+            }
+            return failedChecks;
+        }
+
+        private static boolean contentsMatch(final VersionedRecord<String> versionedRecord,
+                                      final String expectedValue,
+                                      final long expectedTimestamp) {
+            if (expectedValue == null) {
+                return versionedRecord == null;
+            } else {
+                if (versionedRecord == null) {
+                    return false;
+                }
+                return expectedValue.equals(versionedRecord.value())
+                    && expectedTimestamp == versionedRecord.timestamp();
+            }
+        }
+    }
+
+    /**
+     * Test-only processor for counting the number of record versions for a specific key,
+     * and forwards this count downstream for consumption. The count only includes record
+     * versions earlier than the current one, and stops as soon as a null is encountered.
+     */
+    private static class VersionedStoreCountProcessor implements Processor<Integer, String, Integer, Integer> {
+
+        private ProcessorContext<Integer, Integer> context;
+        private VersionedKeyValueStore<Integer, String> store;
+
+        @Override
+        public void init(final ProcessorContext<Integer, Integer> context) {
+            this.context = context;
+            store = context.getStateStore(STORE_NAME);
+        }
+
+        @Override
+        public void process(final Record<Integer, String> record) {
+            // add record to store. special value "delete" is interpreted as a delete() call,
+            // in contrast to null value, which is a tombstone inserted via put()
+            if ("delete".equals(record.value())) {
+                store.delete(record.key(), record.timestamp());
+            } else {
+                store.put(record.key(), record.value(), record.timestamp());
+            }
+
+            // count number of versions for this key, up through the current version.
+            // count stops as soon as a null is reached
+            int numVersions = 0;
+            long timestamp = record.timestamp();
+            while (true) {
+                final VersionedRecord<String> versionedRecord = store.get(record.key(), timestamp);
+                if (versionedRecord != null) {
+                    numVersions++;
+                    // skip forward from current record version to search for more

Review Comment:
   Yes... in my mind that was "forward" (i.e., earlier), but I see now why that's super confusing 😆 Rewrote this comment to say "earlier" instead of either "forward" or "backward."
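
   In other words, the intent of that loop is roughly the following (a paraphrased sketch with the reworded comment, not the literal code from the file):

   ```java
   int numVersions = 0;
   long timestamp = record.timestamp();
   while (true) {
       final VersionedRecord<String> versionedRecord = store.get(record.key(), timestamp);
       if (versionedRecord == null) {
           break; // no earlier record version exists; stop counting
       }
       numVersions++;
       // step to just before the version we found, to search for earlier record versions
       timestamp = versionedRecord.timestamp() - 1;
   }
   ```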



##########
streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java:
##########
@@ -0,0 +1,592 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.lessThan;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.query.KeyQuery;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RangeQuery;
+import org.apache.kafka.streams.query.StateQueryRequest;
+import org.apache.kafka.streams.query.StateQueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.VersionedBytesStoreSupplier;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+import 
org.apache.kafka.streams.state.internals.RocksDbVersionedKeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.internals.VersionedKeyValueStoreBuilder;
+import 
org.apache.kafka.streams.state.internals.VersionedKeyValueToBytesStoreAdapter;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({IntegrationTest.class})
+public class VersionedKeyValueStoreIntegrationTest {
+
+    private static final String STORE_NAME = "versioned-store";
+    private static final long HISTORY_RETENTION = 3600_000L;
+
+    private String inputStream;
+    private String outputStream;
+    private long baseTimestamp;
+
+    private KafkaStreams kafkaStreams;
+
+    private static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(1);
+
+    @Rule
+    public TestName testName = new TestName();
+
+    @BeforeClass
+    public static void before() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void after() {
+        CLUSTER.stop();
+    }
+
+    @Before
+    public void beforeTest() throws InterruptedException {
+        final String uniqueTestName = safeUniqueTestName(getClass(), testName);
+        inputStream = "input-stream-" + uniqueTestName;
+        outputStream = "output-stream-" + uniqueTestName;
+        CLUSTER.createTopic(inputStream);
+        CLUSTER.createTopic(outputStream);
+
+        baseTimestamp = CLUSTER.time.milliseconds();
+    }
+
+    @After
+    public void afterTest() {
+        if (kafkaStreams != null) {
+            kafkaStreams.close(Duration.ofSeconds(30L));
+            kafkaStreams.cleanUp();
+        }
+    }
+
+    @Test
+    public void shouldPutGetAndDelete() throws Exception {
+        // build topology and start app
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+        streamsBuilder
+            .addStateStore(
+                new VersionedKeyValueStoreBuilder<>(
+                    new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, 
HISTORY_RETENTION),
+                    Serdes.Integer(),
+                    Serdes.String(),
+                    Time.SYSTEM
+                )
+            )
+            .stream(inputStream, Consumed.with(Serdes.Integer(), 
Serdes.String()))
+            .process(VersionedStoreContentCheckerProcessor::new, STORE_NAME)
+            .to(outputStream, Produced.with(Serdes.Integer(), 
Serdes.Integer()));
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
+        kafkaStreams.start();
+
+        // produce source data
+        int numRecordsProduced = 0;
+        numRecordsProduced += produceSourceData(baseTimestamp, 
KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, null));
+        numRecordsProduced += produceSourceData(baseTimestamp + 5, 
KeyValue.pair(1, "a5"), KeyValue.pair(2, null), KeyValue.pair(3, "c5"));
+        numRecordsProduced += produceSourceData(baseTimestamp + 2, 
KeyValue.pair(1, "a2"), KeyValue.pair(2, "b2"), KeyValue.pair(3, null)); // 
out-of-order data
+        numRecordsProduced += produceSourceData(baseTimestamp + 5, 
KeyValue.pair(1, "a5_new"), KeyValue.pair(2, "b5"), KeyValue.pair(3, null)); // 
replace existing records
+        numRecordsProduced += produceSourceData(baseTimestamp + 7, 
KeyValue.pair(1, "delete"), KeyValue.pair(2, "delete"), KeyValue.pair(3, 
"delete")); // delete
+
+        // wait for output and verify
+        final List<KeyValue<Integer, Integer>> receivedRecords = 
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+            TestUtils.consumerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerDeserializer.class,
+                IntegerDeserializer.class),
+            outputStream,
+            numRecordsProduced);
+
+        for (final KeyValue<Integer, Integer> receivedRecord : 
receivedRecords) {
+            // verify zero failed checks for each record
+            assertThat(0, equalTo(receivedRecord.value));
+        }
+    }
+
+    @Test
+    public void shouldSetChangelogTopicProperties() throws Exception {
+        // build topology and start app
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+        streamsBuilder
+            .addStateStore(
+                new VersionedKeyValueStoreBuilder<>(
+                    new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, 
HISTORY_RETENTION),
+                    Serdes.Integer(),
+                    Serdes.String(),
+                    Time.SYSTEM
+                )
+            )
+            .stream(inputStream, Consumed.with(Serdes.Integer(), 
Serdes.String()))
+            .process(VersionedStoreCountProcessor::new, STORE_NAME)
+            .to(outputStream, Produced.with(Serdes.Integer(), 
Serdes.Integer()));
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
+        kafkaStreams.start();
+
+        // produce record (and wait for result) to create changelog
+        produceSourceData(baseTimestamp, KeyValue.pair(0, "foo"));
+
+        IntegrationTestUtils.waitUntilMinRecordsReceived(
+            TestUtils.consumerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerDeserializer.class,
+                IntegerDeserializer.class),
+            outputStream,
+            1);
+
+        // verify changelog topic properties
+        final String changelogTopic = 
"app-VersionedKeyValueStoreIntegrationTestshouldSetChangelogTopicProperties-versioned-store-changelog";
+        final Properties changelogTopicConfig = 
CLUSTER.getLogConfig(changelogTopic);
+        assertThat(changelogTopicConfig.getProperty("cleanup.policy"), 
equalTo("compact"));
+        assertThat(changelogTopicConfig.getProperty("min.compaction.lag.ms"), 
equalTo(Long.toString(HISTORY_RETENTION + 24 * 60 * 60 * 1000L)));
+    }
+
+    @Test
+    public void shouldRestore() throws Exception {
+        // build topology and start app
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+        streamsBuilder
+            .addStateStore(
+                new VersionedKeyValueStoreBuilder<>(
+                    new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, 
HISTORY_RETENTION),
+                    Serdes.Integer(),
+                    Serdes.String(),
+                    Time.SYSTEM
+                )
+            )
+            .stream(inputStream, Consumed.with(Serdes.Integer(), 
Serdes.String()))
+            .process(VersionedStoreCountProcessor::new, STORE_NAME)
+            .to(outputStream, Produced.with(Serdes.Integer(), 
Serdes.Integer()));
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
+        kafkaStreams.start();
+
+        // produce source data
+        int initialRecordsProduced = 0;
+        initialRecordsProduced += produceSourceData(baseTimestamp, 
KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, null));
+        initialRecordsProduced += produceSourceData(baseTimestamp + 5, 
KeyValue.pair(1, "a5"), KeyValue.pair(2, null), KeyValue.pair(3, "c5"));
+        initialRecordsProduced += produceSourceData(baseTimestamp + 2, 
KeyValue.pair(1, "a2"), KeyValue.pair(2, "b2"), KeyValue.pair(3, null)); // 
out-of-order data
+        initialRecordsProduced += produceSourceData(baseTimestamp + 5, 
KeyValue.pair(1, "a5_new"), KeyValue.pair(2, "b5"), KeyValue.pair(3, null)); // 
replace existing records
+        initialRecordsProduced += produceSourceData(baseTimestamp + 7, 
KeyValue.pair(1, "delete"), KeyValue.pair(2, "delete"), KeyValue.pair(3, 
"delete")); // delete
+        initialRecordsProduced += produceSourceData(baseTimestamp + 10, 
KeyValue.pair(1, "a10"), KeyValue.pair(2, "b10"), KeyValue.pair(3, "c10")); // 
new data so latest is not tombstone
+
+        // wait for output
+        IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+            TestUtils.consumerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerDeserializer.class,
+                IntegerDeserializer.class),
+            outputStream,
+            initialRecordsProduced);
+
+        // wipe out state store to trigger restore process on restart
+        kafkaStreams.close();
+        kafkaStreams.cleanUp();
+
+        // restart app
+        kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
+        kafkaStreams.start();
+
+        // produce additional records
+        final int additionalRecordsProduced = produceSourceData(baseTimestamp 
+ 12, KeyValue.pair(1, "a12"), KeyValue.pair(2, "b12"), KeyValue.pair(3, 
"c12"));
+
+        // wait for output and verify
+        final List<KeyValue<Integer, Integer>> receivedRecords = 
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+            TestUtils.consumerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerDeserializer.class,
+                IntegerDeserializer.class),
+            outputStream,
+            initialRecordsProduced + additionalRecordsProduced);
+
+        for (int i = 1; i <= additionalRecordsProduced; i++) {
+            final KeyValue<Integer, Integer> receivedRecord = 
receivedRecords.get(receivedRecords.size() - i);
+            // verify more than one record version found, which confirms that 
restore took place
+            assertThat(1, lessThan(receivedRecord.value));
+        }
+    }
+
+    @Test
+    public void shouldAllowCustomIQv2ForCustomStoreImplementations() {
+        // build topology and start app
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+        streamsBuilder
+            .addStateStore(
+                new VersionedKeyValueStoreBuilder<>(
+                    new CustomIQv2VersionedStoreSupplier(),
+                    Serdes.Integer(),
+                    Serdes.String(),
+                    Time.SYSTEM
+                )
+            )
+            .stream(inputStream, Consumed.with(Serdes.Integer(), 
Serdes.String()))
+            .process(VersionedStoreCountProcessor::new, STORE_NAME)
+            .to(outputStream, Produced.with(Serdes.Integer(), 
Serdes.Integer()));
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
+        kafkaStreams.start();
+
+        // issue IQv2 query and verify result
+        final StateQueryRequest<String> request =
+            StateQueryRequest.inStore(STORE_NAME)
+                .withQuery(new TestQuery())
+                .withPartitions(Collections.singleton(0));
+        final StateQueryResult<String> result =
+            IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+        assertThat("success", 
equalTo(result.getOnlyPartitionResult().getResult()));
+    }
+
+    private Properties props() {
+        final Properties streamsConfiguration = new Properties();
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + 
safeTestName);
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath());
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 
1000L);
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
+        return streamsConfiguration;
+    }
+
+    /**
+     * @return number of records produced
+     */
+    @SuppressWarnings("varargs")
+    @SafeVarargs
+    private final int produceSourceData(final long timestamp,
+                                        final KeyValue<Integer, String>... 
keyValues) {
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+            inputStream,
+            Arrays.asList(keyValues),
+            TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+                IntegerSerializer.class,
+                StringSerializer.class),
+            timestamp);
+        return keyValues.length;
+    }
+
+    /**
+     * Test-only processor for inserting records into a versioned store while 
also tracking
+     * them separately in-memory, and performing checks to validate expected 
store contents.
+     * Forwards the number of failed checks downstream for consumption.
+     */
+    private static class VersionedStoreContentCheckerProcessor implements 
Processor<Integer, String, Integer, Integer> {
+
+        private ProcessorContext<Integer, Integer> context;
+        private VersionedKeyValueStore<Integer, String> store;
+
+        // in-memory copy of seen data, to validate for testing purposes.
+        // maps from key -> timestamp -> value
+        private final Map<Integer, Map<Long, String>> data = new HashMap<>();
+
+        @Override
+        public void init(final ProcessorContext<Integer, Integer> context) {
+            this.context = context;
+            store = context.getStateStore(STORE_NAME);
+        }
+
+        @Override
+        public void process(final Record<Integer, String> record) {
+            // add record to store. special value "delete" is interpreted as a 
delete() call,
+            // in contrast to null value, which is a tombstone inserted via 
put()
+            if ("delete".equals(record.value())) {
+                store.delete(record.key(), record.timestamp());
+                addToSeenData(record.key(), record.timestamp(), null);
+            } else {
+                store.put(record.key(), record.value(), record.timestamp());
+                addToSeenData(record.key(), record.timestamp(), 
record.value());
+            }
+
+            // check expected contents of store, and signal completion by 
writing
+            // number of failures to downstream
+            final int failedChecks = checkStoreContents();
+            context.forward(record.withValue(failedChecks));
+        }
+
+        private void addToSeenData(final Integer key, final long timestamp, 
final String value) {
+            if (data.containsKey(key)) {
+                data.get(key).put(timestamp, value);
+            } else {
+                final Map<Long, String> timestampsAndValues = new HashMap<>();
+                timestampsAndValues.put(timestamp, value);
+                data.put(key, timestampsAndValues);
+            }
+        }
+
+        /**
+         * @return number of failed checks
+         */
+        private int checkStoreContents() {
+            int failedChecks = 0;
+            for (final Map.Entry<Integer, Map<Long, String>> 
keyWithTimestampsAndValues : data.entrySet()) {
+                final Integer key = keyWithTimestampsAndValues.getKey();
+                final Map<Long, String> timestampsAndValues = 
keyWithTimestampsAndValues.getValue();
+
+                // track largest timestamp seen for key
+                long maxTimestamp = -1L;
+                String expectedValueForMaxTimestamp = null;
+
+                for (final Map.Entry<Long, String> timestampAndValue : 
timestampsAndValues.entrySet()) {
+                    final Long timestamp = timestampAndValue.getKey();
+                    final String expectedValue = timestampAndValue.getValue();
+
+                    if (timestamp > maxTimestamp) {
+                        maxTimestamp = timestamp;
+                        expectedValueForMaxTimestamp = expectedValue;
+                    }
+
+                    // validate timestamped get on store
+                    final VersionedRecord<String> versionedRecord = 
store.get(key, timestamp);
+                    if (!contentsMatch(versionedRecord, expectedValue, 
timestamp)) {
+                        failedChecks++;
+                    }
+                }
+
+                // validate get latest on store
+                final VersionedRecord<String> versionedRecord = store.get(key);
+                if (!contentsMatch(versionedRecord, 
expectedValueForMaxTimestamp, maxTimestamp)) {
+                    failedChecks++;
+                }
+            }
+            return failedChecks;
+        }
+
+        private static boolean contentsMatch(final VersionedRecord<String> 
versionedRecord,
+                                      final String expectedValue,
+                                      final long expectedTimestamp) {
+            if (expectedValue == null) {
+                return versionedRecord == null;
+            } else {
+                if (versionedRecord == null) {
+                    return false;
+                }
+                return expectedValue.equals(versionedRecord.value())
+                    && expectedTimestamp == versionedRecord.timestamp();
+            }
+        }
+    }
+
+    /**
+     * Test-only processor for counting the number of record versions for a 
specific key,
+     * and forwards this count downstream for consumption. The count only 
includes record
+     * versions earlier than the current one, and stops as soon as a null is 
encountered.
+     */
+    private static class VersionedStoreCountProcessor implements 
Processor<Integer, String, Integer, Integer> {
+
+        private ProcessorContext<Integer, Integer> context;
+        private VersionedKeyValueStore<Integer, String> store;
+
+        @Override
+        public void init(final ProcessorContext<Integer, Integer> context) {
+            this.context = context;
+            store = context.getStateStore(STORE_NAME);
+        }
+
+        @Override
+        public void process(final Record<Integer, String> record) {
+            // add record to store. special value "delete" is interpreted as a 
delete() call,
+            // in contrast to null value, which is a tombstone inserted via 
put()
+            if ("delete".equals(record.value())) {
+                store.delete(record.key(), record.timestamp());
+            } else {
+                store.put(record.key(), record.value(), record.timestamp());
+            }
+
+            // count number of versions for this key, up through the current version.
+            // count stops as soon as a null is reached
+            int numVersions = 0;
+            long timestamp = record.timestamp();
+            while (true) {
+                final VersionedRecord<String> versionedRecord = store.get(record.key(), timestamp);

Review Comment:
   We could do that too. Either way it's not possible to get an accurate count of the total number of record versions for the key, since the search process stops as soon as a null is encountered (because nulls do not come with timestamps, which means we cannot continue to search for earlier records). So, this test processor isn't trying to do anything complete/comprehensive -- just enough to give us the checks that we need.
   
   Given that `put/get/delete` is already tested more comprehensively in `shouldPutGetAndDelete()` and that this processor is only used for other tests, I'm inclined to leave it as is.
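
   For illustration, a minimal sketch of the version-counting walk described above, assuming the `VersionedKeyValueStore`/`VersionedRecord` types already imported by this test; the helper name is hypothetical and not part of the PR:
   
   ```java
   // Walk record versions for one key from the given timestamp toward older versions.
   // The walk stops at the first null result: a missing version (or tombstone) carries
   // no timestamp, so there is nothing to step past in order to keep searching earlier.
   static int countVersionsUpTo(final VersionedKeyValueStore<Integer, String> store,
                                final Integer key,
                                final long asOfTimestamp) {
       int numVersions = 0;
       long timestamp = asOfTimestamp;
       while (true) {
           final VersionedRecord<String> versionedRecord = store.get(key, timestamp);
           if (versionedRecord == null) {
               return numVersions; // best-effort count, as noted above
           }
           numVersions++;
           // query just before this version's timestamp to find the next-earlier version
           timestamp = versionedRecord.timestamp() - 1;
       }
   }
   ```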



##########
streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java:
##########
@@ -0,0 +1,592 @@
+        // issue IQv2 query and verify result
+        final StateQueryRequest<String> request =
+            StateQueryRequest.inStore(STORE_NAME)
+                .withQuery(new TestQuery())
+                .withPartitions(Collections.singleton(0));
+        final StateQueryResult<String> result =
+            IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+        assertThat("success", 
equalTo(result.getOnlyPartitionResult().getResult()));

Review Comment:
   > Wondering if we could use a custom query type to verify the store content in the restore test, directly after restoration?
   
   The restore test exercises the actual RocksDB-based versioned store implementation, which does not support IQ. (Aside: that's also why I had to introduce a new versioned store type for this restore test -- once we support IQ in the RocksDB-based implementation itself, we can update this test to remove the extra code.)
   
   > Seems we only test if we don't crash?
   
   We also check that we get the expected string back from the custom query type, but yes, agreed that it's not a particularly exciting check. Another reason to update this test once the RocksDB-based implementation supports IQ.
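
   For illustration, a hedged sketch (assumptions, not the actual PR code) of the shape a custom IQv2 query and its store-side handling can take, which is what would make the `"success"` assertion above pass; the real `TestQuery` and `CustomIQv2VersionedStoreSupplier` bodies are not shown in this excerpt:
   
   ```java
   // A custom IQv2 query type is just a marker implementing Query<R>.
   public static class TestQuery implements Query<String> { }
   
   // Inside the custom versioned store (a StateStore implementation), the IQv2
   // entry point can recognize the custom query type and answer it directly:
   @SuppressWarnings("unchecked")
   @Override
   public <R> QueryResult<R> query(final Query<R> query,
                                   final PositionBound positionBound,
                                   final QueryConfig config) {
       if (query instanceof TestQuery) {
           // custom query recognized -> return the fixed result the test expects
           return (QueryResult<R>) QueryResult.forResult("success");
       }
       // anything else is reported as an unknown query type
       return QueryResult.forUnknownQueryType(query, this);
   }
   ```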



##########
streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java:
##########
@@ -0,0 +1,592 @@
+        private static boolean contentsMatch(final VersionedRecord<String> versionedRecord,
+                                      final String expectedValue,
+                                      final long expectedTimestamp) {
+            if (expectedValue == null) {
+                return versionedRecord == null;

Review Comment:
   Nope, the versioned store contract is that if the return value is null, then the entire `VersionedRecord` is null (and no timestamp is provided). In fact, the `VersionedRecord` class does not allow creating an instance with null value.
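
   A brief caller-side sketch of that contract (`store` and `key` stand in for the test's store and record key):
   
   ```java
   // get() returns either a complete VersionedRecord or null -- never a record
   // wrapping a null value -- so callers null-check the record itself.
   final VersionedRecord<String> latest = store.get(key);
   if (latest == null) {
       // no version exists for this key (or the latest version is a tombstone)
   } else {
       final String value = latest.value();        // guaranteed non-null
       final long validFrom = latest.timestamp();  // timestamp of this version
   }
   ```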



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
