vcrfxia commented on code in PR #13340: URL: https://github.com/apache/kafka/pull/13340#discussion_r1127290672
########## streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java: ########## @@ -0,0 +1,592 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.integration; + +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.lessThan; + +import java.io.IOException; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import 
org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.query.KeyQuery; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryConfig; +import org.apache.kafka.streams.query.QueryResult; +import org.apache.kafka.streams.query.RangeQuery; +import org.apache.kafka.streams.query.StateQueryRequest; +import org.apache.kafka.streams.query.StateQueryResult; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.VersionedBytesStoreSupplier; +import org.apache.kafka.streams.state.VersionedKeyValueStore; +import org.apache.kafka.streams.state.VersionedRecord; +import org.apache.kafka.streams.state.internals.RocksDbVersionedKeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.internals.VersionedKeyValueStoreBuilder; +import org.apache.kafka.streams.state.internals.VersionedKeyValueToBytesStoreAdapter; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category({IntegrationTest.class}) +public class VersionedKeyValueStoreIntegrationTest { + + private static 
final String STORE_NAME = "versioned-store"; + private static final long HISTORY_RETENTION = 3600_000L; + + private String inputStream; + private String outputStream; + private long baseTimestamp; + + private KafkaStreams kafkaStreams; + + private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); + + @Rule + public TestName testName = new TestName(); + + @BeforeClass + public static void before() throws IOException { + CLUSTER.start(); + } + + @AfterClass + public static void after() { + CLUSTER.stop(); + } + + @Before + public void beforeTest() throws InterruptedException { + final String uniqueTestName = safeUniqueTestName(getClass(), testName); + inputStream = "input-stream-" + uniqueTestName; + outputStream = "output-stream-" + uniqueTestName; + CLUSTER.createTopic(inputStream); + CLUSTER.createTopic(outputStream); + + baseTimestamp = CLUSTER.time.milliseconds(); + } + + @After + public void afterTest() { + if (kafkaStreams != null) { + kafkaStreams.close(Duration.ofSeconds(30L)); + kafkaStreams.cleanUp(); + } + } + + @Test + public void shouldPutGetAndDelete() throws Exception { + // build topology and start app + final StreamsBuilder streamsBuilder = new StreamsBuilder(); + + streamsBuilder + .addStateStore( + new VersionedKeyValueStoreBuilder<>( + new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, HISTORY_RETENTION), + Serdes.Integer(), + Serdes.String(), + Time.SYSTEM + ) + ) + .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String())) + .process(VersionedStoreContentCheckerProcessor::new, STORE_NAME) + .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer())); + + final Properties props = props(); + kafkaStreams = new KafkaStreams(streamsBuilder.build(), props); + kafkaStreams.start(); + + // produce source data + int numRecordsProduced = 0; + numRecordsProduced += produceSourceData(baseTimestamp, KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, null)); + numRecordsProduced += 
produceSourceData(baseTimestamp + 5, KeyValue.pair(1, "a5"), KeyValue.pair(2, null), KeyValue.pair(3, "c5")); + numRecordsProduced += produceSourceData(baseTimestamp + 2, KeyValue.pair(1, "a2"), KeyValue.pair(2, "b2"), KeyValue.pair(3, null)); // out-of-order data + numRecordsProduced += produceSourceData(baseTimestamp + 5, KeyValue.pair(1, "a5_new"), KeyValue.pair(2, "b5"), KeyValue.pair(3, null)); // replace existing records + numRecordsProduced += produceSourceData(baseTimestamp + 7, KeyValue.pair(1, "delete"), KeyValue.pair(2, "delete"), KeyValue.pair(3, "delete")); // delete + + // wait for output and verify + final List<KeyValue<Integer, Integer>> receivedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + TestUtils.consumerConfig( + CLUSTER.bootstrapServers(), + IntegerDeserializer.class, + IntegerDeserializer.class), + outputStream, + numRecordsProduced); + + for (final KeyValue<Integer, Integer> receivedRecord : receivedRecords) { + // verify zero failed checks for each record + assertThat(0, equalTo(receivedRecord.value)); + } + } + + @Test + public void shouldSetChangelogTopicProperties() throws Exception { + // build topology and start app + final StreamsBuilder streamsBuilder = new StreamsBuilder(); + + streamsBuilder + .addStateStore( + new VersionedKeyValueStoreBuilder<>( + new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, HISTORY_RETENTION), + Serdes.Integer(), + Serdes.String(), + Time.SYSTEM + ) + ) + .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String())) + .process(VersionedStoreCountProcessor::new, STORE_NAME) + .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer())); + + final Properties props = props(); + kafkaStreams = new KafkaStreams(streamsBuilder.build(), props); + kafkaStreams.start(); + + // produce record (and wait for result) to create changelog + produceSourceData(baseTimestamp, KeyValue.pair(0, "foo")); + + IntegrationTestUtils.waitUntilMinRecordsReceived( + 
TestUtils.consumerConfig( + CLUSTER.bootstrapServers(), + IntegerDeserializer.class, + IntegerDeserializer.class), + outputStream, + 1); + + // verify changelog topic properties + final String changelogTopic = "app-VersionedKeyValueStoreIntegrationTestshouldSetChangelogTopicProperties-versioned-store-changelog"; + final Properties changelogTopicConfig = CLUSTER.getLogConfig(changelogTopic); + assertThat(changelogTopicConfig.getProperty("cleanup.policy"), equalTo("compact")); + assertThat(changelogTopicConfig.getProperty("min.compaction.lag.ms"), equalTo(Long.toString(HISTORY_RETENTION + 24 * 60 * 60 * 1000L))); + } + + @Test + public void shouldRestore() throws Exception { + // build topology and start app + final StreamsBuilder streamsBuilder = new StreamsBuilder(); + + streamsBuilder + .addStateStore( + new VersionedKeyValueStoreBuilder<>( + new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, HISTORY_RETENTION), + Serdes.Integer(), + Serdes.String(), + Time.SYSTEM + ) + ) + .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String())) + .process(VersionedStoreCountProcessor::new, STORE_NAME) + .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer())); + + final Properties props = props(); + kafkaStreams = new KafkaStreams(streamsBuilder.build(), props); + kafkaStreams.start(); + + // produce source data + int initialRecordsProduced = 0; + initialRecordsProduced += produceSourceData(baseTimestamp, KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, null)); + initialRecordsProduced += produceSourceData(baseTimestamp + 5, KeyValue.pair(1, "a5"), KeyValue.pair(2, null), KeyValue.pair(3, "c5")); + initialRecordsProduced += produceSourceData(baseTimestamp + 2, KeyValue.pair(1, "a2"), KeyValue.pair(2, "b2"), KeyValue.pair(3, null)); // out-of-order data + initialRecordsProduced += produceSourceData(baseTimestamp + 5, KeyValue.pair(1, "a5_new"), KeyValue.pair(2, "b5"), KeyValue.pair(3, null)); // replace existing records + 
initialRecordsProduced += produceSourceData(baseTimestamp + 7, KeyValue.pair(1, "delete"), KeyValue.pair(2, "delete"), KeyValue.pair(3, "delete")); // delete + initialRecordsProduced += produceSourceData(baseTimestamp + 10, KeyValue.pair(1, "a10"), KeyValue.pair(2, "b10"), KeyValue.pair(3, "c10")); // new data so latest is not tombstone + + // wait for output + IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + TestUtils.consumerConfig( + CLUSTER.bootstrapServers(), + IntegerDeserializer.class, + IntegerDeserializer.class), + outputStream, + initialRecordsProduced); + + // wipe out state store to trigger restore process on restart + kafkaStreams.close(); + kafkaStreams.cleanUp(); + + // restart app + kafkaStreams = new KafkaStreams(streamsBuilder.build(), props); + kafkaStreams.start(); + + // produce additional records + final int additionalRecordsProduced = produceSourceData(baseTimestamp + 12, KeyValue.pair(1, "a12"), KeyValue.pair(2, "b12"), KeyValue.pair(3, "c12")); + + // wait for output and verify + final List<KeyValue<Integer, Integer>> receivedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + TestUtils.consumerConfig( + CLUSTER.bootstrapServers(), + IntegerDeserializer.class, + IntegerDeserializer.class), + outputStream, + initialRecordsProduced + additionalRecordsProduced); + + for (int i = 1; i <= additionalRecordsProduced; i++) { + final KeyValue<Integer, Integer> receivedRecord = receivedRecords.get(receivedRecords.size() - i); + // verify more than one record version found, which confirms that restore took place + assertThat(1, lessThan(receivedRecord.value)); + } + } + + @Test + public void shouldAllowCustomIQv2ForCustomStoreImplementations() { + // build topology and start app + final StreamsBuilder streamsBuilder = new StreamsBuilder(); + + streamsBuilder + .addStateStore( + new VersionedKeyValueStoreBuilder<>( + new CustomIQv2VersionedStoreSupplier(), + Serdes.Integer(), + Serdes.String(), + Time.SYSTEM + ) + ) + 
.stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String())) + .process(VersionedStoreCountProcessor::new, STORE_NAME) + .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer())); + + final Properties props = props(); + kafkaStreams = new KafkaStreams(streamsBuilder.build(), props); + kafkaStreams.start(); + + // issue IQv2 query and verify result + final StateQueryRequest<String> request = + StateQueryRequest.inStore(STORE_NAME) + .withQuery(new TestQuery()) + .withPartitions(Collections.singleton(0)); + final StateQueryResult<String> result = + IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); + assertThat("success", equalTo(result.getOnlyPartitionResult().getResult())); + } + + private Properties props() { + final Properties streamsConfiguration = new Properties(); + final String safeTestName = safeUniqueTestName(getClass(), testName); + streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName); + streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); + streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); + streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + return streamsConfiguration; + } + + /** + * @return number of records produced + */ + @SuppressWarnings("varargs") + @SafeVarargs + private final int produceSourceData(final long timestamp, + final KeyValue<Integer, String>... 
keyValues) { + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp( + inputStream, + Arrays.asList(keyValues), + TestUtils.producerConfig(CLUSTER.bootstrapServers(), + IntegerSerializer.class, + StringSerializer.class), + timestamp); + return keyValues.length; + } + + /** + * Test-only processor for inserting records into a versioned store while also tracking + * them separately in-memory, and performing checks to validate expected store contents. + * Forwards the number of failed checks downstream for consumption. + */ + private static class VersionedStoreContentCheckerProcessor implements Processor<Integer, String, Integer, Integer> { + + private ProcessorContext<Integer, Integer> context; + private VersionedKeyValueStore<Integer, String> store; + + // in-memory copy of seen data, to validate for testing purposes. + // maps from key -> timestamp -> value + private final Map<Integer, Map<Long, String>> data = new HashMap<>(); + + @Override + public void init(final ProcessorContext<Integer, Integer> context) { + this.context = context; + store = context.getStateStore(STORE_NAME); + } + + @Override + public void process(final Record<Integer, String> record) { + // add record to store. 
special value "delete" is interpreted as a delete() call, + // in contrast to null value, which is a tombstone inserted via put() + if ("delete".equals(record.value())) { + store.delete(record.key(), record.timestamp()); + addToSeenData(record.key(), record.timestamp(), null); + } else { + store.put(record.key(), record.value(), record.timestamp()); + addToSeenData(record.key(), record.timestamp(), record.value()); + } + + // check expected contents of store, and signal completion by writing + // number of failures to downstream + final int failedChecks = checkStoreContents(); + context.forward(record.withValue(failedChecks)); + } + + private void addToSeenData(final Integer key, final long timestamp, final String value) { + if (data.containsKey(key)) { + data.get(key).put(timestamp, value); + } else { + final Map<Long, String> timestampsAndValues = new HashMap<>(); + timestampsAndValues.put(timestamp, value); + data.put(key, timestampsAndValues); + } + } + + /** + * @return number of failed checks + */ + private int checkStoreContents() { + int failedChecks = 0; + for (final Map.Entry<Integer, Map<Long, String>> keyWithTimestampsAndValues : data.entrySet()) { + final Integer key = keyWithTimestampsAndValues.getKey(); + final Map<Long, String> timestampsAndValues = keyWithTimestampsAndValues.getValue(); + + // track largest timestamp seen for key + long maxTimestamp = -1L; + String expectedValueForMaxTimestamp = null; + + for (final Map.Entry<Long, String> timestampAndValue : timestampsAndValues.entrySet()) { + final Long timestamp = timestampAndValue.getKey(); Review Comment: Seems we could go either way on this one because in the context of `contentsMatch(...)` this is `expectedTimestamp`, but in the context of `store.get(...)` it's just `timestamp`? 
Went ahead and made the update 👍 ########## streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java: ########## @@ -0,0 +1,592 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.integration; + +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.lessThan; + +import java.io.IOException; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; 
+import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.query.KeyQuery; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryConfig; +import org.apache.kafka.streams.query.QueryResult; +import org.apache.kafka.streams.query.RangeQuery; +import org.apache.kafka.streams.query.StateQueryRequest; +import org.apache.kafka.streams.query.StateQueryResult; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.VersionedBytesStoreSupplier; +import org.apache.kafka.streams.state.VersionedKeyValueStore; +import org.apache.kafka.streams.state.VersionedRecord; +import org.apache.kafka.streams.state.internals.RocksDbVersionedKeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.internals.VersionedKeyValueStoreBuilder; +import org.apache.kafka.streams.state.internals.VersionedKeyValueToBytesStoreAdapter; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category({IntegrationTest.class}) +public class VersionedKeyValueStoreIntegrationTest { + + private 
static final String STORE_NAME = "versioned-store"; + private static final long HISTORY_RETENTION = 3600_000L; + + private String inputStream; + private String outputStream; + private long baseTimestamp; + + private KafkaStreams kafkaStreams; + + private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); + + @Rule + public TestName testName = new TestName(); + + @BeforeClass + public static void before() throws IOException { + CLUSTER.start(); + } + + @AfterClass + public static void after() { + CLUSTER.stop(); + } + + @Before + public void beforeTest() throws InterruptedException { + final String uniqueTestName = safeUniqueTestName(getClass(), testName); + inputStream = "input-stream-" + uniqueTestName; + outputStream = "output-stream-" + uniqueTestName; + CLUSTER.createTopic(inputStream); + CLUSTER.createTopic(outputStream); + + baseTimestamp = CLUSTER.time.milliseconds(); + } + + @After + public void afterTest() { + if (kafkaStreams != null) { + kafkaStreams.close(Duration.ofSeconds(30L)); + kafkaStreams.cleanUp(); + } + } + + @Test + public void shouldPutGetAndDelete() throws Exception { + // build topology and start app + final StreamsBuilder streamsBuilder = new StreamsBuilder(); + + streamsBuilder + .addStateStore( + new VersionedKeyValueStoreBuilder<>( + new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, HISTORY_RETENTION), + Serdes.Integer(), + Serdes.String(), + Time.SYSTEM + ) + ) + .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String())) + .process(VersionedStoreContentCheckerProcessor::new, STORE_NAME) + .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer())); + + final Properties props = props(); + kafkaStreams = new KafkaStreams(streamsBuilder.build(), props); + kafkaStreams.start(); + + // produce source data + int numRecordsProduced = 0; + numRecordsProduced += produceSourceData(baseTimestamp, KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, null)); + numRecordsProduced += 
produceSourceData(baseTimestamp + 5, KeyValue.pair(1, "a5"), KeyValue.pair(2, null), KeyValue.pair(3, "c5")); + numRecordsProduced += produceSourceData(baseTimestamp + 2, KeyValue.pair(1, "a2"), KeyValue.pair(2, "b2"), KeyValue.pair(3, null)); // out-of-order data + numRecordsProduced += produceSourceData(baseTimestamp + 5, KeyValue.pair(1, "a5_new"), KeyValue.pair(2, "b5"), KeyValue.pair(3, null)); // replace existing records + numRecordsProduced += produceSourceData(baseTimestamp + 7, KeyValue.pair(1, "delete"), KeyValue.pair(2, "delete"), KeyValue.pair(3, "delete")); // delete + + // wait for output and verify + final List<KeyValue<Integer, Integer>> receivedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + TestUtils.consumerConfig( + CLUSTER.bootstrapServers(), + IntegerDeserializer.class, + IntegerDeserializer.class), + outputStream, + numRecordsProduced); + + for (final KeyValue<Integer, Integer> receivedRecord : receivedRecords) { + // verify zero failed checks for each record + assertThat(0, equalTo(receivedRecord.value)); + } + } + + @Test + public void shouldSetChangelogTopicProperties() throws Exception { + // build topology and start app + final StreamsBuilder streamsBuilder = new StreamsBuilder(); + + streamsBuilder + .addStateStore( + new VersionedKeyValueStoreBuilder<>( + new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, HISTORY_RETENTION), + Serdes.Integer(), + Serdes.String(), + Time.SYSTEM + ) + ) + .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String())) + .process(VersionedStoreCountProcessor::new, STORE_NAME) + .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer())); + + final Properties props = props(); + kafkaStreams = new KafkaStreams(streamsBuilder.build(), props); + kafkaStreams.start(); + + // produce record (and wait for result) to create changelog + produceSourceData(baseTimestamp, KeyValue.pair(0, "foo")); + + IntegrationTestUtils.waitUntilMinRecordsReceived( + 
TestUtils.consumerConfig( + CLUSTER.bootstrapServers(), + IntegerDeserializer.class, + IntegerDeserializer.class), + outputStream, + 1); + + // verify changelog topic properties + final String changelogTopic = "app-VersionedKeyValueStoreIntegrationTestshouldSetChangelogTopicProperties-versioned-store-changelog"; + final Properties changelogTopicConfig = CLUSTER.getLogConfig(changelogTopic); + assertThat(changelogTopicConfig.getProperty("cleanup.policy"), equalTo("compact")); + assertThat(changelogTopicConfig.getProperty("min.compaction.lag.ms"), equalTo(Long.toString(HISTORY_RETENTION + 24 * 60 * 60 * 1000L))); + } + + @Test + public void shouldRestore() throws Exception { + // build topology and start app + final StreamsBuilder streamsBuilder = new StreamsBuilder(); + + streamsBuilder + .addStateStore( + new VersionedKeyValueStoreBuilder<>( + new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, HISTORY_RETENTION), + Serdes.Integer(), + Serdes.String(), + Time.SYSTEM + ) + ) + .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String())) + .process(VersionedStoreCountProcessor::new, STORE_NAME) + .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer())); + + final Properties props = props(); + kafkaStreams = new KafkaStreams(streamsBuilder.build(), props); + kafkaStreams.start(); + + // produce source data + int initialRecordsProduced = 0; + initialRecordsProduced += produceSourceData(baseTimestamp, KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, null)); + initialRecordsProduced += produceSourceData(baseTimestamp + 5, KeyValue.pair(1, "a5"), KeyValue.pair(2, null), KeyValue.pair(3, "c5")); + initialRecordsProduced += produceSourceData(baseTimestamp + 2, KeyValue.pair(1, "a2"), KeyValue.pair(2, "b2"), KeyValue.pair(3, null)); // out-of-order data + initialRecordsProduced += produceSourceData(baseTimestamp + 5, KeyValue.pair(1, "a5_new"), KeyValue.pair(2, "b5"), KeyValue.pair(3, null)); // replace existing records + 
initialRecordsProduced += produceSourceData(baseTimestamp + 7, KeyValue.pair(1, "delete"), KeyValue.pair(2, "delete"), KeyValue.pair(3, "delete")); // delete + initialRecordsProduced += produceSourceData(baseTimestamp + 10, KeyValue.pair(1, "a10"), KeyValue.pair(2, "b10"), KeyValue.pair(3, "c10")); // new data so latest is not tombstone + + // wait for output + IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + TestUtils.consumerConfig( + CLUSTER.bootstrapServers(), + IntegerDeserializer.class, + IntegerDeserializer.class), + outputStream, + initialRecordsProduced); + + // wipe out state store to trigger restore process on restart + kafkaStreams.close(); + kafkaStreams.cleanUp(); + + // restart app + kafkaStreams = new KafkaStreams(streamsBuilder.build(), props); + kafkaStreams.start(); + + // produce additional records + final int additionalRecordsProduced = produceSourceData(baseTimestamp + 12, KeyValue.pair(1, "a12"), KeyValue.pair(2, "b12"), KeyValue.pair(3, "c12")); + + // wait for output and verify + final List<KeyValue<Integer, Integer>> receivedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + TestUtils.consumerConfig( + CLUSTER.bootstrapServers(), + IntegerDeserializer.class, + IntegerDeserializer.class), + outputStream, + initialRecordsProduced + additionalRecordsProduced); + + for (int i = 1; i <= additionalRecordsProduced; i++) { + final KeyValue<Integer, Integer> receivedRecord = receivedRecords.get(receivedRecords.size() - i); + // verify more than one record version found, which confirms that restore took place + assertThat(1, lessThan(receivedRecord.value)); Review Comment: I considered that when writing this test but the code for doing so would be rather complex -- we'd have to track all the added records, similar to what's done in `VersionedStoreContentCheckerProcessor`, except we can't do it in the processor anymore because the processor is recreated after the restart. 
So we'd have to track this data separately (in the body of the test itself) and then pass it back into the processor after the restart in order to have the processor perform the checks. If you think it's worth the additional complexity, I can make the update. (As an aside, the reason these tests are already as complicated as they are is that versioned stores don't support interactive queries right now. Otherwise we could just query the store directly from outside the processor and not need to rely on these roundabout checks.) ########## streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java: ########## @@ -0,0 +1,592 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.integration; + +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.lessThan; + +import java.io.IOException; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.query.KeyQuery; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryConfig; +import org.apache.kafka.streams.query.QueryResult; +import org.apache.kafka.streams.query.RangeQuery; +import 
org.apache.kafka.streams.query.StateQueryRequest; +import org.apache.kafka.streams.query.StateQueryResult; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.VersionedBytesStoreSupplier; +import org.apache.kafka.streams.state.VersionedKeyValueStore; +import org.apache.kafka.streams.state.VersionedRecord; +import org.apache.kafka.streams.state.internals.RocksDbVersionedKeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.internals.VersionedKeyValueStoreBuilder; +import org.apache.kafka.streams.state.internals.VersionedKeyValueToBytesStoreAdapter; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category({IntegrationTest.class}) +public class VersionedKeyValueStoreIntegrationTest { + + private static final String STORE_NAME = "versioned-store"; + private static final long HISTORY_RETENTION = 3600_000L; + + private String inputStream; + private String outputStream; + private long baseTimestamp; + + private KafkaStreams kafkaStreams; + + private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); + + @Rule + public TestName testName = new TestName(); + + @BeforeClass + public static void before() throws IOException { + CLUSTER.start(); + } + + @AfterClass + public static void after() { + CLUSTER.stop(); + } + + @Before + public void beforeTest() throws InterruptedException { + final String uniqueTestName = safeUniqueTestName(getClass(), testName); + inputStream = "input-stream-" + uniqueTestName; + outputStream = "output-stream-" + uniqueTestName; + CLUSTER.createTopic(inputStream); + CLUSTER.createTopic(outputStream); + + baseTimestamp = CLUSTER.time.milliseconds(); + } + + @After + public void afterTest() { 
+ if (kafkaStreams != null) { + kafkaStreams.close(Duration.ofSeconds(30L)); + kafkaStreams.cleanUp(); + } + } + + @Test + public void shouldPutGetAndDelete() throws Exception { + // build topology and start app + final StreamsBuilder streamsBuilder = new StreamsBuilder(); + + streamsBuilder + .addStateStore( + new VersionedKeyValueStoreBuilder<>( + new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, HISTORY_RETENTION), + Serdes.Integer(), + Serdes.String(), + Time.SYSTEM + ) + ) + .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String())) + .process(VersionedStoreContentCheckerProcessor::new, STORE_NAME) + .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer())); + + final Properties props = props(); + kafkaStreams = new KafkaStreams(streamsBuilder.build(), props); + kafkaStreams.start(); + + // produce source data + int numRecordsProduced = 0; + numRecordsProduced += produceSourceData(baseTimestamp, KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, null)); + numRecordsProduced += produceSourceData(baseTimestamp + 5, KeyValue.pair(1, "a5"), KeyValue.pair(2, null), KeyValue.pair(3, "c5")); + numRecordsProduced += produceSourceData(baseTimestamp + 2, KeyValue.pair(1, "a2"), KeyValue.pair(2, "b2"), KeyValue.pair(3, null)); // out-of-order data + numRecordsProduced += produceSourceData(baseTimestamp + 5, KeyValue.pair(1, "a5_new"), KeyValue.pair(2, "b5"), KeyValue.pair(3, null)); // replace existing records + numRecordsProduced += produceSourceData(baseTimestamp + 7, KeyValue.pair(1, "delete"), KeyValue.pair(2, "delete"), KeyValue.pair(3, "delete")); // delete + + // wait for output and verify + final List<KeyValue<Integer, Integer>> receivedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + TestUtils.consumerConfig( + CLUSTER.bootstrapServers(), + IntegerDeserializer.class, + IntegerDeserializer.class), + outputStream, + numRecordsProduced); + + for (final KeyValue<Integer, Integer> receivedRecord : 
receivedRecords) { + // verify zero failed checks for each record + assertThat(0, equalTo(receivedRecord.value)); + } + } + + @Test + public void shouldSetChangelogTopicProperties() throws Exception { + // build topology and start app + final StreamsBuilder streamsBuilder = new StreamsBuilder(); + + streamsBuilder + .addStateStore( + new VersionedKeyValueStoreBuilder<>( + new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, HISTORY_RETENTION), + Serdes.Integer(), + Serdes.String(), + Time.SYSTEM + ) + ) + .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String())) + .process(VersionedStoreCountProcessor::new, STORE_NAME) + .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer())); + + final Properties props = props(); + kafkaStreams = new KafkaStreams(streamsBuilder.build(), props); + kafkaStreams.start(); + + // produce record (and wait for result) to create changelog + produceSourceData(baseTimestamp, KeyValue.pair(0, "foo")); + + IntegrationTestUtils.waitUntilMinRecordsReceived( + TestUtils.consumerConfig( + CLUSTER.bootstrapServers(), + IntegerDeserializer.class, + IntegerDeserializer.class), + outputStream, + 1); + + // verify changelog topic properties + final String changelogTopic = "app-VersionedKeyValueStoreIntegrationTestshouldSetChangelogTopicProperties-versioned-store-changelog"; + final Properties changelogTopicConfig = CLUSTER.getLogConfig(changelogTopic); + assertThat(changelogTopicConfig.getProperty("cleanup.policy"), equalTo("compact")); + assertThat(changelogTopicConfig.getProperty("min.compaction.lag.ms"), equalTo(Long.toString(HISTORY_RETENTION + 24 * 60 * 60 * 1000L))); + } + + @Test + public void shouldRestore() throws Exception { + // build topology and start app + final StreamsBuilder streamsBuilder = new StreamsBuilder(); + + streamsBuilder + .addStateStore( + new VersionedKeyValueStoreBuilder<>( + new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, HISTORY_RETENTION), + Serdes.Integer(), + 
Serdes.String(), + Time.SYSTEM + ) + ) + .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String())) + .process(VersionedStoreCountProcessor::new, STORE_NAME) + .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer())); + + final Properties props = props(); + kafkaStreams = new KafkaStreams(streamsBuilder.build(), props); + kafkaStreams.start(); + + // produce source data + int initialRecordsProduced = 0; + initialRecordsProduced += produceSourceData(baseTimestamp, KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, null)); + initialRecordsProduced += produceSourceData(baseTimestamp + 5, KeyValue.pair(1, "a5"), KeyValue.pair(2, null), KeyValue.pair(3, "c5")); + initialRecordsProduced += produceSourceData(baseTimestamp + 2, KeyValue.pair(1, "a2"), KeyValue.pair(2, "b2"), KeyValue.pair(3, null)); // out-of-order data + initialRecordsProduced += produceSourceData(baseTimestamp + 5, KeyValue.pair(1, "a5_new"), KeyValue.pair(2, "b5"), KeyValue.pair(3, null)); // replace existing records + initialRecordsProduced += produceSourceData(baseTimestamp + 7, KeyValue.pair(1, "delete"), KeyValue.pair(2, "delete"), KeyValue.pair(3, "delete")); // delete + initialRecordsProduced += produceSourceData(baseTimestamp + 10, KeyValue.pair(1, "a10"), KeyValue.pair(2, "b10"), KeyValue.pair(3, "c10")); // new data so latest is not tombstone + + // wait for output + IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + TestUtils.consumerConfig( + CLUSTER.bootstrapServers(), + IntegerDeserializer.class, + IntegerDeserializer.class), + outputStream, + initialRecordsProduced); + + // wipe out state store to trigger restore process on restart + kafkaStreams.close(); + kafkaStreams.cleanUp(); + + // restart app + kafkaStreams = new KafkaStreams(streamsBuilder.build(), props); + kafkaStreams.start(); + + // produce additional records + final int additionalRecordsProduced = produceSourceData(baseTimestamp + 12, KeyValue.pair(1, "a12"), KeyValue.pair(2, 
"b12"), KeyValue.pair(3, "c12")); + + // wait for output and verify + final List<KeyValue<Integer, Integer>> receivedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + TestUtils.consumerConfig( + CLUSTER.bootstrapServers(), + IntegerDeserializer.class, + IntegerDeserializer.class), + outputStream, + initialRecordsProduced + additionalRecordsProduced); + + for (int i = 1; i <= additionalRecordsProduced; i++) { + final KeyValue<Integer, Integer> receivedRecord = receivedRecords.get(receivedRecords.size() - i); + // verify more than one record version found, which confirms that restore took place + assertThat(1, lessThan(receivedRecord.value)); + } + } + + @Test + public void shouldAllowCustomIQv2ForCustomStoreImplementations() { + // build topology and start app + final StreamsBuilder streamsBuilder = new StreamsBuilder(); + + streamsBuilder + .addStateStore( + new VersionedKeyValueStoreBuilder<>( + new CustomIQv2VersionedStoreSupplier(), + Serdes.Integer(), + Serdes.String(), + Time.SYSTEM + ) + ) + .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String())) + .process(VersionedStoreCountProcessor::new, STORE_NAME) + .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer())); + + final Properties props = props(); + kafkaStreams = new KafkaStreams(streamsBuilder.build(), props); + kafkaStreams.start(); + + // issue IQv2 query and verify result + final StateQueryRequest<String> request = + StateQueryRequest.inStore(STORE_NAME) + .withQuery(new TestQuery()) + .withPartitions(Collections.singleton(0)); + final StateQueryResult<String> result = + IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); + assertThat("success", equalTo(result.getOnlyPartitionResult().getResult())); + } + + private Properties props() { + final Properties streamsConfiguration = new Properties(); + final String safeTestName = safeUniqueTestName(getClass(), testName); + streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + 
safeTestName); + streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); + streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); + streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + return streamsConfiguration; + } + + /** + * @return number of records produced + */ + @SuppressWarnings("varargs") + @SafeVarargs + private final int produceSourceData(final long timestamp, + final KeyValue<Integer, String>... keyValues) { + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp( + inputStream, + Arrays.asList(keyValues), + TestUtils.producerConfig(CLUSTER.bootstrapServers(), + IntegerSerializer.class, + StringSerializer.class), + timestamp); + return keyValues.length; + } + + /** + * Test-only processor for inserting records into a versioned store while also tracking + * them separately in-memory, and performing checks to validate expected store contents. + * Forwards the number of failed checks downstream for consumption. + */ + private static class VersionedStoreContentCheckerProcessor implements Processor<Integer, String, Integer, Integer> { + + private ProcessorContext<Integer, Integer> context; + private VersionedKeyValueStore<Integer, String> store; + + // in-memory copy of seen data, to validate for testing purposes. + // maps from key -> timestamp -> value + private final Map<Integer, Map<Long, String>> data = new HashMap<>(); + + @Override + public void init(final ProcessorContext<Integer, Integer> context) { + this.context = context; + store = context.getStateStore(STORE_NAME); + } + + @Override + public void process(final Record<Integer, String> record) { + // add record to store. 
special value "delete" is interpreted as a delete() call, + // in contrast to null value, which is a tombstone inserted via put() + if ("delete".equals(record.value())) { + store.delete(record.key(), record.timestamp()); + addToSeenData(record.key(), record.timestamp(), null); + } else { + store.put(record.key(), record.value(), record.timestamp()); + addToSeenData(record.key(), record.timestamp(), record.value()); + } + + // check expected contents of store, and signal completion by writing + // number of failures to downstream + final int failedChecks = checkStoreContents(); + context.forward(record.withValue(failedChecks)); + } + + private void addToSeenData(final Integer key, final long timestamp, final String value) { + if (data.containsKey(key)) { + data.get(key).put(timestamp, value); + } else { + final Map<Long, String> timestampsAndValues = new HashMap<>(); + timestampsAndValues.put(timestamp, value); + data.put(key, timestampsAndValues); + } + } + + /** + * @return number of failed checks + */ + private int checkStoreContents() { + int failedChecks = 0; + for (final Map.Entry<Integer, Map<Long, String>> keyWithTimestampsAndValues : data.entrySet()) { + final Integer key = keyWithTimestampsAndValues.getKey(); + final Map<Long, String> timestampsAndValues = keyWithTimestampsAndValues.getValue(); + + // track largest timestamp seen for key + long maxTimestamp = -1L; + String expectedValueForMaxTimestamp = null; + + for (final Map.Entry<Long, String> timestampAndValue : timestampsAndValues.entrySet()) { + final Long timestamp = timestampAndValue.getKey(); + final String expectedValue = timestampAndValue.getValue(); + + if (timestamp > maxTimestamp) { + maxTimestamp = timestamp; + expectedValueForMaxTimestamp = expectedValue; + } + + // validate timestamped get on store + final VersionedRecord<String> versionedRecord = store.get(key, timestamp); + if (!contentsMatch(versionedRecord, expectedValue, timestamp)) { + failedChecks++; + } + } + + // validate get 
latest on store + final VersionedRecord<String> versionedRecord = store.get(key); + if (!contentsMatch(versionedRecord, expectedValueForMaxTimestamp, maxTimestamp)) { + failedChecks++; + } + } + return failedChecks; + } + + private static boolean contentsMatch(final VersionedRecord<String> versionedRecord, + final String expectedValue, + final long expectedTimestamp) { + if (expectedValue == null) { + return versionedRecord == null; + } else { + if (versionedRecord == null) { + return false; + } + return expectedValue.equals(versionedRecord.value()) + && expectedTimestamp == versionedRecord.timestamp(); + } + } + } + + /** + * Test-only processor for counting the number of record versions for a specific key, + * and forwards this count downstream for consumption. The count only includes record + * versions earlier than the current one, and stops as soon as a null is encountered. + */ + private static class VersionedStoreCountProcessor implements Processor<Integer, String, Integer, Integer> { + + private ProcessorContext<Integer, Integer> context; + private VersionedKeyValueStore<Integer, String> store; + + @Override + public void init(final ProcessorContext<Integer, Integer> context) { + this.context = context; + store = context.getStateStore(STORE_NAME); + } + + @Override + public void process(final Record<Integer, String> record) { + // add record to store. special value "delete" is interpreted as a delete() call, + // in contrast to null value, which is a tombstone inserted via put() + if ("delete".equals(record.value())) { + store.delete(record.key(), record.timestamp()); + } else { + store.put(record.key(), record.value(), record.timestamp()); + } + + // count number of versions for this key, up through the current version. 
+ // count stops as soon as a null is reached + int numVersions = 0; + long timestamp = record.timestamp(); + while (true) { + final VersionedRecord<String> versionedRecord = store.get(record.key(), timestamp); + if (versionedRecord != null) { + numVersions++; + // skip forward from current record version to search for more Review Comment: Yes... in my mind that was "forward" (i.e., earlier) but I see now why that's super confusing 😆 Rewrote this comment to say "earlier" instead of either "forward" or "backward." ########## streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java: ########## @@ -0,0 +1,592 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.integration; + +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.lessThan; + +import java.io.IOException; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.query.KeyQuery; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryConfig; +import org.apache.kafka.streams.query.QueryResult; +import org.apache.kafka.streams.query.RangeQuery; +import 
org.apache.kafka.streams.query.StateQueryRequest; +import org.apache.kafka.streams.query.StateQueryResult; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.VersionedBytesStoreSupplier; +import org.apache.kafka.streams.state.VersionedKeyValueStore; +import org.apache.kafka.streams.state.VersionedRecord; +import org.apache.kafka.streams.state.internals.RocksDbVersionedKeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.internals.VersionedKeyValueStoreBuilder; +import org.apache.kafka.streams.state.internals.VersionedKeyValueToBytesStoreAdapter; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category({IntegrationTest.class}) +public class VersionedKeyValueStoreIntegrationTest { + + private static final String STORE_NAME = "versioned-store"; + private static final long HISTORY_RETENTION = 3600_000L; + + private String inputStream; + private String outputStream; + private long baseTimestamp; + + private KafkaStreams kafkaStreams; + + private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); + + @Rule + public TestName testName = new TestName(); + + @BeforeClass + public static void before() throws IOException { + CLUSTER.start(); + } + + @AfterClass + public static void after() { + CLUSTER.stop(); + } + + @Before + public void beforeTest() throws InterruptedException { + final String uniqueTestName = safeUniqueTestName(getClass(), testName); + inputStream = "input-stream-" + uniqueTestName; + outputStream = "output-stream-" + uniqueTestName; + CLUSTER.createTopic(inputStream); + CLUSTER.createTopic(outputStream); + + baseTimestamp = CLUSTER.time.milliseconds(); + } + + @After + public void afterTest() { 
+ if (kafkaStreams != null) { + kafkaStreams.close(Duration.ofSeconds(30L)); + kafkaStreams.cleanUp(); + } + } + + @Test + public void shouldPutGetAndDelete() throws Exception { + // build topology and start app + final StreamsBuilder streamsBuilder = new StreamsBuilder(); + + streamsBuilder + .addStateStore( + new VersionedKeyValueStoreBuilder<>( + new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, HISTORY_RETENTION), + Serdes.Integer(), + Serdes.String(), + Time.SYSTEM + ) + ) + .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String())) + .process(VersionedStoreContentCheckerProcessor::new, STORE_NAME) + .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer())); + + final Properties props = props(); + kafkaStreams = new KafkaStreams(streamsBuilder.build(), props); + kafkaStreams.start(); + + // produce source data + int numRecordsProduced = 0; + numRecordsProduced += produceSourceData(baseTimestamp, KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, null)); + numRecordsProduced += produceSourceData(baseTimestamp + 5, KeyValue.pair(1, "a5"), KeyValue.pair(2, null), KeyValue.pair(3, "c5")); + numRecordsProduced += produceSourceData(baseTimestamp + 2, KeyValue.pair(1, "a2"), KeyValue.pair(2, "b2"), KeyValue.pair(3, null)); // out-of-order data + numRecordsProduced += produceSourceData(baseTimestamp + 5, KeyValue.pair(1, "a5_new"), KeyValue.pair(2, "b5"), KeyValue.pair(3, null)); // replace existing records + numRecordsProduced += produceSourceData(baseTimestamp + 7, KeyValue.pair(1, "delete"), KeyValue.pair(2, "delete"), KeyValue.pair(3, "delete")); // delete + + // wait for output and verify + final List<KeyValue<Integer, Integer>> receivedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + TestUtils.consumerConfig( + CLUSTER.bootstrapServers(), + IntegerDeserializer.class, + IntegerDeserializer.class), + outputStream, + numRecordsProduced); + + for (final KeyValue<Integer, Integer> receivedRecord : 
receivedRecords) { + // verify zero failed checks for each record + assertThat(0, equalTo(receivedRecord.value)); + } + } + + @Test + public void shouldSetChangelogTopicProperties() throws Exception { + // build topology and start app + final StreamsBuilder streamsBuilder = new StreamsBuilder(); + + streamsBuilder + .addStateStore( + new VersionedKeyValueStoreBuilder<>( + new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, HISTORY_RETENTION), + Serdes.Integer(), + Serdes.String(), + Time.SYSTEM + ) + ) + .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String())) + .process(VersionedStoreCountProcessor::new, STORE_NAME) + .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer())); + + final Properties props = props(); + kafkaStreams = new KafkaStreams(streamsBuilder.build(), props); + kafkaStreams.start(); + + // produce record (and wait for result) to create changelog + produceSourceData(baseTimestamp, KeyValue.pair(0, "foo")); + + IntegrationTestUtils.waitUntilMinRecordsReceived( + TestUtils.consumerConfig( + CLUSTER.bootstrapServers(), + IntegerDeserializer.class, + IntegerDeserializer.class), + outputStream, + 1); + + // verify changelog topic properties + final String changelogTopic = "app-VersionedKeyValueStoreIntegrationTestshouldSetChangelogTopicProperties-versioned-store-changelog"; + final Properties changelogTopicConfig = CLUSTER.getLogConfig(changelogTopic); + assertThat(changelogTopicConfig.getProperty("cleanup.policy"), equalTo("compact")); + assertThat(changelogTopicConfig.getProperty("min.compaction.lag.ms"), equalTo(Long.toString(HISTORY_RETENTION + 24 * 60 * 60 * 1000L))); + } + + @Test + public void shouldRestore() throws Exception { + // build topology and start app + final StreamsBuilder streamsBuilder = new StreamsBuilder(); + + streamsBuilder + .addStateStore( + new VersionedKeyValueStoreBuilder<>( + new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, HISTORY_RETENTION), + Serdes.Integer(), + 
Serdes.String(), + Time.SYSTEM + ) + ) + .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String())) + .process(VersionedStoreCountProcessor::new, STORE_NAME) + .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer())); + + final Properties props = props(); + kafkaStreams = new KafkaStreams(streamsBuilder.build(), props); + kafkaStreams.start(); + + // produce source data + int initialRecordsProduced = 0; + initialRecordsProduced += produceSourceData(baseTimestamp, KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, null)); + initialRecordsProduced += produceSourceData(baseTimestamp + 5, KeyValue.pair(1, "a5"), KeyValue.pair(2, null), KeyValue.pair(3, "c5")); + initialRecordsProduced += produceSourceData(baseTimestamp + 2, KeyValue.pair(1, "a2"), KeyValue.pair(2, "b2"), KeyValue.pair(3, null)); // out-of-order data + initialRecordsProduced += produceSourceData(baseTimestamp + 5, KeyValue.pair(1, "a5_new"), KeyValue.pair(2, "b5"), KeyValue.pair(3, null)); // replace existing records + initialRecordsProduced += produceSourceData(baseTimestamp + 7, KeyValue.pair(1, "delete"), KeyValue.pair(2, "delete"), KeyValue.pair(3, "delete")); // delete + initialRecordsProduced += produceSourceData(baseTimestamp + 10, KeyValue.pair(1, "a10"), KeyValue.pair(2, "b10"), KeyValue.pair(3, "c10")); // new data so latest is not tombstone + + // wait for output + IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + TestUtils.consumerConfig( + CLUSTER.bootstrapServers(), + IntegerDeserializer.class, + IntegerDeserializer.class), + outputStream, + initialRecordsProduced); + + // wipe out state store to trigger restore process on restart + kafkaStreams.close(); + kafkaStreams.cleanUp(); + + // restart app + kafkaStreams = new KafkaStreams(streamsBuilder.build(), props); + kafkaStreams.start(); + + // produce additional records + final int additionalRecordsProduced = produceSourceData(baseTimestamp + 12, KeyValue.pair(1, "a12"), KeyValue.pair(2, 
"b12"), KeyValue.pair(3, "c12")); + + // wait for output and verify + final List<KeyValue<Integer, Integer>> receivedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + TestUtils.consumerConfig( + CLUSTER.bootstrapServers(), + IntegerDeserializer.class, + IntegerDeserializer.class), + outputStream, + initialRecordsProduced + additionalRecordsProduced); + + for (int i = 1; i <= additionalRecordsProduced; i++) { + final KeyValue<Integer, Integer> receivedRecord = receivedRecords.get(receivedRecords.size() - i); + // verify more than one record version found, which confirms that restore took place + assertThat(1, lessThan(receivedRecord.value)); + } + } + + @Test + public void shouldAllowCustomIQv2ForCustomStoreImplementations() { + // build topology and start app + final StreamsBuilder streamsBuilder = new StreamsBuilder(); + + streamsBuilder + .addStateStore( + new VersionedKeyValueStoreBuilder<>( + new CustomIQv2VersionedStoreSupplier(), + Serdes.Integer(), + Serdes.String(), + Time.SYSTEM + ) + ) + .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String())) + .process(VersionedStoreCountProcessor::new, STORE_NAME) + .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer())); + + final Properties props = props(); + kafkaStreams = new KafkaStreams(streamsBuilder.build(), props); + kafkaStreams.start(); + + // issue IQv2 query and verify result + final StateQueryRequest<String> request = + StateQueryRequest.inStore(STORE_NAME) + .withQuery(new TestQuery()) + .withPartitions(Collections.singleton(0)); + final StateQueryResult<String> result = + IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); + assertThat("success", equalTo(result.getOnlyPartitionResult().getResult())); + } + + private Properties props() { + final Properties streamsConfiguration = new Properties(); + final String safeTestName = safeUniqueTestName(getClass(), testName); + streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + 
safeTestName); + streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); + streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); + streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + return streamsConfiguration; + } + + /** + * @return number of records produced + */ + @SuppressWarnings("varargs") + @SafeVarargs + private final int produceSourceData(final long timestamp, + final KeyValue<Integer, String>... keyValues) { + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp( + inputStream, + Arrays.asList(keyValues), + TestUtils.producerConfig(CLUSTER.bootstrapServers(), + IntegerSerializer.class, + StringSerializer.class), + timestamp); + return keyValues.length; + } + + /** + * Test-only processor for inserting records into a versioned store while also tracking + * them separately in-memory, and performing checks to validate expected store contents. + * Forwards the number of failed checks downstream for consumption. + */ + private static class VersionedStoreContentCheckerProcessor implements Processor<Integer, String, Integer, Integer> { + + private ProcessorContext<Integer, Integer> context; + private VersionedKeyValueStore<Integer, String> store; + + // in-memory copy of seen data, to validate for testing purposes. + // maps from key -> timestamp -> value + private final Map<Integer, Map<Long, String>> data = new HashMap<>(); + + @Override + public void init(final ProcessorContext<Integer, Integer> context) { + this.context = context; + store = context.getStateStore(STORE_NAME); + } + + @Override + public void process(final Record<Integer, String> record) { + // add record to store. 
special value "delete" is interpreted as a delete() call, + // in contrast to null value, which is a tombstone inserted via put() + if ("delete".equals(record.value())) { + store.delete(record.key(), record.timestamp()); + addToSeenData(record.key(), record.timestamp(), null); + } else { + store.put(record.key(), record.value(), record.timestamp()); + addToSeenData(record.key(), record.timestamp(), record.value()); + } + + // check expected contents of store, and signal completion by writing + // number of failures to downstream + final int failedChecks = checkStoreContents(); + context.forward(record.withValue(failedChecks)); + } + + private void addToSeenData(final Integer key, final long timestamp, final String value) { + if (data.containsKey(key)) { + data.get(key).put(timestamp, value); + } else { + final Map<Long, String> timestampsAndValues = new HashMap<>(); + timestampsAndValues.put(timestamp, value); + data.put(key, timestampsAndValues); + } + } + + /** + * @return number of failed checks + */ + private int checkStoreContents() { + int failedChecks = 0; + for (final Map.Entry<Integer, Map<Long, String>> keyWithTimestampsAndValues : data.entrySet()) { + final Integer key = keyWithTimestampsAndValues.getKey(); + final Map<Long, String> timestampsAndValues = keyWithTimestampsAndValues.getValue(); + + // track largest timestamp seen for key + long maxTimestamp = -1L; + String expectedValueForMaxTimestamp = null; + + for (final Map.Entry<Long, String> timestampAndValue : timestampsAndValues.entrySet()) { + final Long timestamp = timestampAndValue.getKey(); + final String expectedValue = timestampAndValue.getValue(); + + if (timestamp > maxTimestamp) { + maxTimestamp = timestamp; + expectedValueForMaxTimestamp = expectedValue; + } + + // validate timestamped get on store + final VersionedRecord<String> versionedRecord = store.get(key, timestamp); + if (!contentsMatch(versionedRecord, expectedValue, timestamp)) { + failedChecks++; + } + } + + // validate get 
latest on store + final VersionedRecord<String> versionedRecord = store.get(key); + if (!contentsMatch(versionedRecord, expectedValueForMaxTimestamp, maxTimestamp)) { + failedChecks++; + } + } + return failedChecks; + } + + private static boolean contentsMatch(final VersionedRecord<String> versionedRecord, + final String expectedValue, + final long expectedTimestamp) { + if (expectedValue == null) { + return versionedRecord == null; + } else { + if (versionedRecord == null) { + return false; + } + return expectedValue.equals(versionedRecord.value()) + && expectedTimestamp == versionedRecord.timestamp(); + } + } + } + + /** + * Test-only processor for counting the number of record versions for a specific key, + * and forwards this count downstream for consumption. The count only includes record + * versions earlier than the current one, and stops as soon as a null is encountered. + */ + private static class VersionedStoreCountProcessor implements Processor<Integer, String, Integer, Integer> { + + private ProcessorContext<Integer, Integer> context; + private VersionedKeyValueStore<Integer, String> store; + + @Override + public void init(final ProcessorContext<Integer, Integer> context) { + this.context = context; + store = context.getStateStore(STORE_NAME); + } + + @Override + public void process(final Record<Integer, String> record) { + // add record to store. special value "delete" is interpreted as a delete() call, + // in contrast to null value, which is a tombstone inserted via put() + if ("delete".equals(record.value())) { + store.delete(record.key(), record.timestamp()); + } else { + store.put(record.key(), record.value(), record.timestamp()); + } + + // count number of versions for this key, up through the current version. 
+ // count stops as soon as a null is reached + int numVersions = 0; + long timestamp = record.timestamp(); + while (true) { + final VersionedRecord<String> versionedRecord = store.get(record.key(), timestamp); Review Comment: We could do that too. Either way it's not possible to get an accurate count of the total number of record versions for the key, since the search process stops as soon as a null is encountered (because nulls do not come with timestamps, which means we cannot continue to search for earlier records). So, this test processor isn't trying to do anything complete/comprehensive -- just enough to give us the checks that we need. Given that `put/get/delete` is already tested more comprehensively in `shouldPutGetAndDelete()` and that this processor is only used for other tests, I'm inclined to leave it as is. ########## streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java: ########## @@ -0,0 +1,592 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.integration; + +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.lessThan; + +import java.io.IOException; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.query.KeyQuery; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryConfig; +import org.apache.kafka.streams.query.QueryResult; +import org.apache.kafka.streams.query.RangeQuery; +import 
org.apache.kafka.streams.query.StateQueryRequest; +import org.apache.kafka.streams.query.StateQueryResult; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.VersionedBytesStoreSupplier; +import org.apache.kafka.streams.state.VersionedKeyValueStore; +import org.apache.kafka.streams.state.VersionedRecord; +import org.apache.kafka.streams.state.internals.RocksDbVersionedKeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.internals.VersionedKeyValueStoreBuilder; +import org.apache.kafka.streams.state.internals.VersionedKeyValueToBytesStoreAdapter; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category({IntegrationTest.class}) +public class VersionedKeyValueStoreIntegrationTest { + + private static final String STORE_NAME = "versioned-store"; + private static final long HISTORY_RETENTION = 3600_000L; + + private String inputStream; + private String outputStream; + private long baseTimestamp; + + private KafkaStreams kafkaStreams; + + private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); + + @Rule + public TestName testName = new TestName(); + + @BeforeClass + public static void before() throws IOException { + CLUSTER.start(); + } + + @AfterClass + public static void after() { + CLUSTER.stop(); + } + + @Before + public void beforeTest() throws InterruptedException { + final String uniqueTestName = safeUniqueTestName(getClass(), testName); + inputStream = "input-stream-" + uniqueTestName; + outputStream = "output-stream-" + uniqueTestName; + CLUSTER.createTopic(inputStream); + CLUSTER.createTopic(outputStream); + + baseTimestamp = CLUSTER.time.milliseconds(); + } + + @After + public void afterTest() { 
+ if (kafkaStreams != null) { + kafkaStreams.close(Duration.ofSeconds(30L)); + kafkaStreams.cleanUp(); + } + } + + @Test + public void shouldPutGetAndDelete() throws Exception { + // build topology and start app + final StreamsBuilder streamsBuilder = new StreamsBuilder(); + + streamsBuilder + .addStateStore( + new VersionedKeyValueStoreBuilder<>( + new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, HISTORY_RETENTION), + Serdes.Integer(), + Serdes.String(), + Time.SYSTEM + ) + ) + .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String())) + .process(VersionedStoreContentCheckerProcessor::new, STORE_NAME) + .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer())); + + final Properties props = props(); + kafkaStreams = new KafkaStreams(streamsBuilder.build(), props); + kafkaStreams.start(); + + // produce source data + int numRecordsProduced = 0; + numRecordsProduced += produceSourceData(baseTimestamp, KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, null)); + numRecordsProduced += produceSourceData(baseTimestamp + 5, KeyValue.pair(1, "a5"), KeyValue.pair(2, null), KeyValue.pair(3, "c5")); + numRecordsProduced += produceSourceData(baseTimestamp + 2, KeyValue.pair(1, "a2"), KeyValue.pair(2, "b2"), KeyValue.pair(3, null)); // out-of-order data + numRecordsProduced += produceSourceData(baseTimestamp + 5, KeyValue.pair(1, "a5_new"), KeyValue.pair(2, "b5"), KeyValue.pair(3, null)); // replace existing records + numRecordsProduced += produceSourceData(baseTimestamp + 7, KeyValue.pair(1, "delete"), KeyValue.pair(2, "delete"), KeyValue.pair(3, "delete")); // delete + + // wait for output and verify + final List<KeyValue<Integer, Integer>> receivedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + TestUtils.consumerConfig( + CLUSTER.bootstrapServers(), + IntegerDeserializer.class, + IntegerDeserializer.class), + outputStream, + numRecordsProduced); + + for (final KeyValue<Integer, Integer> receivedRecord : 
receivedRecords) { + // verify zero failed checks for each record + assertThat(0, equalTo(receivedRecord.value)); + } + } + + @Test + public void shouldSetChangelogTopicProperties() throws Exception { + // build topology and start app + final StreamsBuilder streamsBuilder = new StreamsBuilder(); + + streamsBuilder + .addStateStore( + new VersionedKeyValueStoreBuilder<>( + new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, HISTORY_RETENTION), + Serdes.Integer(), + Serdes.String(), + Time.SYSTEM + ) + ) + .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String())) + .process(VersionedStoreCountProcessor::new, STORE_NAME) + .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer())); + + final Properties props = props(); + kafkaStreams = new KafkaStreams(streamsBuilder.build(), props); + kafkaStreams.start(); + + // produce record (and wait for result) to create changelog + produceSourceData(baseTimestamp, KeyValue.pair(0, "foo")); + + IntegrationTestUtils.waitUntilMinRecordsReceived( + TestUtils.consumerConfig( + CLUSTER.bootstrapServers(), + IntegerDeserializer.class, + IntegerDeserializer.class), + outputStream, + 1); + + // verify changelog topic properties + final String changelogTopic = "app-VersionedKeyValueStoreIntegrationTestshouldSetChangelogTopicProperties-versioned-store-changelog"; + final Properties changelogTopicConfig = CLUSTER.getLogConfig(changelogTopic); + assertThat(changelogTopicConfig.getProperty("cleanup.policy"), equalTo("compact")); + assertThat(changelogTopicConfig.getProperty("min.compaction.lag.ms"), equalTo(Long.toString(HISTORY_RETENTION + 24 * 60 * 60 * 1000L))); + } + + @Test + public void shouldRestore() throws Exception { + // build topology and start app + final StreamsBuilder streamsBuilder = new StreamsBuilder(); + + streamsBuilder + .addStateStore( + new VersionedKeyValueStoreBuilder<>( + new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, HISTORY_RETENTION), + Serdes.Integer(), + 
Serdes.String(), + Time.SYSTEM + ) + ) + .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String())) + .process(VersionedStoreCountProcessor::new, STORE_NAME) + .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer())); + + final Properties props = props(); + kafkaStreams = new KafkaStreams(streamsBuilder.build(), props); + kafkaStreams.start(); + + // produce source data + int initialRecordsProduced = 0; + initialRecordsProduced += produceSourceData(baseTimestamp, KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, null)); + initialRecordsProduced += produceSourceData(baseTimestamp + 5, KeyValue.pair(1, "a5"), KeyValue.pair(2, null), KeyValue.pair(3, "c5")); + initialRecordsProduced += produceSourceData(baseTimestamp + 2, KeyValue.pair(1, "a2"), KeyValue.pair(2, "b2"), KeyValue.pair(3, null)); // out-of-order data + initialRecordsProduced += produceSourceData(baseTimestamp + 5, KeyValue.pair(1, "a5_new"), KeyValue.pair(2, "b5"), KeyValue.pair(3, null)); // replace existing records + initialRecordsProduced += produceSourceData(baseTimestamp + 7, KeyValue.pair(1, "delete"), KeyValue.pair(2, "delete"), KeyValue.pair(3, "delete")); // delete + initialRecordsProduced += produceSourceData(baseTimestamp + 10, KeyValue.pair(1, "a10"), KeyValue.pair(2, "b10"), KeyValue.pair(3, "c10")); // new data so latest is not tombstone + + // wait for output + IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + TestUtils.consumerConfig( + CLUSTER.bootstrapServers(), + IntegerDeserializer.class, + IntegerDeserializer.class), + outputStream, + initialRecordsProduced); + + // wipe out state store to trigger restore process on restart + kafkaStreams.close(); + kafkaStreams.cleanUp(); + + // restart app + kafkaStreams = new KafkaStreams(streamsBuilder.build(), props); + kafkaStreams.start(); + + // produce additional records + final int additionalRecordsProduced = produceSourceData(baseTimestamp + 12, KeyValue.pair(1, "a12"), KeyValue.pair(2, 
"b12"), KeyValue.pair(3, "c12")); + + // wait for output and verify + final List<KeyValue<Integer, Integer>> receivedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + TestUtils.consumerConfig( + CLUSTER.bootstrapServers(), + IntegerDeserializer.class, + IntegerDeserializer.class), + outputStream, + initialRecordsProduced + additionalRecordsProduced); + + for (int i = 1; i <= additionalRecordsProduced; i++) { + final KeyValue<Integer, Integer> receivedRecord = receivedRecords.get(receivedRecords.size() - i); + // verify more than one record version found, which confirms that restore took place + assertThat(1, lessThan(receivedRecord.value)); + } + } + + @Test + public void shouldAllowCustomIQv2ForCustomStoreImplementations() { + // build topology and start app + final StreamsBuilder streamsBuilder = new StreamsBuilder(); + + streamsBuilder + .addStateStore( + new VersionedKeyValueStoreBuilder<>( + new CustomIQv2VersionedStoreSupplier(), + Serdes.Integer(), + Serdes.String(), + Time.SYSTEM + ) + ) + .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String())) + .process(VersionedStoreCountProcessor::new, STORE_NAME) + .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer())); + + final Properties props = props(); + kafkaStreams = new KafkaStreams(streamsBuilder.build(), props); + kafkaStreams.start(); + + // issue IQv2 query and verify result + final StateQueryRequest<String> request = + StateQueryRequest.inStore(STORE_NAME) + .withQuery(new TestQuery()) + .withPartitions(Collections.singleton(0)); + final StateQueryResult<String> result = + IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); + assertThat("success", equalTo(result.getOnlyPartitionResult().getResult())); Review Comment: > Wondering if we could use a custom query type to verify the store content in the restore test, directly after restoration? The restore test tests the actual RocksDB-based versioned store implementation, which does not support IQ. 
(Aside: that's also why I had to introduce a new versioned store type for this IQv2 test -- once we support IQ in the RocksDB-based implementation itself, we can update this test to remove the extra code.) > Seems we only test if we don't crash? Also that we get the expected string back from the custom query type, but yes agreed that it's not a particularly exciting check. Another reason to update this test once the RocksDB-based implementation supports IQ. ########## streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java: ########## @@ -0,0 +1,592 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.kafka.streams.integration; + +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.lessThan; + +import java.io.IOException; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.query.KeyQuery; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryConfig; +import org.apache.kafka.streams.query.QueryResult; +import org.apache.kafka.streams.query.RangeQuery; +import 
org.apache.kafka.streams.query.StateQueryRequest; +import org.apache.kafka.streams.query.StateQueryResult; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.VersionedBytesStoreSupplier; +import org.apache.kafka.streams.state.VersionedKeyValueStore; +import org.apache.kafka.streams.state.VersionedRecord; +import org.apache.kafka.streams.state.internals.RocksDbVersionedKeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.internals.VersionedKeyValueStoreBuilder; +import org.apache.kafka.streams.state.internals.VersionedKeyValueToBytesStoreAdapter; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category({IntegrationTest.class}) +public class VersionedKeyValueStoreIntegrationTest { + + private static final String STORE_NAME = "versioned-store"; + private static final long HISTORY_RETENTION = 3600_000L; + + private String inputStream; + private String outputStream; + private long baseTimestamp; + + private KafkaStreams kafkaStreams; + + private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); + + @Rule + public TestName testName = new TestName(); + + @BeforeClass + public static void before() throws IOException { + CLUSTER.start(); + } + + @AfterClass + public static void after() { + CLUSTER.stop(); + } + + @Before + public void beforeTest() throws InterruptedException { + final String uniqueTestName = safeUniqueTestName(getClass(), testName); + inputStream = "input-stream-" + uniqueTestName; + outputStream = "output-stream-" + uniqueTestName; + CLUSTER.createTopic(inputStream); + CLUSTER.createTopic(outputStream); + + baseTimestamp = CLUSTER.time.milliseconds(); + } + + @After + public void afterTest() { 
+ if (kafkaStreams != null) { + kafkaStreams.close(Duration.ofSeconds(30L)); + kafkaStreams.cleanUp(); + } + } + + @Test + public void shouldPutGetAndDelete() throws Exception { + // build topology and start app + final StreamsBuilder streamsBuilder = new StreamsBuilder(); + + streamsBuilder + .addStateStore( + new VersionedKeyValueStoreBuilder<>( + new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, HISTORY_RETENTION), + Serdes.Integer(), + Serdes.String(), + Time.SYSTEM + ) + ) + .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String())) + .process(VersionedStoreContentCheckerProcessor::new, STORE_NAME) + .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer())); + + final Properties props = props(); + kafkaStreams = new KafkaStreams(streamsBuilder.build(), props); + kafkaStreams.start(); + + // produce source data + int numRecordsProduced = 0; + numRecordsProduced += produceSourceData(baseTimestamp, KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, null)); + numRecordsProduced += produceSourceData(baseTimestamp + 5, KeyValue.pair(1, "a5"), KeyValue.pair(2, null), KeyValue.pair(3, "c5")); + numRecordsProduced += produceSourceData(baseTimestamp + 2, KeyValue.pair(1, "a2"), KeyValue.pair(2, "b2"), KeyValue.pair(3, null)); // out-of-order data + numRecordsProduced += produceSourceData(baseTimestamp + 5, KeyValue.pair(1, "a5_new"), KeyValue.pair(2, "b5"), KeyValue.pair(3, null)); // replace existing records + numRecordsProduced += produceSourceData(baseTimestamp + 7, KeyValue.pair(1, "delete"), KeyValue.pair(2, "delete"), KeyValue.pair(3, "delete")); // delete + + // wait for output and verify + final List<KeyValue<Integer, Integer>> receivedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + TestUtils.consumerConfig( + CLUSTER.bootstrapServers(), + IntegerDeserializer.class, + IntegerDeserializer.class), + outputStream, + numRecordsProduced); + + for (final KeyValue<Integer, Integer> receivedRecord : 
receivedRecords) { + // verify zero failed checks for each record + assertThat(0, equalTo(receivedRecord.value)); + } + } + + @Test + public void shouldSetChangelogTopicProperties() throws Exception { + // build topology and start app + final StreamsBuilder streamsBuilder = new StreamsBuilder(); + + streamsBuilder + .addStateStore( + new VersionedKeyValueStoreBuilder<>( + new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, HISTORY_RETENTION), + Serdes.Integer(), + Serdes.String(), + Time.SYSTEM + ) + ) + .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String())) + .process(VersionedStoreCountProcessor::new, STORE_NAME) + .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer())); + + final Properties props = props(); + kafkaStreams = new KafkaStreams(streamsBuilder.build(), props); + kafkaStreams.start(); + + // produce record (and wait for result) to create changelog + produceSourceData(baseTimestamp, KeyValue.pair(0, "foo")); + + IntegrationTestUtils.waitUntilMinRecordsReceived( + TestUtils.consumerConfig( + CLUSTER.bootstrapServers(), + IntegerDeserializer.class, + IntegerDeserializer.class), + outputStream, + 1); + + // verify changelog topic properties + final String changelogTopic = "app-VersionedKeyValueStoreIntegrationTestshouldSetChangelogTopicProperties-versioned-store-changelog"; + final Properties changelogTopicConfig = CLUSTER.getLogConfig(changelogTopic); + assertThat(changelogTopicConfig.getProperty("cleanup.policy"), equalTo("compact")); + assertThat(changelogTopicConfig.getProperty("min.compaction.lag.ms"), equalTo(Long.toString(HISTORY_RETENTION + 24 * 60 * 60 * 1000L))); + } + + @Test + public void shouldRestore() throws Exception { + // build topology and start app + final StreamsBuilder streamsBuilder = new StreamsBuilder(); + + streamsBuilder + .addStateStore( + new VersionedKeyValueStoreBuilder<>( + new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, HISTORY_RETENTION), + Serdes.Integer(), + 
Serdes.String(), + Time.SYSTEM + ) + ) + .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String())) + .process(VersionedStoreCountProcessor::new, STORE_NAME) + .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer())); + + final Properties props = props(); + kafkaStreams = new KafkaStreams(streamsBuilder.build(), props); + kafkaStreams.start(); + + // produce source data + int initialRecordsProduced = 0; + initialRecordsProduced += produceSourceData(baseTimestamp, KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, null)); + initialRecordsProduced += produceSourceData(baseTimestamp + 5, KeyValue.pair(1, "a5"), KeyValue.pair(2, null), KeyValue.pair(3, "c5")); + initialRecordsProduced += produceSourceData(baseTimestamp + 2, KeyValue.pair(1, "a2"), KeyValue.pair(2, "b2"), KeyValue.pair(3, null)); // out-of-order data + initialRecordsProduced += produceSourceData(baseTimestamp + 5, KeyValue.pair(1, "a5_new"), KeyValue.pair(2, "b5"), KeyValue.pair(3, null)); // replace existing records + initialRecordsProduced += produceSourceData(baseTimestamp + 7, KeyValue.pair(1, "delete"), KeyValue.pair(2, "delete"), KeyValue.pair(3, "delete")); // delete + initialRecordsProduced += produceSourceData(baseTimestamp + 10, KeyValue.pair(1, "a10"), KeyValue.pair(2, "b10"), KeyValue.pair(3, "c10")); // new data so latest is not tombstone + + // wait for output + IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + TestUtils.consumerConfig( + CLUSTER.bootstrapServers(), + IntegerDeserializer.class, + IntegerDeserializer.class), + outputStream, + initialRecordsProduced); + + // wipe out state store to trigger restore process on restart + kafkaStreams.close(); + kafkaStreams.cleanUp(); + + // restart app + kafkaStreams = new KafkaStreams(streamsBuilder.build(), props); + kafkaStreams.start(); + + // produce additional records + final int additionalRecordsProduced = produceSourceData(baseTimestamp + 12, KeyValue.pair(1, "a12"), KeyValue.pair(2, 
"b12"), KeyValue.pair(3, "c12")); + + // wait for output and verify + final List<KeyValue<Integer, Integer>> receivedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + TestUtils.consumerConfig( + CLUSTER.bootstrapServers(), + IntegerDeserializer.class, + IntegerDeserializer.class), + outputStream, + initialRecordsProduced + additionalRecordsProduced); + + for (int i = 1; i <= additionalRecordsProduced; i++) { + final KeyValue<Integer, Integer> receivedRecord = receivedRecords.get(receivedRecords.size() - i); + // verify more than one record version found, which confirms that restore took place + assertThat(1, lessThan(receivedRecord.value)); + } + } + + @Test + public void shouldAllowCustomIQv2ForCustomStoreImplementations() { + // build topology and start app + final StreamsBuilder streamsBuilder = new StreamsBuilder(); + + streamsBuilder + .addStateStore( + new VersionedKeyValueStoreBuilder<>( + new CustomIQv2VersionedStoreSupplier(), + Serdes.Integer(), + Serdes.String(), + Time.SYSTEM + ) + ) + .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String())) + .process(VersionedStoreCountProcessor::new, STORE_NAME) + .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer())); + + final Properties props = props(); + kafkaStreams = new KafkaStreams(streamsBuilder.build(), props); + kafkaStreams.start(); + + // issue IQv2 query and verify result + final StateQueryRequest<String> request = + StateQueryRequest.inStore(STORE_NAME) + .withQuery(new TestQuery()) + .withPartitions(Collections.singleton(0)); + final StateQueryResult<String> result = + IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); + assertThat("success", equalTo(result.getOnlyPartitionResult().getResult())); + } + + private Properties props() { + final Properties streamsConfiguration = new Properties(); + final String safeTestName = safeUniqueTestName(getClass(), testName); + streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + 
safeTestName); + streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); + streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); + streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + return streamsConfiguration; + } + + /** + * @return number of records produced + */ + @SuppressWarnings("varargs") + @SafeVarargs + private final int produceSourceData(final long timestamp, + final KeyValue<Integer, String>... keyValues) { + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp( + inputStream, + Arrays.asList(keyValues), + TestUtils.producerConfig(CLUSTER.bootstrapServers(), + IntegerSerializer.class, + StringSerializer.class), + timestamp); + return keyValues.length; + } + + /** + * Test-only processor for inserting records into a versioned store while also tracking + * them separately in-memory, and performing checks to validate expected store contents. + * Forwards the number of failed checks downstream for consumption. + */ + private static class VersionedStoreContentCheckerProcessor implements Processor<Integer, String, Integer, Integer> { + + private ProcessorContext<Integer, Integer> context; + private VersionedKeyValueStore<Integer, String> store; + + // in-memory copy of seen data, to validate for testing purposes. + // maps from key -> timestamp -> value + private final Map<Integer, Map<Long, String>> data = new HashMap<>(); + + @Override + public void init(final ProcessorContext<Integer, Integer> context) { + this.context = context; + store = context.getStateStore(STORE_NAME); + } + + @Override + public void process(final Record<Integer, String> record) { + // add record to store. 
special value "delete" is interpreted as a delete() call, + // in contrast to null value, which is a tombstone inserted via put() + if ("delete".equals(record.value())) { + store.delete(record.key(), record.timestamp()); + addToSeenData(record.key(), record.timestamp(), null); + } else { + store.put(record.key(), record.value(), record.timestamp()); + addToSeenData(record.key(), record.timestamp(), record.value()); + } + + // check expected contents of store, and signal completion by writing + // number of failures to downstream + final int failedChecks = checkStoreContents(); + context.forward(record.withValue(failedChecks)); + } + + private void addToSeenData(final Integer key, final long timestamp, final String value) { + if (data.containsKey(key)) { + data.get(key).put(timestamp, value); + } else { + final Map<Long, String> timestampsAndValues = new HashMap<>(); + timestampsAndValues.put(timestamp, value); + data.put(key, timestampsAndValues); + } + } + + /** + * @return number of failed checks + */ + private int checkStoreContents() { + int failedChecks = 0; + for (final Map.Entry<Integer, Map<Long, String>> keyWithTimestampsAndValues : data.entrySet()) { + final Integer key = keyWithTimestampsAndValues.getKey(); + final Map<Long, String> timestampsAndValues = keyWithTimestampsAndValues.getValue(); + + // track largest timestamp seen for key + long maxTimestamp = -1L; + String expectedValueForMaxTimestamp = null; + + for (final Map.Entry<Long, String> timestampAndValue : timestampsAndValues.entrySet()) { + final Long timestamp = timestampAndValue.getKey(); + final String expectedValue = timestampAndValue.getValue(); + + if (timestamp > maxTimestamp) { + maxTimestamp = timestamp; + expectedValueForMaxTimestamp = expectedValue; + } + + // validate timestamped get on store + final VersionedRecord<String> versionedRecord = store.get(key, timestamp); + if (!contentsMatch(versionedRecord, expectedValue, timestamp)) { + failedChecks++; + } + } + + // validate get 
latest on store + final VersionedRecord<String> versionedRecord = store.get(key); + if (!contentsMatch(versionedRecord, expectedValueForMaxTimestamp, maxTimestamp)) { + failedChecks++; + } + } + return failedChecks; + } + + private static boolean contentsMatch(final VersionedRecord<String> versionedRecord, + final String expectedValue, + final long expectedTimestamp) { + if (expectedValue == null) { + return versionedRecord == null; Review Comment: Nope, the versioned store contract is that if the return value is null, then the entire `VersionedRecord` is null (and no timestamp is provided). In fact, the `VersionedRecord` class does not allow creating an instance with null value. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org