cadonna commented on code in PR #12161: URL: https://github.com/apache/kafka/pull/12161#discussion_r893161440
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadataTest.java: ########## @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + +import static org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNAMED_TOPOLOGY; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.kafka.streams.processor.TaskId; +import org.junit.Assert; +import org.junit.Test; + +public class TaskExecutionMetadataTest { + final static String TOPOLOGY1 = "topology1"; + final static String TOPOLOGY2 = "topology2"; + final static Set<String> NAMED_TOPOLOGIES = new HashSet<>(Arrays.asList(TOPOLOGY1, TOPOLOGY2)); + final static int TIME_ZERO = 0; + final static int CONSTANT_BACKOFF_MS = 5000; + + @Test + public void testCanProcessWithoutNamedTopologies() { + final Set<String> topologies = Collections.singleton(UNNAMED_TOPOLOGY); + final Set<String> pausedTopologies = ConcurrentHashMap.newKeySet(); Review Comment: Out of curiosity, why do you use a `ConcurrentHashMap` here? ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadataTest.java: ########## @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.processor.internals; + +import static org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNAMED_TOPOLOGY; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.kafka.streams.processor.TaskId; +import org.junit.Assert; +import org.junit.Test; + +public class TaskExecutionMetadataTest { + final static String TOPOLOGY1 = "topology1"; + final static String TOPOLOGY2 = "topology2"; + final static Set<String> NAMED_TOPOLOGIES = new HashSet<>(Arrays.asList(TOPOLOGY1, TOPOLOGY2)); + final static int TIME_ZERO = 0; + final static int CONSTANT_BACKOFF_MS = 5000; + + @Test + public void testCanProcessWithoutNamedTopologies() { + final Set<String> topologies = Collections.singleton(UNNAMED_TOPOLOGY); + final Set<String> pausedTopologies = ConcurrentHashMap.newKeySet(); + + final TaskExecutionMetadata metadata = new TaskExecutionMetadata(topologies, + pausedTopologies); Review Comment: nit: ```suggestion final TaskExecutionMetadata metadata = new TaskExecutionMetadata(topologies, pausedTopologies); ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadataTest.java: ########## @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.processor.internals; + +import static org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNAMED_TOPOLOGY; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.kafka.streams.processor.TaskId; +import org.junit.Assert; +import org.junit.Test; + +public class TaskExecutionMetadataTest { + final static String TOPOLOGY1 = "topology1"; + final static String TOPOLOGY2 = "topology2"; + final static Set<String> NAMED_TOPOLOGIES = new HashSet<>(Arrays.asList(TOPOLOGY1, TOPOLOGY2)); + final static int TIME_ZERO = 0; + final static int CONSTANT_BACKOFF_MS = 5000; + + @Test + public void testCanProcessWithoutNamedTopologies() { + final Set<String> topologies = Collections.singleton(UNNAMED_TOPOLOGY); + final Set<String> pausedTopologies = ConcurrentHashMap.newKeySet(); + + final TaskExecutionMetadata metadata = new TaskExecutionMetadata(topologies, + pausedTopologies); + + final Task mockTask = createMockTask(UNNAMED_TOPOLOGY); + + Assert.assertTrue(metadata.canProcessTask(mockTask, TIME_ZERO)); + // This pauses an UNNAMED_TOPOLOGY / a KafkaStreams instance without named/modular + // topologies. + pausedTopologies.add(UNNAMED_TOPOLOGY); + Assert.assertFalse(metadata.canProcessTask(mockTask, TIME_ZERO)); + } + + @Test + public void testNamedTopologiesCanBePausedIndependently() { + final Set<String> pausedTopologies = ConcurrentHashMap.newKeySet(); + final TaskExecutionMetadata metadata = new TaskExecutionMetadata(NAMED_TOPOLOGIES, + pausedTopologies); Review Comment: nit: ```suggestion final TaskExecutionMetadata metadata = new TaskExecutionMetadata(NAMED_TOPOLOGIES, pausedTopologies); ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java: ########## @@ -83,67 +137,12 @@ import org.apache.kafka.test.MockTimestampExtractor; import org.apache.kafka.test.StreamsTestUtils; import org.apache.kafka.test.TestUtils; - -import java.util.function.BiConsumer; import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; -import java.io.File; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Properties; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.stream.Stream; - -import static java.util.Collections.emptyMap; -import static java.util.Collections.emptySet; -import static java.util.Collections.singleton; -import static java.util.Collections.singletonMap; -import static org.apache.kafka.common.utils.Utils.mkEntry; -import static org.apache.kafka.common.utils.Utils.mkMap; -import static org.apache.kafka.common.utils.Utils.mkProperties; -import static org.apache.kafka.common.utils.Utils.mkSet; -import static org.apache.kafka.streams.processor.internals.ClientUtils.getSharedAdminClientId; -import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME; -import static 
org.easymock.EasyMock.anyObject; -import static org.easymock.EasyMock.anyInt; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.expectLastCall; -import static org.easymock.EasyMock.mock; -import static org.easymock.EasyMock.niceMock; -import static org.easymock.EasyMock.verify; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.not; -import static org.hamcrest.CoreMatchers.startsWith; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.empty; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.isA; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertThrows; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - Review Comment: I think your IDE reformatted the imports. The same occurred in `TaskExecutionMetadataTest`. Could you also check in other files? We use more or less the following import order: all other imports non-static (sorted alphabetically) <blank line> java.* (sorted alphabetically) javax.* (sorted alphabetically) <blank line> static imports (sorted alphabetically) However, we are not really consistent across files. Nevertheless, we should try to keep that order. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java: ########## @@ -1686,8 +1685,74 @@ public void shouldUpdateStandbyTask() throws Exception { final String storeName2 = "table-two"; final String changelogName1 = APPLICATION_ID + "-" + storeName1 + "-changelog"; final String changelogName2 = APPLICATION_ID + "-" + storeName2 + "-changelog"; + final StreamThread thread = createStreamThread(CLIENT_ID, config, false); + final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer; + + setupThread(storeName1, storeName2, changelogName1, changelogName2, thread, restoreConsumer, false); + + thread.runOnce(); + + final StandbyTask standbyTask1 = standbyTask(thread.taskManager(), t1p1); + final StandbyTask standbyTask2 = standbyTask(thread.taskManager(), t2p1); + assertEquals(task1, standbyTask1.id()); + assertEquals(task3, standbyTask2.id()); + + final KeyValueStore<Object, Long> store1 = (KeyValueStore<Object, Long>) standbyTask1.getStore(storeName1); + final KeyValueStore<Object, Long> store2 = (KeyValueStore<Object, Long>) standbyTask2.getStore(storeName2); + + assertEquals(0L, store1.approximateNumEntries()); + assertEquals(0L, store2.approximateNumEntries()); + + addStandbyRecordsToRestoreConsumer(restoreConsumer); + + thread.runOnce(); + + assertEquals(10L, store1.approximateNumEntries()); + assertEquals(4L, store2.approximateNumEntries()); + + thread.taskManager().shutdown(true); + } + + private void addActiveRecordsToRestoreConsumer(final MockConsumer<byte[], byte[]> restoreConsumer) { + for (long i = 0L; i < 10L; i++) { + restoreConsumer.addRecord(new ConsumerRecord<>( + "stream-thread-test-count-one-changelog", + 2, + i, + ("K" + i).getBytes(), + ("V" + i).getBytes())); + } + } + + private void addStandbyRecordsToRestoreConsumer(final MockConsumer<byte[], byte[]> restoreConsumer) { + // let the store1 be restored from 0 to 10; store2 be restored from 5 (checkpointed) to 10 + for (long i = 0L; i < 10L; i++) { + restoreConsumer.addRecord(new ConsumerRecord<>( +
"stream-thread-test-count-one-changelog", Review Comment: Why do you not pass in the store name or use a class constant? Same below and above. ########## checkstyle/suppressions.xml: ########## @@ -225,7 +225,7 @@ files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest|RelationalSmokeTest|MockProcessorContextStateStoreTest).java"/> <suppress checks="JavaNCSS" - files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|TaskManagerTest).java"/> + files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|StreamThreadTest|TaskManagerTest).java"/> Review Comment: In general, I think we should try to avoid adding suppressions. But I also see that `StreamThreadTest` would need quite some love at the moment which is not the intent of this PR. ########## streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java: ########## @@ -0,0 +1,380 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.integration; + +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; +import static org.apache.kafka.streams.KeyValue.pair; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getTopicSize; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilStreamsHasPolled; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Properties; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KafkaStreams.State; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.LagInfo; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper; +import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyBuilder; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestUtils; +import org.hamcrest.CoreMatchers; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category({IntegrationTest.class}) +public class PauseResumeIntegrationTest { + private static final Duration STARTUP_TIMEOUT = Duration.ofSeconds(45); + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); + private static Properties producerConfig; + private static Properties consumerConfig; + + private static final Materialized<Object, Long, KeyValueStore<Bytes, byte[]>> IN_MEMORY_STORE = + Materialized.as(Stores.inMemoryKeyValueStore("store")); + + private static final String INPUT_STREAM_1 = "input-stream-1"; + private static final String INPUT_STREAM_2 = "input-stream-2"; + private static final String OUTPUT_STREAM_1 = "output-stream-1"; + private static final String OUTPUT_STREAM_2 = "output-stream-2"; + private static final String TOPOLOGY1 = "topology1"; + private static final String TOPOLOGY2 = 
"topology2"; + + private static final List<KeyValue<String, Long>> STANDARD_INPUT_DATA = + asList(pair("A", 100L), pair("B", 200L), pair("A", 300L), pair("C", 400L), pair("C", -50L)); + private static final List<KeyValue<String, Long>> COUNT_OUTPUT_DATA = + asList(pair("B", 1L), pair("A", 2L), pair("C", 2L)); + private static final List<KeyValue<String, Long>> COUNT_OUTPUT_DATA2 = + asList(pair("B", 2L), pair("A", 4L), pair("C", 4L)); + + private String appId; + private KafkaStreams kafkaStreams, kafkaStreams2; + private KafkaStreamsNamedTopologyWrapper streamsNamedTopologyWrapper; + + @Rule + public final TestName testName = new TestName(); + + @BeforeClass + public static void startCluster() throws Exception { + CLUSTER.start(); + producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), + StringSerializer.class, LongSerializer.class); + consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), + StringDeserializer.class, LongDeserializer.class); + } + + @AfterClass + public static void closeCluster() { + CLUSTER.stop(); + } + + @Before + public void createTopics() throws InterruptedException { + cleanStateBeforeTest(CLUSTER, 2, INPUT_STREAM_1, INPUT_STREAM_2, OUTPUT_STREAM_1, OUTPUT_STREAM_2); + appId = safeUniqueTestName(PauseResumeIntegrationTest.class, testName); + } + + private Properties props() { + final Properties properties = new Properties(); + properties.put(StreamsConfig.APPLICATION_ID_CONFIG, appId); + properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + properties.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath()); + properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); + properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); + properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE); + properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100); + properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000); + return properties; + } + + @After + public void shutdown() throws InterruptedException { + for (final KafkaStreams streams : Arrays.asList(kafkaStreams, kafkaStreams2, streamsNamedTopologyWrapper)) { + if (streams != null) { + streams.close(Duration.ofSeconds(30)); + } + } + } + + private static void produceToInputTopics(final String topic, final Collection<KeyValue<String, Long>> records) { + IntegrationTestUtils.produceKeyValuesSynchronously(topic, records, producerConfig, CLUSTER.time); + } + + @Test + public void shouldPauseAndResumeKafkaStreams() throws Exception { + kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1); + kafkaStreams.start(); + waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, STARTUP_TIMEOUT); + + produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA); + awaitOutput(OUTPUT_STREAM_1, 3, COUNT_OUTPUT_DATA); + + kafkaStreams.pause(); + assertTrue(kafkaStreams.isPaused()); + + produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA); + assertNoLag(kafkaStreams); + + waitUntilStreamsHasPolled(kafkaStreams, 2); Review Comment: I like your approach! ########## streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java: ########## @@ -0,0 +1,380 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.integration; + +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; +import static org.apache.kafka.streams.KeyValue.pair; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getTopicSize; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilStreamsHasPolled; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Properties; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KafkaStreams.State; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.LagInfo; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper; +import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyBuilder; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestUtils; +import org.hamcrest.CoreMatchers; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category({IntegrationTest.class}) +public class PauseResumeIntegrationTest { + private static final Duration STARTUP_TIMEOUT = 
Duration.ofSeconds(45); + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); + private static Properties producerConfig; + private static Properties consumerConfig; + + private static final Materialized<Object, Long, KeyValueStore<Bytes, byte[]>> IN_MEMORY_STORE = + Materialized.as(Stores.inMemoryKeyValueStore("store")); + + private static final String INPUT_STREAM_1 = "input-stream-1"; + private static final String INPUT_STREAM_2 = "input-stream-2"; + private static final String OUTPUT_STREAM_1 = "output-stream-1"; + private static final String OUTPUT_STREAM_2 = "output-stream-2"; + private static final String TOPOLOGY1 = "topology1"; + private static final String TOPOLOGY2 = "topology2"; + + private static final List<KeyValue<String, Long>> STANDARD_INPUT_DATA = + asList(pair("A", 100L), pair("B", 200L), pair("A", 300L), pair("C", 400L), pair("C", -50L)); + private static final List<KeyValue<String, Long>> COUNT_OUTPUT_DATA = + asList(pair("B", 1L), pair("A", 2L), pair("C", 2L)); + private static final List<KeyValue<String, Long>> COUNT_OUTPUT_DATA2 = + asList(pair("B", 2L), pair("A", 4L), pair("C", 4L)); + + private String appId; + private KafkaStreams kafkaStreams, kafkaStreams2; + private KafkaStreamsNamedTopologyWrapper streamsNamedTopologyWrapper; + + @Rule + public final TestName testName = new TestName(); + + @BeforeClass + public static void startCluster() throws Exception { + CLUSTER.start(); + producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), + StringSerializer.class, LongSerializer.class); + consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), + StringDeserializer.class, LongDeserializer.class); + } + + @AfterClass + public static void closeCluster() { + CLUSTER.stop(); + } + + @Before + public void createTopics() throws InterruptedException { + cleanStateBeforeTest(CLUSTER, 2, INPUT_STREAM_1, INPUT_STREAM_2, OUTPUT_STREAM_1, OUTPUT_STREAM_2); + appId = safeUniqueTestName(PauseResumeIntegrationTest.class, testName); + } + + private Properties props() { + final Properties properties = new Properties(); + properties.put(StreamsConfig.APPLICATION_ID_CONFIG, appId); + properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + properties.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath()); + properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); + properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); + properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE); + properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100); + properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000); + return properties; + } + + @After + public void shutdown() throws InterruptedException { + for (final KafkaStreams streams : Arrays.asList(kafkaStreams, kafkaStreams2, streamsNamedTopologyWrapper)) { + if (streams != null) { + streams.close(Duration.ofSeconds(30)); + } + } + } + + private static void produceToInputTopics(final String topic, final Collection<KeyValue<String, Long>> records) { + IntegrationTestUtils.produceKeyValuesSynchronously(topic, records, producerConfig, CLUSTER.time); + } + + @Test + public void shouldPauseAndResumeKafkaStreams() throws Exception { + kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1); + kafkaStreams.start(); + 
waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, STARTUP_TIMEOUT); + + produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA); + awaitOutput(OUTPUT_STREAM_1, 3, COUNT_OUTPUT_DATA); + + kafkaStreams.pause(); + assertTrue(kafkaStreams.isPaused()); + + produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA); + assertNoLag(kafkaStreams); + + waitUntilStreamsHasPolled(kafkaStreams, 2); + assertTopicSize(OUTPUT_STREAM_1, 3); + + kafkaStreams.resume(); + assertFalse(kafkaStreams.isPaused()); + + awaitOutput(OUTPUT_STREAM_1, 3, COUNT_OUTPUT_DATA2); + assertTopicSize(OUTPUT_STREAM_1, 6); + } + + @Test + public void shouldAllowForTopologiesToStartPaused() throws Exception { + kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1); + kafkaStreams.pause(); + kafkaStreams.start(); + waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, STARTUP_TIMEOUT); + assertTrue(kafkaStreams.isPaused()); + + produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA); + assertNoLag(kafkaStreams); + + waitUntilStreamsHasPolled(kafkaStreams, 2); + assertTopicSize(OUTPUT_STREAM_1, 0); + + kafkaStreams.resume(); + assertFalse(kafkaStreams.isPaused()); + awaitOutput(OUTPUT_STREAM_1, 3, COUNT_OUTPUT_DATA); + assertTopicSize(OUTPUT_STREAM_1, 3); + } + + @Test + public void shouldPauseAndResumeKafkaStreamsWithNamedTopologies() throws Exception { + streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props()); + final NamedTopologyBuilder builder1 = getNamedTopologyBuilder1(); + final NamedTopologyBuilder builder2 = getNamedTopologyBuilder2(); + + streamsNamedTopologyWrapper.start(asList(builder1.build(), builder2.build())); + waitForApplicationState(singletonList(streamsNamedTopologyWrapper), State.RUNNING, STARTUP_TIMEOUT); + + produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA); + produceToInputTopics(INPUT_STREAM_2, STANDARD_INPUT_DATA); + + awaitOutput(OUTPUT_STREAM_1, 3, COUNT_OUTPUT_DATA); + awaitOutput(OUTPUT_STREAM_2, 3, COUNT_OUTPUT_DATA); + assertTopicSize(OUTPUT_STREAM_1, 3); + assertTopicSize(OUTPUT_STREAM_2, 3); + + streamsNamedTopologyWrapper.pauseNamedTopology(TOPOLOGY1); + assertTrue(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY1)); + assertFalse(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY2)); + assertFalse(streamsNamedTopologyWrapper.isPaused()); + + produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA); + produceToInputTopics(INPUT_STREAM_2, STANDARD_INPUT_DATA); + + assertNoLag(streamsNamedTopologyWrapper); + + awaitOutput(OUTPUT_STREAM_2, 3, COUNT_OUTPUT_DATA2); + assertTopicSize(OUTPUT_STREAM_1, 3); + assertTopicSize(OUTPUT_STREAM_2, 6); + + streamsNamedTopologyWrapper.resumeNamedTopology(TOPOLOGY1); + assertFalse(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY1)); + awaitOutput(OUTPUT_STREAM_1, 3, COUNT_OUTPUT_DATA2); + } + + @Test + public void shouldPauseAndResumeAllKafkaStreamsWithNamedTopologies() throws Exception { + streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props()); + final NamedTopologyBuilder builder1 = getNamedTopologyBuilder1(); + final NamedTopologyBuilder builder2 = getNamedTopologyBuilder2(); + + streamsNamedTopologyWrapper.start(asList(builder1.build(), builder2.build())); + waitForApplicationState(singletonList(streamsNamedTopologyWrapper), State.RUNNING, STARTUP_TIMEOUT); + + produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA); + produceToInputTopics(INPUT_STREAM_2, STANDARD_INPUT_DATA); + awaitOutput(OUTPUT_STREAM_1, 3, COUNT_OUTPUT_DATA); + 
awaitOutput(OUTPUT_STREAM_2, 3, COUNT_OUTPUT_DATA); + + streamsNamedTopologyWrapper.pause(); + assertTrue(streamsNamedTopologyWrapper.isPaused()); + assertTrue(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY1)); + assertTrue(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY2)); + + produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA); + produceToInputTopics(INPUT_STREAM_2, STANDARD_INPUT_DATA); + + assertNoLag(streamsNamedTopologyWrapper); + + waitUntilStreamsHasPolled(streamsNamedTopologyWrapper, 2); + assertTopicSize(OUTPUT_STREAM_1, 3); + assertTopicSize(OUTPUT_STREAM_2, 3); + + streamsNamedTopologyWrapper.resumeNamedTopology(TOPOLOGY1); + assertFalse(streamsNamedTopologyWrapper.isPaused()); + assertFalse(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY1)); + assertTrue(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY2)); + awaitOutput(OUTPUT_STREAM_1, 3, COUNT_OUTPUT_DATA2); + assertTopicSize(OUTPUT_STREAM_1, 6); + assertTopicSize(OUTPUT_STREAM_2, 3); + } + + @Test + public void shouldAllowForNamedTopologiesToStartPaused() throws Exception { + streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props()); + final NamedTopologyBuilder builder1 = getNamedTopologyBuilder1(); + final NamedTopologyBuilder builder2 = getNamedTopologyBuilder2(); + + streamsNamedTopologyWrapper.pauseNamedTopology(TOPOLOGY1); + streamsNamedTopologyWrapper.start(asList(builder1.build(), builder2.build())); + waitForApplicationState(singletonList(streamsNamedTopologyWrapper), State.RUNNING, STARTUP_TIMEOUT); + + assertFalse(streamsNamedTopologyWrapper.isPaused()); + assertTrue(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY1)); + assertFalse(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY2)); + + produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA); + produceToInputTopics(INPUT_STREAM_2, STANDARD_INPUT_DATA); + + awaitOutput(OUTPUT_STREAM_2, 3, COUNT_OUTPUT_DATA); + assertTopicSize(OUTPUT_STREAM_1, 0); + + streamsNamedTopologyWrapper.pause(); + assertTrue(streamsNamedTopologyWrapper.isPaused()); + assertTrue(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY1)); + assertTrue(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY2)); + + produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA); + produceToInputTopics(INPUT_STREAM_2, STANDARD_INPUT_DATA); + + assertNoLag(streamsNamedTopologyWrapper); + + waitUntilStreamsHasPolled(streamsNamedTopologyWrapper, 2); + assertTopicSize(OUTPUT_STREAM_1, 0); + assertTopicSize(OUTPUT_STREAM_2, 3); + + streamsNamedTopologyWrapper.resumeNamedTopology(TOPOLOGY1); + assertFalse(streamsNamedTopologyWrapper.isPaused()); + assertFalse(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY1)); + assertTrue(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY2)); + + awaitOutput(OUTPUT_STREAM_1, 3, COUNT_OUTPUT_DATA2); + assertTopicSize(OUTPUT_STREAM_1, 3); + assertTopicSize(OUTPUT_STREAM_2, 3); + } + + @Test + public void pauseResumehouldWorkAcrossInstances() throws Exception { + produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA); + + kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1); + kafkaStreams.pause(); + kafkaStreams.start(); + + waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, STARTUP_TIMEOUT); + assertTrue(kafkaStreams.isPaused()); + + kafkaStreams2 = buildKafkaStreams(OUTPUT_STREAM_2); + kafkaStreams2.pause(); + kafkaStreams2.start(); + assertTrue(kafkaStreams2.isPaused()); + + waitUntilStreamsHasPolled(kafkaStreams, 2); + 
waitUntilStreamsHasPolled(kafkaStreams2, 2); + assertTopicSize(OUTPUT_STREAM_1, 0); + + kafkaStreams2.close(); + kafkaStreams2.cleanUp(); + waitForApplicationState(singletonList(kafkaStreams2), State.NOT_RUNNING, STARTUP_TIMEOUT); + + kafkaStreams.resume(); + waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, STARTUP_TIMEOUT); + + awaitOutput(OUTPUT_STREAM_1, 3, COUNT_OUTPUT_DATA); + } + Review Comment: Could you please also add a test that verifies if active tasks in restoration and standbys are paused? Something like you start two Streams clients with 1 standby. If you have one partition one client should get the active stateful task and the other should get the standby task. The clients process some data and write some data into their states. Then shutdown the Streams clients and wipe out the local state. Finally, start both clients paused and verify if the lag of the local stores stays constant and greater than zero for a couple of poll loop iterations. ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java: ########## @@ -479,6 +485,47 @@ public void restore(final Map<TaskId, Task> tasks) { } } + private void updateStandbyPartitions(final Map<TaskId, Task> tasks, + final Set<TopicPartition> restoringChangelogs) { + if (state == ChangelogReaderState.ACTIVE_RESTORING) { + updatePartitionsByType(tasks, restoringChangelogs, TaskType.ACTIVE); + } + if (state == ChangelogReaderState.STANDBY_UPDATING) { + updatePartitionsByType(tasks, restoringChangelogs, TaskType.STANDBY); + } + } + + private void updatePartitionsByType(final Map<TaskId, Task> tasks, + final Set<TopicPartition> restoringChangelogs, + final TaskType taskType) { + final Collection<TopicPartition> toResume = + restoringChangelogs.stream().filter(t -> shouldResume(tasks, t, taskType)).collect(Collectors.toList()); + final Collection<TopicPartition> toPause = + restoringChangelogs.stream().filter(t -> shouldPause(tasks, t, taskType)).collect(Collectors.toList()); + restoreConsumer.resume(toResume); + restoreConsumer.pause(toPause); + } + + private boolean shouldResume(final Map<TaskId, Task> tasks, final TopicPartition partition, final TaskType taskType) { + final ProcessorStateManager manager = changelogs.get(partition).stateManager; + final TaskId taskId = manager.taskId(); + final Task task = tasks.get(taskId); + if (manager.taskType() == taskType) { + return task != null; + } + return false; + } + + private boolean shouldPause(final Map<TaskId, Task> tasks, final TopicPartition partition, final TaskType taskType) { + final ProcessorStateManager manager = changelogs.get(partition).stateManager; + final TaskId taskId = manager.taskId(); + final Task task = tasks.get(taskId); + if (manager.taskType() == taskType) { + return task == null; + } + return false; + } Review Comment: Why not check the task type earlier? If the task type does not match, you do not need to do anything else. 
```suggestion final ProcessorStateManager manager = changelogs.get(partition).stateManager; if (manager.taskType() == taskType) { final TaskId taskId = manager.taskId(); final Task task = tasks.get(taskId); return task == null; } return false; } ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java: ########## @@ -1686,8 +1685,74 @@ public void shouldUpdateStandbyTask() throws Exception { final String storeName2 = "table-two"; final String changelogName1 = APPLICATION_ID + "-" + storeName1 + "-changelog"; final String changelogName2 = APPLICATION_ID + "-" + storeName2 + "-changelog"; + final StreamThread thread = createStreamThread(CLIENT_ID, config, false); + final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer; + + setupThread(storeName1, storeName2, changelogName1, changelogName2, thread, restoreConsumer, false); + + thread.runOnce(); + + final StandbyTask standbyTask1 = standbyTask(thread.taskManager(), t1p1); + final StandbyTask standbyTask2 = standbyTask(thread.taskManager(), t2p1); + assertEquals(task1, standbyTask1.id()); + assertEquals(task3, standbyTask2.id()); + + final KeyValueStore<Object, Long> store1 = (KeyValueStore<Object, Long>) standbyTask1.getStore(storeName1); + final KeyValueStore<Object, Long> store2 = (KeyValueStore<Object, Long>) standbyTask2.getStore(storeName2); + + assertEquals(0L, store1.approximateNumEntries()); + assertEquals(0L, store2.approximateNumEntries()); + + addStandbyRecordsToRestoreConsumer(restoreConsumer); + + thread.runOnce(); + + assertEquals(10L, store1.approximateNumEntries()); + assertEquals(4L, store2.approximateNumEntries()); + + thread.taskManager().shutdown(true); + } + + private void addActiveRecordsToRestoreConsumer(final MockConsumer<byte[], byte[]> restoreConsumer) { + for (long i = 0L; i < 10L; i++) { + restoreConsumer.addRecord(new ConsumerRecord<>( + "stream-thread-test-count-one-changelog", + 2, + i, + ("K" + i).getBytes(), + ("V" + i).getBytes())); + } + } + + private void addStandbyRecordsToRestoreConsumer(final MockConsumer<byte[], byte[]> restoreConsumer) { + // let the store1 be restored from 0 to 10; store2 be restored from 5 (checkpointed) to 10 + for (long i = 0L; i < 10L; i++) { + restoreConsumer.addRecord(new ConsumerRecord<>( + "stream-thread-test-count-one-changelog", + 1, + i, + ("K" + i).getBytes(), + ("V" + i).getBytes())); + restoreConsumer.addRecord(new ConsumerRecord<>( + "stream-thread-test-table-two-changelog", + 1, + i, + ("K" + i).getBytes(), + ("V" + i).getBytes())); + } + } + + private void setupThread(final String storeName1, + final String storeName2, + final String changelogName1, + final String changelogName2, + final StreamThread thread, + final MockConsumer<byte[], byte[]> restoreConsumer, + final boolean addActiveTask) throws IOException { + final TopicPartition activePartition = new TopicPartition(changelogName1, 2); final TopicPartition partition1 = new TopicPartition(changelogName1, 1); final TopicPartition partition2 = new TopicPartition(changelogName2, 1); Review Comment: ```suggestion private void setupThread(final String storeName1, final String storeName2, final String changelogName1, final String changelogName2, final StreamThread thread, final MockConsumer<byte[], byte[]> restoreConsumer, final boolean addActiveTask) throws IOException { final TopicPartition activePartition = new TopicPartition(changelogName1, 2); final TopicPartition partition1 = new TopicPartition(changelogName1, 1); final TopicPartition 
partition2 = new TopicPartition(changelogName2, 1); ``` ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java: ########## @@ -479,6 +485,47 @@ public void restore(final Map<TaskId, Task> tasks) { } } + private void updateStandbyPartitions(final Map<TaskId, Task> tasks, Review Comment: After your changes that include the restoration of active tasks the name of the method does not reflect what the method does anymore. Please rename the method to something more appropriate. ########## streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java: ########## @@ -0,0 +1,380 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.integration; + +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; +import static org.apache.kafka.streams.KeyValue.pair; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getTopicSize; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilStreamsHasPolled; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Properties; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KafkaStreams.State; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.LagInfo; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Materialized; +import 
org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper; +import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyBuilder; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestUtils; +import org.hamcrest.CoreMatchers; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category({IntegrationTest.class}) +public class PauseResumeIntegrationTest { + private static final Duration STARTUP_TIMEOUT = Duration.ofSeconds(45); + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); + private static Properties producerConfig; + private static Properties consumerConfig; + + private static final Materialized<Object, Long, KeyValueStore<Bytes, byte[]>> IN_MEMORY_STORE = + Materialized.as(Stores.inMemoryKeyValueStore("store")); + + private static final String INPUT_STREAM_1 = "input-stream-1"; + private static final String INPUT_STREAM_2 = "input-stream-2"; + private static final String OUTPUT_STREAM_1 = "output-stream-1"; + private static final String OUTPUT_STREAM_2 = "output-stream-2"; + private static final String TOPOLOGY1 = "topology1"; + private static final String TOPOLOGY2 = "topology2"; + + private static final List<KeyValue<String, Long>> STANDARD_INPUT_DATA = + asList(pair("A", 100L), pair("B", 200L), pair("A", 300L), pair("C", 400L), pair("C", -50L)); + private static final List<KeyValue<String, Long>> COUNT_OUTPUT_DATA = + asList(pair("B", 1L), pair("A", 2L), pair("C", 2L)); + private static final List<KeyValue<String, Long>> COUNT_OUTPUT_DATA2 = + asList(pair("B", 2L), pair("A", 4L), pair("C", 4L)); + + private String appId; + private KafkaStreams kafkaStreams, kafkaStreams2; + private KafkaStreamsNamedTopologyWrapper streamsNamedTopologyWrapper; + + @Rule + public final TestName testName = new TestName(); + + @BeforeClass + public static void startCluster() throws Exception { + CLUSTER.start(); + producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), + StringSerializer.class, LongSerializer.class); + consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), + StringDeserializer.class, LongDeserializer.class); + } + + @AfterClass + public static void closeCluster() { + CLUSTER.stop(); + } + + @Before + public void createTopics() throws InterruptedException { + cleanStateBeforeTest(CLUSTER, 2, INPUT_STREAM_1, INPUT_STREAM_2, OUTPUT_STREAM_1, OUTPUT_STREAM_2); + appId = safeUniqueTestName(PauseResumeIntegrationTest.class, testName); + } + + private Properties props() { + final Properties properties = new Properties(); + properties.put(StreamsConfig.APPLICATION_ID_CONFIG, appId); + properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + properties.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath()); + properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); + properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); + properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE); + 
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100); + properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000); + return properties; + } + + @After + public void shutdown() throws InterruptedException { + for (final KafkaStreams streams : Arrays.asList(kafkaStreams, kafkaStreams2, streamsNamedTopologyWrapper)) { + if (streams != null) { + streams.close(Duration.ofSeconds(30)); + } + } + } + + private static void produceToInputTopics(final String topic, final Collection<KeyValue<String, Long>> records) { + IntegrationTestUtils.produceKeyValuesSynchronously(topic, records, producerConfig, CLUSTER.time); + } + + @Test + public void shouldPauseAndResumeKafkaStreams() throws Exception { + kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1); + kafkaStreams.start(); + waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, STARTUP_TIMEOUT); + + produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA); + awaitOutput(OUTPUT_STREAM_1, 3, COUNT_OUTPUT_DATA); + + kafkaStreams.pause(); + assertTrue(kafkaStreams.isPaused()); + + produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA); + assertNoLag(kafkaStreams); Review Comment: I just realized that this method computes the lag of the store regarding the changelog partition and not of the input partitions. Was this intentional? I think the lag of the store regarding the changelog topic will always be zero in this case because in normal mode the Streams client writes to the changelog topic and so it is always up-to-date. I thought that this method verified that the main consumer does not have any lag regarding the input partitions which would tell us that the main consumer polled data although it was paused which is the expected behavior. This method with `waitUntilStreamsHasPolled(kafkaStreams, 2);` and `assertTopicSize(OUTPUT_STREAM_1, <same size as before the call to pause>);` would tell us that although the Streams clients polled data and went through the poll loop, no data was produced to the output topic. For that you should also set the cache size to 0 with `STATESTORE_CACHE_MAX_BYTES_CONFIG`. ########## streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java: ########## @@ -0,0 +1,380 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.integration; + +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; +import static org.apache.kafka.streams.KeyValue.pair; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getTopicSize; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilStreamsHasPolled; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Properties; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KafkaStreams.State; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.LagInfo; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper; +import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyBuilder; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestUtils; +import org.hamcrest.CoreMatchers; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category({IntegrationTest.class}) +public class PauseResumeIntegrationTest { + private static final Duration STARTUP_TIMEOUT = Duration.ofSeconds(45); + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); + private static Properties producerConfig; + private static Properties consumerConfig; + + private static final Materialized<Object, Long, KeyValueStore<Bytes, byte[]>> IN_MEMORY_STORE = + Materialized.as(Stores.inMemoryKeyValueStore("store")); + + private static final String INPUT_STREAM_1 = "input-stream-1"; + private static final String INPUT_STREAM_2 = "input-stream-2"; + private static final String OUTPUT_STREAM_1 = "output-stream-1"; + private static final String OUTPUT_STREAM_2 = "output-stream-2"; + private static final String TOPOLOGY1 = "topology1"; + private static final String TOPOLOGY2 = 
"topology2"; + + private static final List<KeyValue<String, Long>> STANDARD_INPUT_DATA = + asList(pair("A", 100L), pair("B", 200L), pair("A", 300L), pair("C", 400L), pair("C", -50L)); + private static final List<KeyValue<String, Long>> COUNT_OUTPUT_DATA = + asList(pair("B", 1L), pair("A", 2L), pair("C", 2L)); + private static final List<KeyValue<String, Long>> COUNT_OUTPUT_DATA2 = + asList(pair("B", 2L), pair("A", 4L), pair("C", 4L)); + + private String appId; + private KafkaStreams kafkaStreams, kafkaStreams2; + private KafkaStreamsNamedTopologyWrapper streamsNamedTopologyWrapper; + + @Rule + public final TestName testName = new TestName(); + + @BeforeClass + public static void startCluster() throws Exception { + CLUSTER.start(); + producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), + StringSerializer.class, LongSerializer.class); + consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), + StringDeserializer.class, LongDeserializer.class); + } + + @AfterClass + public static void closeCluster() { + CLUSTER.stop(); + } + + @Before + public void createTopics() throws InterruptedException { + cleanStateBeforeTest(CLUSTER, 2, INPUT_STREAM_1, INPUT_STREAM_2, OUTPUT_STREAM_1, OUTPUT_STREAM_2); + appId = safeUniqueTestName(PauseResumeIntegrationTest.class, testName); + } + + private Properties props() { + final Properties properties = new Properties(); + properties.put(StreamsConfig.APPLICATION_ID_CONFIG, appId); + properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + properties.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath()); + properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); + properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); + properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE); + properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100); + properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000); + return properties; + } + + @After + public void shutdown() throws InterruptedException { + for (final KafkaStreams streams : Arrays.asList(kafkaStreams, kafkaStreams2, streamsNamedTopologyWrapper)) { + if (streams != null) { + streams.close(Duration.ofSeconds(30)); + } + } + } + + private static void produceToInputTopics(final String topic, final Collection<KeyValue<String, Long>> records) { + IntegrationTestUtils.produceKeyValuesSynchronously(topic, records, producerConfig, CLUSTER.time); + } + + @Test + public void shouldPauseAndResumeKafkaStreams() throws Exception { + kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1); + kafkaStreams.start(); + waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, STARTUP_TIMEOUT); + + produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA); + awaitOutput(OUTPUT_STREAM_1, 3, COUNT_OUTPUT_DATA); + + kafkaStreams.pause(); + assertTrue(kafkaStreams.isPaused()); + + produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA); + assertNoLag(kafkaStreams); + + waitUntilStreamsHasPolled(kafkaStreams, 2); + assertTopicSize(OUTPUT_STREAM_1, 3); + + kafkaStreams.resume(); + assertFalse(kafkaStreams.isPaused()); + + awaitOutput(OUTPUT_STREAM_1, 3, COUNT_OUTPUT_DATA2); + assertTopicSize(OUTPUT_STREAM_1, 6); + } + + @Test + public void shouldAllowForTopologiesToStartPaused() throws Exception { + kafkaStreams = 
buildKafkaStreams(OUTPUT_STREAM_1); + kafkaStreams.pause(); + kafkaStreams.start(); + waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, STARTUP_TIMEOUT); + assertTrue(kafkaStreams.isPaused()); + + produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA); + assertNoLag(kafkaStreams); + + waitUntilStreamsHasPolled(kafkaStreams, 2); + assertTopicSize(OUTPUT_STREAM_1, 0); + + kafkaStreams.resume(); + assertFalse(kafkaStreams.isPaused()); + awaitOutput(OUTPUT_STREAM_1, 3, COUNT_OUTPUT_DATA); + assertTopicSize(OUTPUT_STREAM_1, 3); + } + + @Test + public void shouldPauseAndResumeKafkaStreamsWithNamedTopologies() throws Exception { + streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props()); + final NamedTopologyBuilder builder1 = getNamedTopologyBuilder1(); + final NamedTopologyBuilder builder2 = getNamedTopologyBuilder2(); + + streamsNamedTopologyWrapper.start(asList(builder1.build(), builder2.build())); + waitForApplicationState(singletonList(streamsNamedTopologyWrapper), State.RUNNING, STARTUP_TIMEOUT); + + produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA); + produceToInputTopics(INPUT_STREAM_2, STANDARD_INPUT_DATA); + + awaitOutput(OUTPUT_STREAM_1, 3, COUNT_OUTPUT_DATA); + awaitOutput(OUTPUT_STREAM_2, 3, COUNT_OUTPUT_DATA); + assertTopicSize(OUTPUT_STREAM_1, 3); + assertTopicSize(OUTPUT_STREAM_2, 3); + + streamsNamedTopologyWrapper.pauseNamedTopology(TOPOLOGY1); + assertTrue(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY1)); + assertFalse(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY2)); + assertFalse(streamsNamedTopologyWrapper.isPaused()); + + produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA); + produceToInputTopics(INPUT_STREAM_2, STANDARD_INPUT_DATA); + + assertNoLag(streamsNamedTopologyWrapper); + + awaitOutput(OUTPUT_STREAM_2, 3, COUNT_OUTPUT_DATA2); + assertTopicSize(OUTPUT_STREAM_1, 3); + assertTopicSize(OUTPUT_STREAM_2, 6); + + streamsNamedTopologyWrapper.resumeNamedTopology(TOPOLOGY1); + assertFalse(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY1)); + awaitOutput(OUTPUT_STREAM_1, 3, COUNT_OUTPUT_DATA2); + } + + @Test + public void shouldPauseAndResumeAllKafkaStreamsWithNamedTopologies() throws Exception { + streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props()); + final NamedTopologyBuilder builder1 = getNamedTopologyBuilder1(); + final NamedTopologyBuilder builder2 = getNamedTopologyBuilder2(); + + streamsNamedTopologyWrapper.start(asList(builder1.build(), builder2.build())); + waitForApplicationState(singletonList(streamsNamedTopologyWrapper), State.RUNNING, STARTUP_TIMEOUT); + + produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA); + produceToInputTopics(INPUT_STREAM_2, STANDARD_INPUT_DATA); + awaitOutput(OUTPUT_STREAM_1, 3, COUNT_OUTPUT_DATA); + awaitOutput(OUTPUT_STREAM_2, 3, COUNT_OUTPUT_DATA); + + streamsNamedTopologyWrapper.pause(); + assertTrue(streamsNamedTopologyWrapper.isPaused()); + assertTrue(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY1)); + assertTrue(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY2)); + + produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA); + produceToInputTopics(INPUT_STREAM_2, STANDARD_INPUT_DATA); + + assertNoLag(streamsNamedTopologyWrapper); + + waitUntilStreamsHasPolled(streamsNamedTopologyWrapper, 2); + assertTopicSize(OUTPUT_STREAM_1, 3); + assertTopicSize(OUTPUT_STREAM_2, 3); + + streamsNamedTopologyWrapper.resumeNamedTopology(TOPOLOGY1); + assertFalse(streamsNamedTopologyWrapper.isPaused()); + 
assertFalse(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY1)); + assertTrue(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY2)); + awaitOutput(OUTPUT_STREAM_1, 3, COUNT_OUTPUT_DATA2); + assertTopicSize(OUTPUT_STREAM_1, 6); + assertTopicSize(OUTPUT_STREAM_2, 3); + } + + @Test + public void shouldAllowForNamedTopologiesToStartPaused() throws Exception { + streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props()); + final NamedTopologyBuilder builder1 = getNamedTopologyBuilder1(); + final NamedTopologyBuilder builder2 = getNamedTopologyBuilder2(); + + streamsNamedTopologyWrapper.pauseNamedTopology(TOPOLOGY1); + streamsNamedTopologyWrapper.start(asList(builder1.build(), builder2.build())); + waitForApplicationState(singletonList(streamsNamedTopologyWrapper), State.RUNNING, STARTUP_TIMEOUT); + + assertFalse(streamsNamedTopologyWrapper.isPaused()); + assertTrue(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY1)); + assertFalse(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY2)); + + produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA); + produceToInputTopics(INPUT_STREAM_2, STANDARD_INPUT_DATA); + + awaitOutput(OUTPUT_STREAM_2, 3, COUNT_OUTPUT_DATA); + assertTopicSize(OUTPUT_STREAM_1, 0); + + streamsNamedTopologyWrapper.pause(); + assertTrue(streamsNamedTopologyWrapper.isPaused()); + assertTrue(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY1)); + assertTrue(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY2)); + + produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA); + produceToInputTopics(INPUT_STREAM_2, STANDARD_INPUT_DATA); + + assertNoLag(streamsNamedTopologyWrapper); + + waitUntilStreamsHasPolled(streamsNamedTopologyWrapper, 2); + assertTopicSize(OUTPUT_STREAM_1, 0); + assertTopicSize(OUTPUT_STREAM_2, 3); + + streamsNamedTopologyWrapper.resumeNamedTopology(TOPOLOGY1); + assertFalse(streamsNamedTopologyWrapper.isPaused()); + assertFalse(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY1)); + assertTrue(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY2)); + + awaitOutput(OUTPUT_STREAM_1, 3, COUNT_OUTPUT_DATA2); + assertTopicSize(OUTPUT_STREAM_1, 3); + assertTopicSize(OUTPUT_STREAM_2, 3); + } + + @Test + public void pauseResumehouldWorkAcrossInstances() throws Exception { Review Comment: Typo: ` pauseResumehouldWorkAcrossInstances` -> ` pauseResumeShouldWorkAcrossInstances` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
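For reference, the rename could also be expressed in the suggestion-block style used earlier in this review; only the method name changes, and the rest of the line is copied from the diff above:
```suggestion
    public void pauseResumeShouldWorkAcrossInstances() throws Exception {
```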