This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 16b4088 KAFKA-3522: Add TopologyTestDriver unit tests (#6179) 16b4088 is described below commit 16b408898e75b00ddf6b607246833cdbcd56f507 Author: Matthias J. Sax <matth...@confluent.io> AuthorDate: Thu May 16 16:16:00 2019 +0100 KAFKA-3522: Add TopologyTestDriver unit tests (#6179) Reviewers: John Roesler <j...@confluent.io>, Bill Bejeck <b...@confluent.io> --- .../org/apache/kafka/streams/state/Stores.java | 41 +- .../integration/StoreUpgradeIntegrationTest.java | 995 +++++++++++++++++++++ .../kafka/streams/TopologyTestDriverTest.java | 196 +++- 3 files changed, 1188 insertions(+), 44 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java index e40251d..2f81fd3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java @@ -354,26 +354,6 @@ public final class Stores { } /** - * Create an in-memory {@link SessionBytesStoreSupplier}. - * @param name name of the store (cannot be {@code null}) - * @param retentionPeriod length ot time to retain data in the store (cannot be negative) - * Note that the retention period must be at least long enough to contain the - * windowed data's entire life cycle, from window-start through window-end, - * and for the entire grace period. 
- * @return an instance of a {@link SessionBytesStoreSupplier} - */ - public static SessionBytesStoreSupplier inMemorySessionStore(final String name, final Duration retentionPeriod) { - Objects.requireNonNull(name, "name cannot be null"); - - final String msgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod"); - final long retentionPeriodMs = ApiUtils.validateMillisecondDuration(retentionPeriod, msgPrefix); - if (retentionPeriodMs < 0) { - throw new IllegalArgumentException("retentionPeriod cannot be negative"); - } - return new InMemorySessionBytesStoreSupplier(name, retentionPeriodMs); - } - - /** * Create a persistent {@link SessionBytesStoreSupplier}. * * @param name name of the store (cannot be {@code null}) @@ -412,6 +392,27 @@ public final class Stores { } /** + * Create an in-memory {@link SessionBytesStoreSupplier}. + * + * @param name name of the store (cannot be {@code null}) + * @param retentionPeriod length ot time to retain data in the store (cannot be negative) + * Note that the retention period must be at least long enough to contain the + * windowed data's entire life cycle, from window-start through window-end, + * and for the entire grace period. + * @return an instance of a {@link SessionBytesStoreSupplier} + */ + public static SessionBytesStoreSupplier inMemorySessionStore(final String name, final Duration retentionPeriod) { + Objects.requireNonNull(name, "name cannot be null"); + + final String msgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod"); + final long retentionPeriodMs = ApiUtils.validateMillisecondDuration(retentionPeriod, msgPrefix); + if (retentionPeriodMs < 0) { + throw new IllegalArgumentException("retentionPeriod cannot be negative"); + } + return new InMemorySessionBytesStoreSupplier(name, retentionPeriodMs); + } + + /** * Creates a {@link StoreBuilder} that can be used to build a {@link KeyValueStore}. 
* <p> * The provided supplier should <strong>not</strong> be a supplier for diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java new file mode 100644 index 0000000..ba5b08f --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java @@ -0,0 +1,995 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.integration; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.TimeWindow; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.QueryableStoreTypes; +import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; +import org.apache.kafka.streams.state.ReadOnlyWindowStore; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.time.Duration; +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; + +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; + +@Category({IntegrationTest.class}) +public class StoreUpgradeIntegrationTest { + private static String inputStream; + private static final String STORE_NAME = "store"; + + 
private KafkaStreams kafkaStreams; + private static int testCounter = 0; + + @ClassRule + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); + + @Before + public void createTopics() throws Exception { + inputStream = "input-stream-" + testCounter; + CLUSTER.createTopic(inputStream); + } + + private Properties props() { + final Properties streamsConfiguration = new Properties(); + streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "addId-" + testCounter++); + streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); + streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); + streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); + streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); + streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + return streamsConfiguration; + } + + @After + public void shutdown() { + if (kafkaStreams != null) { + kafkaStreams.close(Duration.ofSeconds(30L)); + kafkaStreams.cleanUp(); + } + } + + @Test + public void shouldMigrateInMemoryKeyValueStoreToTimestampedKeyValueStoreUsingPapi() throws Exception { + shouldMigrateKeyValueStoreToTimestampedKeyValueStoreUsingPapi(false); + } + + @Test + public void shouldMigratePersistentKeyValueStoreToTimestampedKeyValueStoreUsingPapi() throws Exception { + shouldMigrateKeyValueStoreToTimestampedKeyValueStoreUsingPapi(true); + } + + private void shouldMigrateKeyValueStoreToTimestampedKeyValueStoreUsingPapi(final boolean persistentStore) throws Exception { + final StreamsBuilder streamsBuilderForOldStore = new StreamsBuilder(); + + streamsBuilderForOldStore.addStateStore( + Stores.keyValueStoreBuilder( + persistentStore 
? Stores.persistentKeyValueStore(STORE_NAME) : Stores.inMemoryKeyValueStore(STORE_NAME), + Serdes.Integer(), + Serdes.Long())) + .<Integer, Integer>stream(inputStream) + .process(KeyValueProcessor::new, STORE_NAME); + + final Properties props = props(); + kafkaStreams = new KafkaStreams(streamsBuilderForOldStore.build(), props); + kafkaStreams.start(); + + processKeyValueAndVerifyPlainCount(1, singletonList(KeyValue.pair(1, 1L))); + + processKeyValueAndVerifyPlainCount(1, singletonList(KeyValue.pair(1, 2L))); + final long lastUpdateKeyOne = persistentStore ? -1L : CLUSTER.time.milliseconds() - 1L; + + processKeyValueAndVerifyPlainCount(2, asList( + KeyValue.pair(1, 2L), + KeyValue.pair(2, 1L))); + final long lastUpdateKeyTwo = persistentStore ? -1L : CLUSTER.time.milliseconds() - 1L; + + processKeyValueAndVerifyPlainCount(3, asList( + KeyValue.pair(1, 2L), + KeyValue.pair(2, 1L), + KeyValue.pair(3, 1L))); + final long lastUpdateKeyThree = persistentStore ? -1L : CLUSTER.time.milliseconds() - 1L; + + processKeyValueAndVerifyPlainCount(4, asList( + KeyValue.pair(1, 2L), + KeyValue.pair(2, 1L), + KeyValue.pair(3, 1L), + KeyValue.pair(4, 1L))); + + processKeyValueAndVerifyPlainCount(4, asList( + KeyValue.pair(1, 2L), + KeyValue.pair(2, 1L), + KeyValue.pair(3, 1L), + KeyValue.pair(4, 2L))); + + processKeyValueAndVerifyPlainCount(4, asList( + KeyValue.pair(1, 2L), + KeyValue.pair(2, 1L), + KeyValue.pair(3, 1L), + KeyValue.pair(4, 3L))); + final long lastUpdateKeyFour = persistentStore ? -1L : CLUSTER.time.milliseconds() - 1L; + + kafkaStreams.close(); + kafkaStreams = null; + + + + final StreamsBuilder streamsBuilderForNewStore = new StreamsBuilder(); + + streamsBuilderForNewStore.addStateStore( + Stores.timestampedKeyValueStoreBuilder( + persistentStore ? 
Stores.persistentTimestampedKeyValueStore(STORE_NAME) : Stores.inMemoryKeyValueStore(STORE_NAME), + Serdes.Integer(), + Serdes.Long())) + .<Integer, Integer>stream(inputStream) + .process(TimestampedKeyValueProcessor::new, STORE_NAME); + + kafkaStreams = new KafkaStreams(streamsBuilderForNewStore.build(), props); + kafkaStreams.start(); + + verifyCountWithTimestamp(1, 2L, lastUpdateKeyOne); + verifyCountWithTimestamp(2, 1L, lastUpdateKeyTwo); + verifyCountWithTimestamp(3, 1L, lastUpdateKeyThree); + verifyCountWithTimestamp(4, 3L, lastUpdateKeyFour); + + final long currentTime = CLUSTER.time.milliseconds(); + processKeyValueAndVerifyCountWithTimestamp(1, currentTime + 42L, asList( + KeyValue.pair(1, ValueAndTimestamp.make(3L, currentTime + 42L)), + KeyValue.pair(2, ValueAndTimestamp.make(1L, lastUpdateKeyTwo)), + KeyValue.pair(3, ValueAndTimestamp.make(1L, lastUpdateKeyThree)), + KeyValue.pair(4, ValueAndTimestamp.make(3L, lastUpdateKeyFour)))); + + processKeyValueAndVerifyCountWithTimestamp(2, currentTime + 45L, asList( + KeyValue.pair(1, ValueAndTimestamp.make(3L, currentTime + 42L)), + KeyValue.pair(2, ValueAndTimestamp.make(2L, currentTime + 45L)), + KeyValue.pair(3, ValueAndTimestamp.make(1L, lastUpdateKeyThree)), + KeyValue.pair(4, ValueAndTimestamp.make(3L, lastUpdateKeyFour)))); + + // can process "out of order" record for different key + processKeyValueAndVerifyCountWithTimestamp(4, currentTime + 21L, asList( + KeyValue.pair(1, ValueAndTimestamp.make(3L, currentTime + 42L)), + KeyValue.pair(2, ValueAndTimestamp.make(2L, currentTime + 45L)), + KeyValue.pair(3, ValueAndTimestamp.make(1L, lastUpdateKeyThree)), + KeyValue.pair(4, ValueAndTimestamp.make(4L, currentTime + 21L)))); + + processKeyValueAndVerifyCountWithTimestamp(4, currentTime + 42L, asList( + KeyValue.pair(1, ValueAndTimestamp.make(3L, currentTime + 42L)), + KeyValue.pair(2, ValueAndTimestamp.make(2L, currentTime + 45L)), + KeyValue.pair(3, ValueAndTimestamp.make(1L, lastUpdateKeyThree)), + 
KeyValue.pair(4, ValueAndTimestamp.make(5L, currentTime + 42L)))); + + // out of order (same key) record should not reduce result timestamp + processKeyValueAndVerifyCountWithTimestamp(4, currentTime + 10L, asList( + KeyValue.pair(1, ValueAndTimestamp.make(3L, currentTime + 42L)), + KeyValue.pair(2, ValueAndTimestamp.make(2L, currentTime + 45L)), + KeyValue.pair(3, ValueAndTimestamp.make(1L, lastUpdateKeyThree)), + KeyValue.pair(4, ValueAndTimestamp.make(6L, currentTime + 42L)))); + + kafkaStreams.close(); + } + + @Test + public void shouldProxyKeyValueStoreToTimestampedKeyValueStoreUsingPapi() throws Exception { + final StreamsBuilder streamsBuilderForOldStore = new StreamsBuilder(); + + streamsBuilderForOldStore.addStateStore( + Stores.keyValueStoreBuilder( + Stores.persistentKeyValueStore(STORE_NAME), + Serdes.Integer(), + Serdes.Long())) + .<Integer, Integer>stream(inputStream) + .process(KeyValueProcessor::new, STORE_NAME); + + final Properties props = props(); + kafkaStreams = new KafkaStreams(streamsBuilderForOldStore.build(), props); + kafkaStreams.start(); + + processKeyValueAndVerifyPlainCount(1, singletonList(KeyValue.pair(1, 1L))); + + processKeyValueAndVerifyPlainCount(1, singletonList(KeyValue.pair(1, 2L))); + + processKeyValueAndVerifyPlainCount(2, asList( + KeyValue.pair(1, 2L), + KeyValue.pair(2, 1L))); + + processKeyValueAndVerifyPlainCount(3, asList( + KeyValue.pair(1, 2L), + KeyValue.pair(2, 1L), + KeyValue.pair(3, 1L))); + + processKeyValueAndVerifyPlainCount(4, asList( + KeyValue.pair(1, 2L), + KeyValue.pair(2, 1L), + KeyValue.pair(3, 1L), + KeyValue.pair(4, 1L))); + + processKeyValueAndVerifyPlainCount(4, asList( + KeyValue.pair(1, 2L), + KeyValue.pair(2, 1L), + KeyValue.pair(3, 1L), + KeyValue.pair(4, 2L))); + + processKeyValueAndVerifyPlainCount(4, asList( + KeyValue.pair(1, 2L), + KeyValue.pair(2, 1L), + KeyValue.pair(3, 1L), + KeyValue.pair(4, 3L))); + + kafkaStreams.close(); + kafkaStreams = null; + + + + final StreamsBuilder 
streamsBuilderForNewStore = new StreamsBuilder(); + + streamsBuilderForNewStore.addStateStore( + Stores.timestampedKeyValueStoreBuilder( + Stores.persistentKeyValueStore(STORE_NAME), + Serdes.Integer(), + Serdes.Long())) + .<Integer, Integer>stream(inputStream) + .process(TimestampedKeyValueProcessor::new, STORE_NAME); + + kafkaStreams = new KafkaStreams(streamsBuilderForNewStore.build(), props); + kafkaStreams.start(); + + verifyCountWithSurrogateTimestamp(1, 2L); + verifyCountWithSurrogateTimestamp(2, 1L); + verifyCountWithSurrogateTimestamp(3, 1L); + verifyCountWithSurrogateTimestamp(4, 3L); + + processKeyValueAndVerifyCount(1, 42L, asList( + KeyValue.pair(1, ValueAndTimestamp.make(3L, -1L)), + KeyValue.pair(2, ValueAndTimestamp.make(1L, -1L)), + KeyValue.pair(3, ValueAndTimestamp.make(1L, -1L)), + KeyValue.pair(4, ValueAndTimestamp.make(3L, -1L)))); + + processKeyValueAndVerifyCount(2, 45L, asList( + KeyValue.pair(1, ValueAndTimestamp.make(3L, -1L)), + KeyValue.pair(2, ValueAndTimestamp.make(2L, -1L)), + KeyValue.pair(3, ValueAndTimestamp.make(1L, -1L)), + KeyValue.pair(4, ValueAndTimestamp.make(3L, -1L)))); + + // can process "out of order" record for different key + processKeyValueAndVerifyCount(4, 21L, asList( + KeyValue.pair(1, ValueAndTimestamp.make(3L, -1L)), + KeyValue.pair(2, ValueAndTimestamp.make(2L, -1L)), + KeyValue.pair(3, ValueAndTimestamp.make(1L, -1L)), + KeyValue.pair(4, ValueAndTimestamp.make(4L, -1L)))); + + processKeyValueAndVerifyCount(4, 42L, asList( + KeyValue.pair(1, ValueAndTimestamp.make(3L, -1L)), + KeyValue.pair(2, ValueAndTimestamp.make(2L, -1L)), + KeyValue.pair(3, ValueAndTimestamp.make(1L, -1L)), + KeyValue.pair(4, ValueAndTimestamp.make(5L, -1L)))); + + // out of order (same key) record should not reduce result timestamp + processKeyValueAndVerifyCount(4, 10L, asList( + KeyValue.pair(1, ValueAndTimestamp.make(3L, -1L)), + KeyValue.pair(2, ValueAndTimestamp.make(2L, -1L)), + KeyValue.pair(3, ValueAndTimestamp.make(1L, -1L)), + 
KeyValue.pair(4, ValueAndTimestamp.make(6L, -1L)))); + + kafkaStreams.close(); + } + + private <K, V> void processKeyValueAndVerifyPlainCount(final K key, + final List<KeyValue<Integer, Object>> expectedStoreContent) + throws Exception { + + IntegrationTestUtils.produceKeyValuesSynchronously( + inputStream, + singletonList(KeyValue.pair(key, 0)), + TestUtils.producerConfig(CLUSTER.bootstrapServers(), + IntegerSerializer.class, + IntegerSerializer.class), + CLUSTER.time); + + TestUtils.waitForCondition(() -> { + try { + final ReadOnlyKeyValueStore<K, V> store = + kafkaStreams.store(STORE_NAME, QueryableStoreTypes.keyValueStore()); + try (final KeyValueIterator<K, V> all = store.all()) { + final List<KeyValue<K, V>> storeContent = new LinkedList<>(); + while (all.hasNext()) { + storeContent.add(all.next()); + } + return storeContent.equals(expectedStoreContent); + } + } catch (final Exception swallow) { + swallow.printStackTrace(); + System.err.println(swallow.getMessage()); + return false; + } + }, "Could not get expected result in time."); + } + + private <K> void verifyCountWithTimestamp(final K key, + final long value, + final long timestamp) throws Exception { + TestUtils.waitForCondition(() -> { + try { + final ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>> store = + kafkaStreams.store(STORE_NAME, QueryableStoreTypes.timestampedKeyValueStore()); + final ValueAndTimestamp<Long> count = store.get(key); + return count.value() == value && count.timestamp() == timestamp; + } catch (final Exception swallow) { + swallow.printStackTrace(); + System.err.println(swallow.getMessage()); + return false; + } + }, "Could not get expected result in time."); + } + + private <K> void verifyCountWithSurrogateTimestamp(final K key, + final long value) throws Exception { + TestUtils.waitForCondition(() -> { + try { + final ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>> store = + kafkaStreams.store(STORE_NAME, QueryableStoreTypes.timestampedKeyValueStore()); + final 
ValueAndTimestamp<Long> count = store.get(key); + return count.value() == value && count.timestamp() == -1L; + } catch (final Exception swallow) { + swallow.printStackTrace(); + System.err.println(swallow.getMessage()); + return false; + } + }, "Could not get expected result in time."); + } + + private <K, V> void processKeyValueAndVerifyCount(final K key, + final long timestamp, + final List<KeyValue<Integer, Object>> expectedStoreContent) + throws Exception { + + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp( + inputStream, + singletonList(KeyValue.pair(key, 0)), + TestUtils.producerConfig(CLUSTER.bootstrapServers(), + IntegerSerializer.class, + IntegerSerializer.class), + timestamp); + + TestUtils.waitForCondition(() -> { + try { + final ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> store = + kafkaStreams.store(STORE_NAME, QueryableStoreTypes.timestampedKeyValueStore()); + try (final KeyValueIterator<K, ValueAndTimestamp<V>> all = store.all()) { + final List<KeyValue<K, ValueAndTimestamp<V>>> storeContent = new LinkedList<>(); + while (all.hasNext()) { + storeContent.add(all.next()); + } + return storeContent.equals(expectedStoreContent); + } + } catch (final Exception swallow) { + swallow.printStackTrace(); + System.err.println(swallow.getMessage()); + return false; + } + }, "Could not get expected result in time."); + } + + private <K, V> void processKeyValueAndVerifyCountWithTimestamp(final K key, + final long timestamp, + final List<KeyValue<Integer, Object>> expectedStoreContent) + throws Exception { + + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp( + inputStream, + singletonList(KeyValue.pair(key, 0)), + TestUtils.producerConfig(CLUSTER.bootstrapServers(), + IntegerSerializer.class, + IntegerSerializer.class), + timestamp); + + TestUtils.waitForCondition(() -> { + try { + final ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> store = + kafkaStreams.store(STORE_NAME, QueryableStoreTypes.timestampedKeyValueStore()); + try 
(final KeyValueIterator<K, ValueAndTimestamp<V>> all = store.all()) { + final List<KeyValue<K, ValueAndTimestamp<V>>> storeContent = new LinkedList<>(); + while (all.hasNext()) { + storeContent.add(all.next()); + } + return storeContent.equals(expectedStoreContent); + } + } catch (final Exception swallow) { + swallow.printStackTrace(); + System.err.println(swallow.getMessage()); + return false; + } + }, "Could not get expected result in time."); + } + + @Test + public void shouldMigrateInMemoryWindowStoreToTimestampedWindowStoreUsingPapi() throws Exception { + final StreamsBuilder streamsBuilderForOldStore = new StreamsBuilder(); + streamsBuilderForOldStore + .addStateStore( + Stores.windowStoreBuilder( + Stores.inMemoryWindowStore( + STORE_NAME, + Duration.ofMillis(1000L), + Duration.ofMillis(1000L), + false), + Serdes.Integer(), + Serdes.Long())) + .<Integer, Integer>stream(inputStream) + .process(WindowedProcessor::new, STORE_NAME); + + final StreamsBuilder streamsBuilderForNewStore = new StreamsBuilder(); + streamsBuilderForNewStore + .addStateStore( + Stores.timestampedWindowStoreBuilder( + Stores.inMemoryWindowStore( + STORE_NAME, + Duration.ofMillis(1000L), + Duration.ofMillis(1000L), + false), + Serdes.Integer(), + Serdes.Long())) + .<Integer, Integer>stream(inputStream) + .process(TimestampedWindowedProcessor::new, STORE_NAME); + + + shouldMigrateWindowStoreToTimestampedWindowStoreUsingPapi( + new KafkaStreams(streamsBuilderForOldStore.build(), props()), + new KafkaStreams(streamsBuilderForNewStore.build(), props()), + false); + } + + @Test + public void shouldMigratePersistentWindowStoreToTimestampedWindowStoreUsingPapi() throws Exception { + final StreamsBuilder streamsBuilderForOldStore = new StreamsBuilder(); + + streamsBuilderForOldStore + .addStateStore( + Stores.windowStoreBuilder( + Stores.persistentWindowStore( + STORE_NAME, + Duration.ofMillis(1000L), + Duration.ofMillis(1000L), + false), + Serdes.Integer(), + Serdes.Long())) + .<Integer, 
Integer>stream(inputStream) + .process(WindowedProcessor::new, STORE_NAME); + + final StreamsBuilder streamsBuilderForNewStore = new StreamsBuilder(); + streamsBuilderForNewStore + .addStateStore( + Stores.timestampedWindowStoreBuilder( + Stores.persistentTimestampedWindowStore( + STORE_NAME, + Duration.ofMillis(1000L), + Duration.ofMillis(1000L), + false), + Serdes.Integer(), + Serdes.Long())) + .<Integer, Integer>stream(inputStream) + .process(TimestampedWindowedProcessor::new, STORE_NAME); + + final Properties props = props(); + shouldMigrateWindowStoreToTimestampedWindowStoreUsingPapi( + new KafkaStreams(streamsBuilderForOldStore.build(), props), + new KafkaStreams(streamsBuilderForNewStore.build(), props), + true); + } + + private void shouldMigrateWindowStoreToTimestampedWindowStoreUsingPapi(final KafkaStreams kafkaStreamsOld, + final KafkaStreams kafkaStreamsNew, + final boolean persistentStore) throws Exception { + kafkaStreams = kafkaStreamsOld; + kafkaStreams.start(); + + processWindowedKeyValueAndVerifyPlainCount(1, singletonList( + KeyValue.pair(new Windowed<>(1, new TimeWindow(0L, 1000L)), 1L))); + + processWindowedKeyValueAndVerifyPlainCount(1, singletonList( + KeyValue.pair(new Windowed<>(1, new TimeWindow(0L, 1000L)), 2L))); + final long lastUpdateKeyOne = persistentStore ? -1L : CLUSTER.time.milliseconds() - 1L; + + processWindowedKeyValueAndVerifyPlainCount(2, asList( + KeyValue.pair(new Windowed<>(1, new TimeWindow(0L, 1000L)), 2L), + KeyValue.pair(new Windowed<>(2, new TimeWindow(0L, 1000L)), 1L))); + final long lastUpdateKeyTwo = persistentStore ? -1L : CLUSTER.time.milliseconds() - 1L; + + processWindowedKeyValueAndVerifyPlainCount(3, asList( + KeyValue.pair(new Windowed<>(1, new TimeWindow(0L, 1000L)), 2L), + KeyValue.pair(new Windowed<>(2, new TimeWindow(0L, 1000L)), 1L), + KeyValue.pair(new Windowed<>(3, new TimeWindow(0L, 1000L)), 1L))); + final long lastUpdateKeyThree = persistentStore ? 
-1L : CLUSTER.time.milliseconds() - 1L; + + processWindowedKeyValueAndVerifyPlainCount(4, asList( + KeyValue.pair(new Windowed<>(1, new TimeWindow(0L, 1000L)), 2L), + KeyValue.pair(new Windowed<>(2, new TimeWindow(0L, 1000L)), 1L), + KeyValue.pair(new Windowed<>(3, new TimeWindow(0L, 1000L)), 1L), + KeyValue.pair(new Windowed<>(4, new TimeWindow(0L, 1000L)), 1L))); + + processWindowedKeyValueAndVerifyPlainCount(4, asList( + KeyValue.pair(new Windowed<>(1, new TimeWindow(0L, 1000L)), 2L), + KeyValue.pair(new Windowed<>(2, new TimeWindow(0L, 1000L)), 1L), + KeyValue.pair(new Windowed<>(3, new TimeWindow(0L, 1000L)), 1L), + KeyValue.pair(new Windowed<>(4, new TimeWindow(0L, 1000L)), 2L))); + + processWindowedKeyValueAndVerifyPlainCount(4, asList( + KeyValue.pair(new Windowed<>(1, new TimeWindow(0L, 1000L)), 2L), + KeyValue.pair(new Windowed<>(2, new TimeWindow(0L, 1000L)), 1L), + KeyValue.pair(new Windowed<>(3, new TimeWindow(0L, 1000L)), 1L), + KeyValue.pair(new Windowed<>(4, new TimeWindow(0L, 1000L)), 3L))); + final long lastUpdateKeyFour = persistentStore ? 
-1L : CLUSTER.time.milliseconds() - 1L; + + kafkaStreams.close(); + kafkaStreams = null; + + + kafkaStreams = kafkaStreamsNew; + kafkaStreams.start(); + + verifyWindowedCountWithTimestamp(new Windowed<>(1, new TimeWindow(0L, 1000L)), 2L, lastUpdateKeyOne); + verifyWindowedCountWithTimestamp(new Windowed<>(2, new TimeWindow(0L, 1000L)), 1L, lastUpdateKeyTwo); + verifyWindowedCountWithTimestamp(new Windowed<>(3, new TimeWindow(0L, 1000L)), 1L, lastUpdateKeyThree); + verifyWindowedCountWithTimestamp(new Windowed<>(4, new TimeWindow(0L, 1000L)), 3L, lastUpdateKeyFour); + + final long currentTime = CLUSTER.time.milliseconds(); + processKeyValueAndVerifyWindowedCountWithTimestamp(1, currentTime + 42L, asList( + KeyValue.pair( + new Windowed<>(1, new TimeWindow(0L, 1000L)), + ValueAndTimestamp.make(3L, currentTime + 42L)), + KeyValue.pair( + new Windowed<>(2, new TimeWindow(0L, 1000L)), + ValueAndTimestamp.make(1L, lastUpdateKeyTwo)), + KeyValue.pair( + new Windowed<>(3, new TimeWindow(0L, 1000L)), + ValueAndTimestamp.make(1L, lastUpdateKeyThree)), + KeyValue.pair( + new Windowed<>(4, new TimeWindow(0L, 1000L)), + ValueAndTimestamp.make(3L, lastUpdateKeyFour)))); + + processKeyValueAndVerifyWindowedCountWithTimestamp(2, currentTime + 45L, asList( + KeyValue.pair( + new Windowed<>(1, new TimeWindow(0L, 1000L)), + ValueAndTimestamp.make(3L, currentTime + 42L)), + KeyValue.pair( + new Windowed<>(2, new TimeWindow(0L, 1000L)), + ValueAndTimestamp.make(2L, currentTime + 45L)), + KeyValue.pair( + new Windowed<>(3, new TimeWindow(0L, 1000L)), + ValueAndTimestamp.make(1L, lastUpdateKeyThree)), + KeyValue.pair( + new Windowed<>(4, new TimeWindow(0L, 1000L)), + ValueAndTimestamp.make(3L, lastUpdateKeyFour)))); + + // can process "out of order" record for different key + processKeyValueAndVerifyWindowedCountWithTimestamp(4, currentTime + 21L, asList( + KeyValue.pair( + new Windowed<>(1, new TimeWindow(0L, 1000L)), + ValueAndTimestamp.make(3L, currentTime + 42L)), + KeyValue.pair( + 
new Windowed<>(2, new TimeWindow(0L, 1000L)), + ValueAndTimestamp.make(2L, currentTime + 45L)), + KeyValue.pair( + new Windowed<>(3, new TimeWindow(0L, 1000L)), + ValueAndTimestamp.make(1L, lastUpdateKeyThree)), + KeyValue.pair( + new Windowed<>(4, new TimeWindow(0L, 1000L)), + ValueAndTimestamp.make(4L, currentTime + 21L)))); + + processKeyValueAndVerifyWindowedCountWithTimestamp(4, currentTime + 42L, asList( + KeyValue.pair( + new Windowed<>(1, new TimeWindow(0L, 1000L)), + ValueAndTimestamp.make(3L, currentTime + 42L)), + KeyValue.pair( + new Windowed<>(2, new TimeWindow(0L, 1000L)), + ValueAndTimestamp.make(2L, currentTime + 45L)), + KeyValue.pair( + new Windowed<>(3, new TimeWindow(0L, 1000L)), + ValueAndTimestamp.make(1L, lastUpdateKeyThree)), + KeyValue.pair( + new Windowed<>(4, new TimeWindow(0L, 1000L)), + ValueAndTimestamp.make(5L, currentTime + 42L)))); + + // out of order (same key) record should not reduce result timestamp + processKeyValueAndVerifyWindowedCountWithTimestamp(4, currentTime + 10L, asList( + KeyValue.pair( + new Windowed<>(1, new TimeWindow(0L, 1000L)), + ValueAndTimestamp.make(3L, currentTime + 42L)), + KeyValue.pair( + new Windowed<>(2, new TimeWindow(0L, 1000L)), + ValueAndTimestamp.make(2L, currentTime + 45L)), + KeyValue.pair( + new Windowed<>(3, new TimeWindow(0L, 1000L)), + ValueAndTimestamp.make(1L, lastUpdateKeyThree)), + KeyValue.pair( + new Windowed<>(4, new TimeWindow(0L, 1000L)), + ValueAndTimestamp.make(6L, currentTime + 42L)))); + + // test new segment + processKeyValueAndVerifyWindowedCountWithTimestamp(10, currentTime + 100001L, singletonList( + KeyValue.pair( + new Windowed<>(10, new TimeWindow(100000L, 101000L)), ValueAndTimestamp.make(1L, currentTime + 100001L)))); + + + kafkaStreams.close(); + } + + @Test + public void shouldProxyWindowStoreToTimestampedWindowStoreUsingPapi() throws Exception { + final StreamsBuilder streamsBuilderForOldStore = new StreamsBuilder(); + + streamsBuilderForOldStore.addStateStore( + 
Stores.windowStoreBuilder( + Stores.persistentWindowStore( + STORE_NAME, + Duration.ofMillis(1000L), + Duration.ofMillis(1000L), + false), + Serdes.Integer(), + Serdes.Long())) + .<Integer, Integer>stream(inputStream) + .process(WindowedProcessor::new, STORE_NAME); + + final Properties props = props(); + kafkaStreams = new KafkaStreams(streamsBuilderForOldStore.build(), props); + kafkaStreams.start(); + + processWindowedKeyValueAndVerifyPlainCount(1, singletonList( + KeyValue.pair(new Windowed<>(1, new TimeWindow(0L, 1000L)), 1L))); + + processWindowedKeyValueAndVerifyPlainCount(1, singletonList( + KeyValue.pair(new Windowed<>(1, new TimeWindow(0L, 1000L)), 2L))); + + processWindowedKeyValueAndVerifyPlainCount(2, asList( + KeyValue.pair(new Windowed<>(1, new TimeWindow(0L, 1000L)), 2L), + KeyValue.pair(new Windowed<>(2, new TimeWindow(0L, 1000L)), 1L))); + + processWindowedKeyValueAndVerifyPlainCount(3, asList( + KeyValue.pair(new Windowed<>(1, new TimeWindow(0L, 1000L)), 2L), + KeyValue.pair(new Windowed<>(2, new TimeWindow(0L, 1000L)), 1L), + KeyValue.pair(new Windowed<>(3, new TimeWindow(0L, 1000L)), 1L))); + + processWindowedKeyValueAndVerifyPlainCount(4, asList( + KeyValue.pair(new Windowed<>(1, new TimeWindow(0L, 1000L)), 2L), + KeyValue.pair(new Windowed<>(2, new TimeWindow(0L, 1000L)), 1L), + KeyValue.pair(new Windowed<>(3, new TimeWindow(0L, 1000L)), 1L), + KeyValue.pair(new Windowed<>(4, new TimeWindow(0L, 1000L)), 1L))); + + processWindowedKeyValueAndVerifyPlainCount(4, asList( + KeyValue.pair(new Windowed<>(1, new TimeWindow(0L, 1000L)), 2L), + KeyValue.pair(new Windowed<>(2, new TimeWindow(0L, 1000L)), 1L), + KeyValue.pair(new Windowed<>(3, new TimeWindow(0L, 1000L)), 1L), + KeyValue.pair(new Windowed<>(4, new TimeWindow(0L, 1000L)), 2L))); + + processWindowedKeyValueAndVerifyPlainCount(4, asList( + KeyValue.pair(new Windowed<>(1, new TimeWindow(0L, 1000L)), 2L), + KeyValue.pair(new Windowed<>(2, new TimeWindow(0L, 1000L)), 1L), + KeyValue.pair(new 
Windowed<>(3, new TimeWindow(0L, 1000L)), 1L), + KeyValue.pair(new Windowed<>(4, new TimeWindow(0L, 1000L)), 3L))); + + kafkaStreams.close(); + kafkaStreams = null; + + + + final StreamsBuilder streamsBuilderForNewStore = new StreamsBuilder(); + + streamsBuilderForNewStore.addStateStore( + Stores.timestampedWindowStoreBuilder( + Stores.persistentWindowStore( + STORE_NAME, + Duration.ofMillis(1000L), + Duration.ofMillis(1000L), + false), + Serdes.Integer(), + Serdes.Long())) + .<Integer, Integer>stream(inputStream) + .process(TimestampedWindowedProcessor::new, STORE_NAME); + + kafkaStreams = new KafkaStreams(streamsBuilderForNewStore.build(), props); + kafkaStreams.start(); + + verifyWindowedCountWithSurrogateTimestamp(new Windowed<>(1, new TimeWindow(0L, 1000L)), 2L); + verifyWindowedCountWithSurrogateTimestamp(new Windowed<>(2, new TimeWindow(0L, 1000L)), 1L); + verifyWindowedCountWithSurrogateTimestamp(new Windowed<>(3, new TimeWindow(0L, 1000L)), 1L); + verifyWindowedCountWithSurrogateTimestamp(new Windowed<>(4, new TimeWindow(0L, 1000L)), 3L); + + processKeyValueAndVerifyWindowedCountWithTimestamp(1, 42L, asList( + KeyValue.pair(new Windowed<>(1, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(3L, -1L)), + KeyValue.pair(new Windowed<>(2, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(1L, -1L)), + KeyValue.pair(new Windowed<>(3, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(1L, -1L)), + KeyValue.pair(new Windowed<>(4, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(3L, -1L)))); + + processKeyValueAndVerifyWindowedCountWithTimestamp(2, 45L, asList( + KeyValue.pair(new Windowed<>(1, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(3L, -1L)), + KeyValue.pair(new Windowed<>(2, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(2L, -1L)), + KeyValue.pair(new Windowed<>(3, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(1L, -1L)), + KeyValue.pair(new Windowed<>(4, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(3L, -1L)))); + + // can process 
"out of order" record for different key + processKeyValueAndVerifyWindowedCountWithTimestamp(4, 21L, asList( + KeyValue.pair(new Windowed<>(1, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(3L, -1L)), + KeyValue.pair(new Windowed<>(2, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(2L, -1L)), + KeyValue.pair(new Windowed<>(3, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(1L, -1L)), + KeyValue.pair(new Windowed<>(4, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(4L, -1L)))); + + processKeyValueAndVerifyWindowedCountWithTimestamp(4, 42L, asList( + KeyValue.pair(new Windowed<>(1, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(3L, -1L)), + KeyValue.pair(new Windowed<>(2, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(2L, -1L)), + KeyValue.pair(new Windowed<>(3, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(1L, -1L)), + KeyValue.pair(new Windowed<>(4, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(5L, -1L)))); + + // out of order (same key) record should not reduce result timestamp + processKeyValueAndVerifyWindowedCountWithTimestamp(4, 10L, asList( + KeyValue.pair(new Windowed<>(1, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(3L, -1L)), + KeyValue.pair(new Windowed<>(2, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(2L, -1L)), + KeyValue.pair(new Windowed<>(3, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(1L, -1L)), + KeyValue.pair(new Windowed<>(4, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(6L, -1L)))); + + // test new segment + processKeyValueAndVerifyWindowedCountWithTimestamp(10, 100001L, singletonList( + KeyValue.pair(new Windowed<>(10, new TimeWindow(100000L, 101000L)), ValueAndTimestamp.make(1L, -1L)))); + + + kafkaStreams.close(); + } + + private <K, V> void processWindowedKeyValueAndVerifyPlainCount(final K key, + final List<KeyValue<Windowed<Integer>, Object>> expectedStoreContent) + throws Exception { + + IntegrationTestUtils.produceKeyValuesSynchronously( + inputStream, + 
singletonList(KeyValue.pair(key, 0)), + TestUtils.producerConfig(CLUSTER.bootstrapServers(), + IntegerSerializer.class, + IntegerSerializer.class), + CLUSTER.time); + + /* After producing the record, wait until the complete content of the window store equals expectedStoreContent; every check re-reads the whole store. */ TestUtils.waitForCondition(() -> { + try { + final ReadOnlyWindowStore<K, V> store = + kafkaStreams.store(STORE_NAME, QueryableStoreTypes.windowStore()); + try (final KeyValueIterator<Windowed<K>, V> all = store.all()) { + final List<KeyValue<Windowed<K>, V>> storeContent = new LinkedList<>(); + while (all.hasNext()) { + storeContent.add(all.next()); + } + return storeContent.equals(expectedStoreContent); + } + } catch (final Exception swallow) { + /* Any failure while querying the store is printed and makes this check evaluate to false instead of failing the test directly. */ swallow.printStackTrace(); + System.err.println(swallow.getMessage()); + return false; + } + }, "Could not get expected result in time."); + } + + /** Waits, via TestUtils.waitForCondition, until the timestamped window store registered under STORE_NAME holds, for {@code key.key()} at {@code key.window().start()}, an entry whose count equals {@code value} and whose timestamp equals {@code -1L} (the "surrogate" timestamp, going by the method name). Exceptions thrown while querying are printed and make the check evaluate to false. */ private <K> void verifyWindowedCountWithSurrogateTimestamp(final Windowed<K> key, + final long value) throws Exception { + TestUtils.waitForCondition(() -> { + try { + final ReadOnlyWindowStore<K, ValueAndTimestamp<Long>> store = + kafkaStreams.store(STORE_NAME, QueryableStoreTypes.timestampedWindowStore()); + final ValueAndTimestamp<Long> count = store.fetch(key.key(), key.window().start()); + /* NOTE(review): count is dereferenced without a null check; if fetch returns null, the resulting exception is handled only by the broad catch below (printed, check evaluates to false) - confirm this is the intended handling of a missing entry. */ return count.value() == value && count.timestamp() == -1L; + } catch (final Exception swallow) { + swallow.printStackTrace(); + System.err.println(swallow.getMessage()); + return false; + } + }, "Could not get expected result in time."); + } + + /** Waits, via TestUtils.waitForCondition, until the timestamped window store registered under STORE_NAME holds, for {@code key.key()} at {@code key.window().start()}, an entry whose count equals {@code value} and whose timestamp equals {@code timestamp}. Exceptions thrown while querying are printed and make the check evaluate to false. */ private <K> void verifyWindowedCountWithTimestamp(final Windowed<K> key, + final long value, + final long timestamp) throws Exception { + TestUtils.waitForCondition(() -> { + try { + final ReadOnlyWindowStore<K, ValueAndTimestamp<Long>> store = + kafkaStreams.store(STORE_NAME, QueryableStoreTypes.timestampedWindowStore()); + final ValueAndTimestamp<Long> count = store.fetch(key.key(), key.window().start()); + return count.value() == value && count.timestamp() == timestamp; + } catch (final Exception swallow) { + swallow.printStackTrace(); + 
System.err.println(swallow.getMessage()); + return false; + } + }, "Could not get expected result in time."); + } + + /** Produces a single record ({@code key}, value 0) to {@code inputStream} through produceKeyValuesSynchronouslyWithTimestamp, passing {@code timestamp}, and then waits until the complete content of the timestamped window store registered under STORE_NAME equals {@code expectedStoreContent}. Exceptions thrown while querying the store are printed and make the check evaluate to false. */ private <K, V> void processKeyValueAndVerifyWindowedCountWithTimestamp(final K key, + final long timestamp, + final List<KeyValue<Windowed<Integer>, Object>> expectedStoreContent) + throws Exception { + + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp( + inputStream, + singletonList(KeyValue.pair(key, 0)), + TestUtils.producerConfig(CLUSTER.bootstrapServers(), + IntegerSerializer.class, + IntegerSerializer.class), + timestamp); + + /* Every check re-reads the whole store and compares it with the expected list. */ TestUtils.waitForCondition(() -> { + try { + final ReadOnlyWindowStore<K, ValueAndTimestamp<V>> store = + kafkaStreams.store(STORE_NAME, QueryableStoreTypes.timestampedWindowStore()); + try (final KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> all = store.all()) { + final List<KeyValue<Windowed<K>, ValueAndTimestamp<V>>> storeContent = new LinkedList<>(); + while (all.hasNext()) { + storeContent.add(all.next()); + } + return storeContent.equals(expectedStoreContent); + } + } catch (final Exception swallow) { + swallow.printStackTrace(); + System.err.println(swallow.getMessage()); + return false; + } + }, "Could not get expected result in time."); + } + + /** Test processor that counts records per key in the KeyValueStore registered under STORE_NAME; the record value is not used. */ private static class KeyValueProcessor implements Processor<Integer, Integer> { + private KeyValueStore<Integer, Long> store; + + @SuppressWarnings("unchecked") + @Override + public void init(final ProcessorContext context) { + /* The result of getStateStore is cast to the typed store, which is what the unchecked-warning suppression covers. */ store = (KeyValueStore<Integer, Long>) context.getStateStore(STORE_NAME); + } + + @Override + public void process(final Integer key, final Integer value) { + final long newCount; + + /* A null lookup result starts the count at 1; otherwise the stored count is incremented. */ final Long oldCount = store.get(key); + if (oldCount != null) { + newCount = oldCount + 1L; + } else { + newCount = 1L; + } + + store.put(key, newCount); + } + + @Override + public void close() {} + } + + /** Test processor that counts records per key in the TimestampedKeyValueStore registered under STORE_NAME and stores, next to each count, the larger of the previously stored timestamp and {@code context.timestamp()}. */ private static class TimestampedKeyValueProcessor implements Processor<Integer, Integer> { + private ProcessorContext context; + private 
TimestampedKeyValueStore<Integer, Long> store; + + @SuppressWarnings("unchecked") + @Override + public void init(final ProcessorContext context) { + /* The context is kept because process() reads context.timestamp(). */ this.context = context; + store = (TimestampedKeyValueStore<Integer, Long>) context.getStateStore(STORE_NAME); + } + + @Override + public void process(final Integer key, final Integer value) { + final long newCount; + + final ValueAndTimestamp<Long> oldCountWithTimestamp = store.get(key); + final long newTimestamp; + + /* No stored entry: start at count 1 with context.timestamp(). Otherwise increment the count and keep the larger timestamp, so a record with a smaller timestamp cannot lower the stored one. */ if (oldCountWithTimestamp == null) { + newCount = 1L; + newTimestamp = context.timestamp(); + } else { + newCount = oldCountWithTimestamp.value() + 1L; + newTimestamp = Math.max(oldCountWithTimestamp.timestamp(), context.timestamp()); + } + + store.put(key, ValueAndTimestamp.make(newCount, newTimestamp)); + } + + @Override + public void close() {} + } + + /** Test processor that counts records per key in the WindowStore registered under STORE_NAME. The time passed to fetch and put depends only on the key: 0L for keys below 10 and 100000L for all other keys; the record value is not used. */ private static class WindowedProcessor implements Processor<Integer, Integer> { + private WindowStore<Integer, Long> store; + + @SuppressWarnings("unchecked") + @Override + public void init(final ProcessorContext context) { + store = (WindowStore<Integer, Long>) context.getStateStore(STORE_NAME); + } + + @Override + public void process(final Integer key, final Integer value) { + final long newCount; + + /* A null lookup result starts the count at 1; otherwise the stored count is incremented. */ final Long oldCount = store.fetch(key, key < 10 ? 0L : 100000L); + if (oldCount != null) { + newCount = oldCount + 1L; + } else { + newCount = 1L; + } + + store.put(key, newCount, key < 10 ? 
0L : 100000L); + } + + @Override + public void close() {} + } + + /** Test processor that counts records per key in the TimestampedWindowStore registered under STORE_NAME. The time passed to fetch and put depends only on the key (0L for keys below 10, 100000L otherwise), and each count is stored together with the larger of the previously stored timestamp and {@code context.timestamp()}; the record value is not used. */ private static class TimestampedWindowedProcessor implements Processor<Integer, Integer> { + private ProcessorContext context; + private TimestampedWindowStore<Integer, Long> store; + + @SuppressWarnings("unchecked") + @Override + public void init(final ProcessorContext context) { + /* The context is kept because process() reads context.timestamp(). */ this.context = context; + store = (TimestampedWindowStore<Integer, Long>) context.getStateStore(STORE_NAME); + } + + @Override + public void process(final Integer key, final Integer value) { + final long newCount; + + final ValueAndTimestamp<Long> oldCountWithTimestamp = store.fetch(key, key < 10 ? 0L : 100000L); + final long newTimestamp; + + /* No stored entry: start at count 1 with context.timestamp(). Otherwise increment the count and keep the larger timestamp, so a record with a smaller timestamp cannot lower the stored one. */ if (oldCountWithTimestamp == null) { + newCount = 1L; + newTimestamp = context.timestamp(); + } else { + newCount = oldCountWithTimestamp.value() + 1L; + newTimestamp = Math.max(oldCountWithTimestamp.timestamp(), context.timestamp()); + } + + store.put(key, ValueAndTimestamp.make(newCount, newTimestamp), key < 10 ? 
0L : 100000L); + } + + @Override + public void close() {} + } +} \ No newline at end of file diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java index d577cf7..2394203 100644 --- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java +++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java @@ -26,7 +26,6 @@ import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Bytes; @@ -78,6 +77,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -465,23 +465,17 @@ public class TopologyTestDriverTest { topology.addSink( "sink", SINK_TOPIC_1, - new Serializer<Object>() { - @Override - public byte[] serialize(final String topic, final Object data) { - if (data instanceof Long) { - return Serdes.Long().serializer().serialize(topic, (Long) data); - } - return Serdes.Integer().serializer().serialize(topic, (Integer) data); + (topic, data) -> { + if (data instanceof Long) { + return Serdes.Long().serializer().serialize(topic, (Long) data); } + return Serdes.Integer().serializer().serialize(topic, (Integer) data); }, - new Serializer<Object>() { - @Override - public byte[] serialize(final String topic, final Object data) 
{ - if (data instanceof String) { - return Serdes.String().serializer().serialize(topic, (String) data); - } - return Serdes.Double().serializer().serialize(topic, (Double) data); + (topic, data) -> { + if (data instanceof String) { + return Serdes.String().serializer().serialize(topic, (String) data); } + return Serdes.Double().serializer().serialize(topic, (Double) data); }, processor); @@ -736,6 +730,160 @@ public class TopologyTestDriverTest { } @Test + public void shouldReturnCorrectPersistentStoreTypeOnly() { + shouldReturnCorrectStoreTypeOnly(true); + } + + @Test + public void shouldReturnCorrectInMemoryStoreTypeOnly() { + shouldReturnCorrectStoreTypeOnly(false); + } + + /** Shared body of the persistent and in-memory store-type tests. Adds a key-value, a timestamped key-value, a window, a timestamped window and a session store for "processor", plus a plain and a timestamped global key-value store, built from persistent suppliers when {@code persistent} is true and from in-memory suppliers otherwise. It then creates the TopologyTestDriver and asserts, store by store, that each typed getter returns non-null only for a matching store type; the timestamped key-value and timestamped window stores are also expected to be returned by the corresponding plain getter. */ private void shouldReturnCorrectStoreTypeOnly(final boolean persistent) { + final String keyValueStoreName = "keyValueStore"; + final String timestampedKeyValueStoreName = "keyValueTimestampStore"; + final String windowStoreName = "windowStore"; + final String timestampedWindowStoreName = "windowTimestampStore"; + final String sessionStoreName = "sessionStore"; + final String globalKeyValueStoreName = "globalKeyValueStore"; + final String globalTimestampedKeyValueStoreName = "globalKeyValueTimestampStore"; + + final Topology topology = setupSingleProcessorTopology(); + + // add state stores + topology.addStateStore( + Stores.keyValueStoreBuilder( + persistent ? + Stores.persistentKeyValueStore(keyValueStoreName) : + Stores.inMemoryKeyValueStore(keyValueStoreName), + Serdes.ByteArray(), + Serdes.ByteArray() + ), + "processor"); + topology.addStateStore( + Stores.timestampedKeyValueStoreBuilder( + persistent ? + Stores.persistentTimestampedKeyValueStore(timestampedKeyValueStoreName) : + Stores.inMemoryKeyValueStore(timestampedKeyValueStoreName), + Serdes.ByteArray(), + Serdes.ByteArray() + ), + "processor"); + topology.addStateStore( + Stores.windowStoreBuilder( + persistent ? 
+ Stores.persistentWindowStore(windowStoreName, Duration.ofMillis(1000L), Duration.ofMillis(100L), false) : + Stores.inMemoryWindowStore(windowStoreName, Duration.ofMillis(1000L), Duration.ofMillis(100L), false), + Serdes.ByteArray(), + Serdes.ByteArray() + ), + "processor"); + topology.addStateStore( + Stores.timestampedWindowStoreBuilder( + persistent ? + Stores.persistentTimestampedWindowStore(timestampedWindowStoreName, Duration.ofMillis(1000L), Duration.ofMillis(100L), false) : + Stores.inMemoryWindowStore(timestampedWindowStoreName, Duration.ofMillis(1000L), Duration.ofMillis(100L), false), + Serdes.ByteArray(), + Serdes.ByteArray() + ), + "processor"); + topology.addStateStore( + persistent ? + Stores.sessionStoreBuilder( + Stores.persistentSessionStore(sessionStoreName, Duration.ofMillis(1000L)), + Serdes.ByteArray(), + Serdes.ByteArray()) : + Stores.sessionStoreBuilder( + Stores.inMemorySessionStore(sessionStoreName, Duration.ofMillis(1000L)), + Serdes.ByteArray(), + Serdes.ByteArray()), + "processor"); + // add global stores + topology.addGlobalStore( + persistent ? + Stores.keyValueStoreBuilder( + Stores.persistentKeyValueStore(globalKeyValueStoreName), + Serdes.ByteArray(), + Serdes.ByteArray() + ).withLoggingDisabled() : + Stores.keyValueStoreBuilder( + Stores.inMemoryKeyValueStore(globalKeyValueStoreName), + Serdes.ByteArray(), + Serdes.ByteArray() + ).withLoggingDisabled(), + "sourceDummy1", + Serdes.ByteArray().deserializer(), + Serdes.ByteArray().deserializer(), + "topicDummy1", + "processorDummy1", + () -> null); + topology.addGlobalStore( + persistent ? 
+ Stores.timestampedKeyValueStoreBuilder( + Stores.persistentTimestampedKeyValueStore(globalTimestampedKeyValueStoreName), + Serdes.ByteArray(), + Serdes.ByteArray() + ).withLoggingDisabled() : + Stores.timestampedKeyValueStoreBuilder( + Stores.inMemoryKeyValueStore(globalTimestampedKeyValueStoreName), + Serdes.ByteArray(), + Serdes.ByteArray() + ).withLoggingDisabled(), + "sourceDummy2", + Serdes.ByteArray().deserializer(), + Serdes.ByteArray().deserializer(), + "topicDummy2", + "processorDummy2", + () -> null); + + testDriver = new TopologyTestDriver(topology, config); + + // verify state stores + assertNotNull(testDriver.getKeyValueStore(keyValueStoreName)); + assertNull(testDriver.getTimestampedKeyValueStore(keyValueStoreName)); + assertNull(testDriver.getWindowStore(keyValueStoreName)); + assertNull(testDriver.getTimestampedWindowStore(keyValueStoreName)); + assertNull(testDriver.getSessionStore(keyValueStoreName)); + + assertNotNull(testDriver.getKeyValueStore(timestampedKeyValueStoreName)); + assertNotNull(testDriver.getTimestampedKeyValueStore(timestampedKeyValueStoreName)); + assertNull(testDriver.getWindowStore(timestampedKeyValueStoreName)); + assertNull(testDriver.getTimestampedWindowStore(timestampedKeyValueStoreName)); + assertNull(testDriver.getSessionStore(timestampedKeyValueStoreName)); + + assertNull(testDriver.getKeyValueStore(windowStoreName)); + assertNull(testDriver.getTimestampedKeyValueStore(windowStoreName)); + assertNotNull(testDriver.getWindowStore(windowStoreName)); + assertNull(testDriver.getTimestampedWindowStore(windowStoreName)); + assertNull(testDriver.getSessionStore(windowStoreName)); + + assertNull(testDriver.getKeyValueStore(timestampedWindowStoreName)); + assertNull(testDriver.getTimestampedKeyValueStore(timestampedWindowStoreName)); + assertNotNull(testDriver.getWindowStore(timestampedWindowStoreName)); + assertNotNull(testDriver.getTimestampedWindowStore(timestampedWindowStoreName)); + 
assertNull(testDriver.getSessionStore(timestampedWindowStoreName)); + + assertNull(testDriver.getKeyValueStore(sessionStoreName)); + assertNull(testDriver.getTimestampedKeyValueStore(sessionStoreName)); + assertNull(testDriver.getWindowStore(sessionStoreName)); + assertNull(testDriver.getTimestampedWindowStore(sessionStoreName)); + assertNotNull(testDriver.getSessionStore(sessionStoreName)); + + // verify global stores + assertNotNull(testDriver.getKeyValueStore(globalKeyValueStoreName)); + assertNull(testDriver.getTimestampedKeyValueStore(globalKeyValueStoreName)); + assertNull(testDriver.getWindowStore(globalKeyValueStoreName)); + assertNull(testDriver.getTimestampedWindowStore(globalKeyValueStoreName)); + assertNull(testDriver.getSessionStore(globalKeyValueStoreName)); + + assertNotNull(testDriver.getKeyValueStore(globalTimestampedKeyValueStoreName)); + assertNotNull(testDriver.getTimestampedKeyValueStore(globalTimestampedKeyValueStoreName)); + assertNull(testDriver.getWindowStore(globalTimestampedKeyValueStoreName)); + assertNull(testDriver.getTimestampedWindowStore(globalTimestampedKeyValueStoreName)); + assertNull(testDriver.getSessionStore(globalTimestampedKeyValueStoreName)); + } + + @Test public void shouldReturnAllStoresNames() { final Topology topology = setupSourceSinkTopology(); topology.addStateStore( @@ -793,7 +941,7 @@ public class TopologyTestDriverTest { setup(); testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 9999L)); OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L); - Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer)); + assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer)); } @Test @@ -802,7 +950,7 @@ public class TopologyTestDriverTest { testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 9999L)); assertThat(store.get("a"), equalTo(21L)); 
OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L); - Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer)); + assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer)); } @Test @@ -811,7 +959,7 @@ public class TopologyTestDriverTest { testDriver.pipeInput(recordFactory.create("input-topic", "a", 42L, 9999L)); assertThat(store.get("a"), equalTo(42L)); OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 42L); - Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer)); + assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer)); } @Test @@ -821,7 +969,7 @@ public class TopologyTestDriverTest { assertThat(store.get("b"), equalTo(21L)); OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L); OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "b", 21L); - Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer)); + assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer)); } @Test @@ -831,11 +979,11 @@ public class TopologyTestDriverTest { OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L); testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 9999L)); - Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer)); + assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer)); testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 10000L)); OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L); - 
Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer)); + assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer)); } @Test @@ -843,7 +991,7 @@ public class TopologyTestDriverTest { setup(); testDriver.advanceWallClockTime(60000); OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L); - Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer)); + assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer)); } private class CustomMaxAggregatorSupplier implements ProcessorSupplier<String, Long> { @@ -944,14 +1092,14 @@ public class TopologyTestDriverTest { config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName()); try (final TopologyTestDriver testDriver = new TopologyTestDriver(topology, config)) { - Assert.assertNull(testDriver.getKeyValueStore("storeProcessorStore").get("a")); + assertNull(testDriver.getKeyValueStore("storeProcessorStore").get("a")); testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L)); Assert.assertEquals(1L, testDriver.getKeyValueStore("storeProcessorStore").get("a")); } try (final TopologyTestDriver testDriver = new TopologyTestDriver(topology, config)) { - Assert.assertNull( + assertNull( "Closing the prior test driver should have cleaned up this store and value.", testDriver.getKeyValueStore("storeProcessorStore").get("a") );