vcrfxia commented on code in PR #13340:
URL: https://github.com/apache/kafka/pull/13340#discussion_r1128682440


##########
streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java:
##########
@@ -0,0 +1,592 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.lessThan;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.query.KeyQuery;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RangeQuery;
+import org.apache.kafka.streams.query.StateQueryRequest;
+import org.apache.kafka.streams.query.StateQueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.VersionedBytesStoreSupplier;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+import org.apache.kafka.streams.state.internals.RocksDbVersionedKeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.internals.VersionedKeyValueStoreBuilder;
+import org.apache.kafka.streams.state.internals.VersionedKeyValueToBytesStoreAdapter;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({IntegrationTest.class})
+public class VersionedKeyValueStoreIntegrationTest {
+
+    private static final String STORE_NAME = "versioned-store";
+    private static final long HISTORY_RETENTION = 3600_000L;
+
+    private String inputStream;
+    private String outputStream;
+    private long baseTimestamp;
+
+    private KafkaStreams kafkaStreams;
+
+    private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+
+    @Rule
+    public TestName testName = new TestName();
+
+    @BeforeClass
+    public static void before() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void after() {
+        CLUSTER.stop();
+    }
+
+    @Before
+    public void beforeTest() throws InterruptedException {
+        final String uniqueTestName = safeUniqueTestName(getClass(), testName);
+        inputStream = "input-stream-" + uniqueTestName;
+        outputStream = "output-stream-" + uniqueTestName;
+        CLUSTER.createTopic(inputStream);
+        CLUSTER.createTopic(outputStream);
+
+        baseTimestamp = CLUSTER.time.milliseconds();
+    }
+
+    @After
+    public void afterTest() {
+        if (kafkaStreams != null) {
+            kafkaStreams.close(Duration.ofSeconds(30L));
+            kafkaStreams.cleanUp();
+        }
+    }
+
+    @Test
+    public void shouldPutGetAndDelete() throws Exception {
+        // build topology and start app
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+        streamsBuilder
+            .addStateStore(
+                new VersionedKeyValueStoreBuilder<>(
+                    new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, HISTORY_RETENTION),
+                    Serdes.Integer(),
+                    Serdes.String(),
+                    Time.SYSTEM
+                )
+            )
+            .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String()))
+            .process(VersionedStoreContentCheckerProcessor::new, STORE_NAME)
+            .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer()));
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
+        kafkaStreams.start();
+
+        // produce source data
+        int numRecordsProduced = 0;
+        numRecordsProduced += produceSourceData(baseTimestamp, KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, null));
+        numRecordsProduced += produceSourceData(baseTimestamp + 5, KeyValue.pair(1, "a5"), KeyValue.pair(2, null), KeyValue.pair(3, "c5"));
+        numRecordsProduced += produceSourceData(baseTimestamp + 2, KeyValue.pair(1, "a2"), KeyValue.pair(2, "b2"), KeyValue.pair(3, null)); // out-of-order data
+        numRecordsProduced += produceSourceData(baseTimestamp + 5, KeyValue.pair(1, "a5_new"), KeyValue.pair(2, "b5"), KeyValue.pair(3, null)); // replace existing records
+        numRecordsProduced += produceSourceData(baseTimestamp + 7, KeyValue.pair(1, "delete"), KeyValue.pair(2, "delete"), KeyValue.pair(3, "delete")); // delete
+
+        // wait for output and verify
+        final List<KeyValue<Integer, Integer>> receivedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+            TestUtils.consumerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerDeserializer.class,
+                IntegerDeserializer.class),
+            outputStream,
+            numRecordsProduced);
+
+        for (final KeyValue<Integer, Integer> receivedRecord : receivedRecords) {
+            // verify zero failed checks for each record
+            assertThat(0, equalTo(receivedRecord.value));
+        }
+    }
+
+    @Test
+    public void shouldSetChangelogTopicProperties() throws Exception {
+        // build topology and start app
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+        streamsBuilder
+            .addStateStore(
+                new VersionedKeyValueStoreBuilder<>(
+                    new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, HISTORY_RETENTION),
+                    Serdes.Integer(),
+                    Serdes.String(),
+                    Time.SYSTEM
+                )
+            )
+            .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String()))
+            .process(VersionedStoreCountProcessor::new, STORE_NAME)
+            .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer()));
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
+        kafkaStreams.start();
+
+        // produce record (and wait for result) to create changelog
+        produceSourceData(baseTimestamp, KeyValue.pair(0, "foo"));
+
+        IntegrationTestUtils.waitUntilMinRecordsReceived(
+            TestUtils.consumerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerDeserializer.class,
+                IntegerDeserializer.class),
+            outputStream,
+            1);
+
+        // verify changelog topic properties
+        final String changelogTopic = "app-VersionedKeyValueStoreIntegrationTestshouldSetChangelogTopicProperties-versioned-store-changelog";
+        final Properties changelogTopicConfig = CLUSTER.getLogConfig(changelogTopic);
+        assertThat(changelogTopicConfig.getProperty("cleanup.policy"), equalTo("compact"));
+        assertThat(changelogTopicConfig.getProperty("min.compaction.lag.ms"), equalTo(Long.toString(HISTORY_RETENTION + 24 * 60 * 60 * 1000L)));
+    }
+
+    @Test
+    public void shouldRestore() throws Exception {
+        // build topology and start app
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+        streamsBuilder
+            .addStateStore(
+                new VersionedKeyValueStoreBuilder<>(
+                    new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, HISTORY_RETENTION),
+                    Serdes.Integer(),
+                    Serdes.String(),
+                    Time.SYSTEM
+                )
+            )
+            .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String()))
+            .process(VersionedStoreCountProcessor::new, STORE_NAME)
+            .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer()));
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
+        kafkaStreams.start();
+
+        // produce source data
+        int initialRecordsProduced = 0;
+        initialRecordsProduced += produceSourceData(baseTimestamp, KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, null));
+        initialRecordsProduced += produceSourceData(baseTimestamp + 5, KeyValue.pair(1, "a5"), KeyValue.pair(2, null), KeyValue.pair(3, "c5"));
+        initialRecordsProduced += produceSourceData(baseTimestamp + 2, KeyValue.pair(1, "a2"), KeyValue.pair(2, "b2"), KeyValue.pair(3, null)); // out-of-order data
+        initialRecordsProduced += produceSourceData(baseTimestamp + 5, KeyValue.pair(1, "a5_new"), KeyValue.pair(2, "b5"), KeyValue.pair(3, null)); // replace existing records
+        initialRecordsProduced += produceSourceData(baseTimestamp + 7, KeyValue.pair(1, "delete"), KeyValue.pair(2, "delete"), KeyValue.pair(3, "delete")); // delete
+        initialRecordsProduced += produceSourceData(baseTimestamp + 10, KeyValue.pair(1, "a10"), KeyValue.pair(2, "b10"), KeyValue.pair(3, "c10")); // new data so latest is not tombstone
+
+        // wait for output
+        IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+            TestUtils.consumerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerDeserializer.class,
+                IntegerDeserializer.class),
+            outputStream,
+            initialRecordsProduced);
+
+        // wipe out state store to trigger restore process on restart
+        kafkaStreams.close();
+        kafkaStreams.cleanUp();
+
+        // restart app
+        kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
+        kafkaStreams.start();
+
+        // produce additional records
+        final int additionalRecordsProduced = produceSourceData(baseTimestamp + 12, KeyValue.pair(1, "a12"), KeyValue.pair(2, "b12"), KeyValue.pair(3, "c12"));
+
+        // wait for output and verify
+        final List<KeyValue<Integer, Integer>> receivedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+            TestUtils.consumerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerDeserializer.class,
+                IntegerDeserializer.class),
+            outputStream,
+            initialRecordsProduced + additionalRecordsProduced);
+
+        for (int i = 1; i <= additionalRecordsProduced; i++) {
+            final KeyValue<Integer, Integer> receivedRecord = receivedRecords.get(receivedRecords.size() - i);
+            // verify more than one record version found, which confirms that restore took place
+            assertThat(1, lessThan(receivedRecord.value));

Review Comment:
   Oh wow, this additional test coverage (new global store test + enhanced 
checks on restore) actually caught a bug... I'm embarrassed. Anyhow, it's fixed 
now and the additional code complexity for enhanced test coverage is evidently 
worthwhile, so let's keep it!
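   
   For context, the excerpt above doesn't include `VersionedStoreCountProcessor` itself, only the assertion that the forwarded count exceeds 1 after restore. A rough, hypothetical sketch of a version-counting processor in that spirit (not the PR's actual implementation; the class name, probe window, and counting logic are illustrative, and only the `VersionedKeyValueStore` put / timestamped-get calls come from this PR) might look like:
   
   ```java
   import java.util.HashSet;
   import java.util.Set;
   import org.apache.kafka.streams.processor.api.Processor;
   import org.apache.kafka.streams.processor.api.ProcessorContext;
   import org.apache.kafka.streams.processor.api.Record;
   import org.apache.kafka.streams.state.VersionedKeyValueStore;
   import org.apache.kafka.streams.state.VersionedRecord;
   
   // Hypothetical sketch only -- not the PR's VersionedStoreCountProcessor.
   // Puts each record into the versioned store, then probes a small window of
   // as-of timestamps and forwards the number of distinct versions observed.
   public class VersionCountingProcessor implements Processor<Integer, String, Integer, Integer> {
       private static final String STORE_NAME = "versioned-store"; // matches the store name in the test
       private static final long PROBE_WINDOW_MS = 20L;            // illustrative value
   
       private ProcessorContext<Integer, Integer> context;
       private VersionedKeyValueStore<Integer, String> store;
   
       @Override
       public void init(final ProcessorContext<Integer, Integer> context) {
           this.context = context;
           this.store = context.getStateStore(STORE_NAME);
       }
   
       @Override
       public void process(final Record<Integer, String> record) {
           store.put(record.key(), record.value(), record.timestamp());
   
           // collect the timestamps of all versions visible within the probe window
           final Set<Long> versionTimestamps = new HashSet<>();
           for (long ts = record.timestamp(); ts >= record.timestamp() - PROBE_WINDOW_MS; ts--) {
               final VersionedRecord<String> versioned = store.get(record.key(), ts);
               if (versioned != null) {
                   versionTimestamps.add(versioned.timestamp());
               }
           }
   
           // after a restore, keys written at multiple timestamps should yield a count > 1
           context.forward(record.withValue(versionTimestamps.size()));
       }
   }
   ```
   
   Counting distinct version timestamps via timestamped gets is what makes the `lessThan(receivedRecord.value)` check meaningful: if only the latest version were present after the restart, the forwarded count would stay at 1.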



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
