cadonna commented on code in PR #12161:
URL: https://github.com/apache/kafka/pull/12161#discussion_r893161440


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadataTest.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import static org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNAMED_TOPOLOGY;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.kafka.streams.processor.TaskId;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TaskExecutionMetadataTest {
+    final static String TOPOLOGY1 = "topology1";
+    final static String TOPOLOGY2 = "topology2";
+    final static Set<String> NAMED_TOPOLOGIES = new HashSet<>(Arrays.asList(TOPOLOGY1, TOPOLOGY2));
+    final static int TIME_ZERO = 0;
+    final static int CONSTANT_BACKOFF_MS = 5000;
+
+    @Test
+    public void testCanProcessWithoutNamedTopologies() {
+        final Set<String> topologies = Collections.singleton(UNNAMED_TOPOLOGY);
+        final Set<String> pausedTopologies = ConcurrentHashMap.newKeySet();

Review Comment:
   Out of curiosity, why do you use a `ConcurrentHashMap` here?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadataTest.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import static org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNAMED_TOPOLOGY;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.kafka.streams.processor.TaskId;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TaskExecutionMetadataTest {
+    final static String TOPOLOGY1 = "topology1";
+    final static String TOPOLOGY2 = "topology2";
+    final static Set<String> NAMED_TOPOLOGIES = new HashSet<>(Arrays.asList(TOPOLOGY1, TOPOLOGY2));
+    final static int TIME_ZERO = 0;
+    final static int CONSTANT_BACKOFF_MS = 5000;
+
+    @Test
+    public void testCanProcessWithoutNamedTopologies() {
+        final Set<String> topologies = Collections.singleton(UNNAMED_TOPOLOGY);
+        final Set<String> pausedTopologies = ConcurrentHashMap.newKeySet();
+
+        final TaskExecutionMetadata metadata = new TaskExecutionMetadata(topologies,
+            pausedTopologies);

Review Comment:
   nit:
   ```suggestion
           final TaskExecutionMetadata metadata = new TaskExecutionMetadata(topologies, pausedTopologies);
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadataTest.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import static org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNAMED_TOPOLOGY;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.kafka.streams.processor.TaskId;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TaskExecutionMetadataTest {
+    final static String TOPOLOGY1 = "topology1";
+    final static String TOPOLOGY2 = "topology2";
+    final static Set<String> NAMED_TOPOLOGIES = new HashSet<>(Arrays.asList(TOPOLOGY1, TOPOLOGY2));
+    final static int TIME_ZERO = 0;
+    final static int CONSTANT_BACKOFF_MS = 5000;
+
+    @Test
+    public void testCanProcessWithoutNamedTopologies() {
+        final Set<String> topologies = Collections.singleton(UNNAMED_TOPOLOGY);
+        final Set<String> pausedTopologies = ConcurrentHashMap.newKeySet();
+
+        final TaskExecutionMetadata metadata = new TaskExecutionMetadata(topologies,
+            pausedTopologies);
+
+        final Task mockTask = createMockTask(UNNAMED_TOPOLOGY);
+
+        Assert.assertTrue(metadata.canProcessTask(mockTask, TIME_ZERO));
+        // This pauses an UNNAMED_TOPOLOGY / a KafkaStreams instance without named/modular
+        // topologies.
+        pausedTopologies.add(UNNAMED_TOPOLOGY);
+        Assert.assertFalse(metadata.canProcessTask(mockTask, TIME_ZERO));
+    }
+
+    @Test
+    public void testNamedTopologiesCanBePausedIndependently() {
+        final Set<String> pausedTopologies = ConcurrentHashMap.newKeySet();
+        final TaskExecutionMetadata metadata = new TaskExecutionMetadata(NAMED_TOPOLOGIES,
+            pausedTopologies);

Review Comment:
   nit:
   ```suggestion
           final TaskExecutionMetadata metadata = new TaskExecutionMetadata(NAMED_TOPOLOGIES, pausedTopologies);
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -83,67 +137,12 @@
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
-
-import java.util.function.BiConsumer;
 import org.easymock.EasyMock;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 
-import java.io.File;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Stream;
-
-import static java.util.Collections.emptyMap;
-import static java.util.Collections.emptySet;
-import static java.util.Collections.singleton;
-import static java.util.Collections.singletonMap;
-import static org.apache.kafka.common.utils.Utils.mkEntry;
-import static org.apache.kafka.common.utils.Utils.mkMap;
-import static org.apache.kafka.common.utils.Utils.mkProperties;
-import static org.apache.kafka.common.utils.Utils.mkSet;
-import static org.apache.kafka.streams.processor.internals.ClientUtils.getSharedAdminClientId;
-import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.anyInt;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.easymock.EasyMock.mock;
-import static org.easymock.EasyMock.niceMock;
-import static org.easymock.EasyMock.verify;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.not;
-import static org.hamcrest.CoreMatchers.startsWith;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.empty;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.isA;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertThrows;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-

Review Comment:
   I think your IDE reformatted the imports. The same occurred in `TaskExecutionMetadataTest`. Could you also check the other files? We use more or less the following import order:
   
   all other imports, non-static (sorted alphabetically)
   <blank line>
   java.* (sorted alphabetically)
   javax.* (sorted alphabetically)
   <blank line>
   static imports (sorted alphabetically)
   
   However, we are not really consistent across files. Nevertheless, we should try to keep that order.
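   Just to illustrate the intended layout (the classes below are only examples, not taken from this PR):
   
   ```java
   // all other imports, non-static, sorted alphabetically
   import org.apache.kafka.streams.processor.TaskId;
   import org.junit.Test;
   
   // java.* (then javax.*), sorted alphabetically
   import java.util.Arrays;
   import java.util.Set;
   
   // static imports, sorted alphabetically
   import static org.junit.Assert.assertTrue;
   ```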



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -1686,8 +1685,74 @@ public void shouldUpdateStandbyTask() throws Exception {
         final String storeName2 = "table-two";
         final String changelogName1 = APPLICATION_ID + "-" + storeName1 + "-changelog";
         final String changelogName2 = APPLICATION_ID + "-" + storeName2 + "-changelog";
+        final StreamThread thread = createStreamThread(CLIENT_ID, config, false);
+        final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
+
+        setupThread(storeName1, storeName2, changelogName1, changelogName2, thread, restoreConsumer, false);
+
+        thread.runOnce();
+
+        final StandbyTask standbyTask1 = standbyTask(thread.taskManager(), t1p1);
+        final StandbyTask standbyTask2 = standbyTask(thread.taskManager(), t2p1);
+        assertEquals(task1, standbyTask1.id());
+        assertEquals(task3, standbyTask2.id());
+
+        final KeyValueStore<Object, Long> store1 = (KeyValueStore<Object, Long>) standbyTask1.getStore(storeName1);
+        final KeyValueStore<Object, Long> store2 = (KeyValueStore<Object, Long>) standbyTask2.getStore(storeName2);
+
+        assertEquals(0L, store1.approximateNumEntries());
+        assertEquals(0L, store2.approximateNumEntries());
+
+        addStandbyRecordsToRestoreConsumer(restoreConsumer);
+
+        thread.runOnce();
+
+        assertEquals(10L, store1.approximateNumEntries());
+        assertEquals(4L, store2.approximateNumEntries());
+
+        thread.taskManager().shutdown(true);
+    }
+
+    private void addActiveRecordsToRestoreConsumer(final MockConsumer<byte[], byte[]> restoreConsumer) {
+        for (long i = 0L; i < 10L; i++) {
+            restoreConsumer.addRecord(new ConsumerRecord<>(
+                "stream-thread-test-count-one-changelog",
+                2,
+                i,
+                ("K" + i).getBytes(),
+                ("V" + i).getBytes()));
+        }
+    }
+
+    private void addStandbyRecordsToRestoreConsumer(final MockConsumer<byte[], byte[]> restoreConsumer) {
+        // let the store1 be restored from 0 to 10; store2 be restored from 5 (checkpointed) to 10
+        for (long i = 0L; i < 10L; i++) {
+            restoreConsumer.addRecord(new ConsumerRecord<>(
+                "stream-thread-test-count-one-changelog",

Review Comment:
   Why do you not pass in the store name or use a class constant? Same below 
and above. 
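   For example, something like this (the constant name is only a placeholder):
   
   ```java
   // hypothetical class constant instead of the repeated string literal
   private static final String COUNT_ONE_CHANGELOG = "stream-thread-test-count-one-changelog";
   
   restoreConsumer.addRecord(new ConsumerRecord<>(COUNT_ONE_CHANGELOG, 2, i, ("K" + i).getBytes(), ("V" + i).getBytes()));
   ```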



##########
checkstyle/suppressions.xml:
##########
@@ -225,7 +225,7 @@
               files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest|RelationalSmokeTest|MockProcessorContextStateStoreTest).java"/>
 
     <suppress checks="JavaNCSS"
-              files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|TaskManagerTest).java"/>
+              files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|StreamThreadTest|TaskManagerTest).java"/>

Review Comment:
   In general, I think we should try to avoid adding suppressions. But I also see that `StreamThreadTest` would need quite some love at the moment, which is not the intent of this PR.



##########
streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java:
##########
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.KeyValue.pair;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getTopicSize;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilStreamsHasPolled;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.State;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.LagInfo;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
+import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyBuilder;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.hamcrest.CoreMatchers;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({IntegrationTest.class})
+public class PauseResumeIntegrationTest {
+    private static final Duration STARTUP_TIMEOUT = Duration.ofSeconds(45);
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+    private static Properties producerConfig;
+    private static Properties consumerConfig;
+
+    private static final Materialized<Object, Long, KeyValueStore<Bytes, byte[]>> IN_MEMORY_STORE =
+        Materialized.as(Stores.inMemoryKeyValueStore("store"));
+
+    private static final String INPUT_STREAM_1 = "input-stream-1";
+    private static final String INPUT_STREAM_2 = "input-stream-2";
+    private static final String OUTPUT_STREAM_1 = "output-stream-1";
+    private static final String OUTPUT_STREAM_2 = "output-stream-2";
+    private static final String TOPOLOGY1 = "topology1";
+    private static final String TOPOLOGY2 = "topology2";
+
+    private static final List<KeyValue<String, Long>> STANDARD_INPUT_DATA =
+        asList(pair("A", 100L), pair("B", 200L), pair("A", 300L), pair("C", 
400L), pair("C", -50L));
+    private static final List<KeyValue<String, Long>> COUNT_OUTPUT_DATA =
+        asList(pair("B", 1L), pair("A", 2L), pair("C", 2L));
+    private static final List<KeyValue<String, Long>> COUNT_OUTPUT_DATA2 =
+        asList(pair("B", 2L), pair("A", 4L), pair("C", 4L));
+
+    private String appId;
+    private KafkaStreams kafkaStreams, kafkaStreams2;
+    private KafkaStreamsNamedTopologyWrapper streamsNamedTopologyWrapper;
+
+    @Rule
+    public final TestName testName = new TestName();
+
+    @BeforeClass
+    public static void startCluster() throws Exception {
+        CLUSTER.start();
+        producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+            StringSerializer.class, LongSerializer.class);
+        consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(),
+            StringDeserializer.class, LongDeserializer.class);
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @Before
+    public void createTopics() throws InterruptedException {
+        cleanStateBeforeTest(CLUSTER, 2, INPUT_STREAM_1, INPUT_STREAM_2, OUTPUT_STREAM_1, OUTPUT_STREAM_2);
+        appId = safeUniqueTestName(PauseResumeIntegrationTest.class, testName);
+    }
+
+    private Properties props() {
+        final Properties properties = new Properties();
+        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
+        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        properties.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath());
+        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+        properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
+        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
+        properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
+        return properties;
+    }
+
+    @After
+    public void shutdown() throws InterruptedException {
+        for (final KafkaStreams streams : Arrays.asList(kafkaStreams, kafkaStreams2, streamsNamedTopologyWrapper)) {
+            if (streams != null) {
+                streams.close(Duration.ofSeconds(30));
+            }
+        }
+    }
+
+    private static void produceToInputTopics(final String topic, final Collection<KeyValue<String, Long>> records) {
+        IntegrationTestUtils.produceKeyValuesSynchronously(topic, records, producerConfig, CLUSTER.time);
+    }
+
+    @Test
+    public void shouldPauseAndResumeKafkaStreams() throws Exception {
+        kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1);
+        kafkaStreams.start();
+        waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, STARTUP_TIMEOUT);
+
+        produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+        awaitOutput(OUTPUT_STREAM_1, 3, COUNT_OUTPUT_DATA);
+
+        kafkaStreams.pause();
+        assertTrue(kafkaStreams.isPaused());
+
+        produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+        assertNoLag(kafkaStreams);
+
+        waitUntilStreamsHasPolled(kafkaStreams, 2);

Review Comment:
   I like your approach!



##########
streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java:
##########
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.KeyValue.pair;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getTopicSize;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilStreamsHasPolled;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.State;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.LagInfo;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
+import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyBuilder;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.hamcrest.CoreMatchers;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({IntegrationTest.class})
+public class PauseResumeIntegrationTest {
+    private static final Duration STARTUP_TIMEOUT = Duration.ofSeconds(45);
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+    private static Properties producerConfig;
+    private static Properties consumerConfig;
+
+    private static final Materialized<Object, Long, KeyValueStore<Bytes, byte[]>> IN_MEMORY_STORE =
+        Materialized.as(Stores.inMemoryKeyValueStore("store"));
+
+    private static final String INPUT_STREAM_1 = "input-stream-1";
+    private static final String INPUT_STREAM_2 = "input-stream-2";
+    private static final String OUTPUT_STREAM_1 = "output-stream-1";
+    private static final String OUTPUT_STREAM_2 = "output-stream-2";
+    private static final String TOPOLOGY1 = "topology1";
+    private static final String TOPOLOGY2 = "topology2";
+
+    private static final List<KeyValue<String, Long>> STANDARD_INPUT_DATA =
+        asList(pair("A", 100L), pair("B", 200L), pair("A", 300L), pair("C", 
400L), pair("C", -50L));
+    private static final List<KeyValue<String, Long>> COUNT_OUTPUT_DATA =
+        asList(pair("B", 1L), pair("A", 2L), pair("C", 2L));
+    private static final List<KeyValue<String, Long>> COUNT_OUTPUT_DATA2 =
+        asList(pair("B", 2L), pair("A", 4L), pair("C", 4L));
+
+    private String appId;
+    private KafkaStreams kafkaStreams, kafkaStreams2;
+    private KafkaStreamsNamedTopologyWrapper streamsNamedTopologyWrapper;
+
+    @Rule
+    public final TestName testName = new TestName();
+
+    @BeforeClass
+    public static void startCluster() throws Exception {
+        CLUSTER.start();
+        producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+            StringSerializer.class, LongSerializer.class);
+        consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(),
+            StringDeserializer.class, LongDeserializer.class);
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @Before
+    public void createTopics() throws InterruptedException {
+        cleanStateBeforeTest(CLUSTER, 2, INPUT_STREAM_1, INPUT_STREAM_2, OUTPUT_STREAM_1, OUTPUT_STREAM_2);
+        appId = safeUniqueTestName(PauseResumeIntegrationTest.class, testName);
+    }
+
+    private Properties props() {
+        final Properties properties = new Properties();
+        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
+        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        properties.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath());
+        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+        properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
+        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
+        properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
+        return properties;
+    }
+
+    @After
+    public void shutdown() throws InterruptedException {
+        for (final KafkaStreams streams : Arrays.asList(kafkaStreams, kafkaStreams2, streamsNamedTopologyWrapper)) {
+            if (streams != null) {
+                streams.close(Duration.ofSeconds(30));
+            }
+        }
+    }
+
+    private static void produceToInputTopics(final String topic, final Collection<KeyValue<String, Long>> records) {
+        IntegrationTestUtils.produceKeyValuesSynchronously(topic, records, producerConfig, CLUSTER.time);
+    }
+
+    @Test
+    public void shouldPauseAndResumeKafkaStreams() throws Exception {
+        kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1);
+        kafkaStreams.start();
+        waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, STARTUP_TIMEOUT);
+
+        produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+        awaitOutput(OUTPUT_STREAM_1, 3, COUNT_OUTPUT_DATA);
+
+        kafkaStreams.pause();
+        assertTrue(kafkaStreams.isPaused());
+
+        produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+        assertNoLag(kafkaStreams);
+
+        waitUntilStreamsHasPolled(kafkaStreams, 2);
+        assertTopicSize(OUTPUT_STREAM_1, 3);
+
+        kafkaStreams.resume();
+        assertFalse(kafkaStreams.isPaused());
+
+        awaitOutput(OUTPUT_STREAM_1, 3, COUNT_OUTPUT_DATA2);
+        assertTopicSize(OUTPUT_STREAM_1, 6);
+    }
+
+    @Test
+    public void shouldAllowForTopologiesToStartPaused() throws Exception {
+        kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1);
+        kafkaStreams.pause();
+        kafkaStreams.start();
+        waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, STARTUP_TIMEOUT);
+        assertTrue(kafkaStreams.isPaused());
+
+        produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+        assertNoLag(kafkaStreams);
+
+        waitUntilStreamsHasPolled(kafkaStreams, 2);
+        assertTopicSize(OUTPUT_STREAM_1, 0);
+
+        kafkaStreams.resume();
+        assertFalse(kafkaStreams.isPaused());
+        awaitOutput(OUTPUT_STREAM_1, 3, COUNT_OUTPUT_DATA);
+        assertTopicSize(OUTPUT_STREAM_1, 3);
+    }
+
+    @Test
+    public void shouldPauseAndResumeKafkaStreamsWithNamedTopologies() throws Exception {
+        streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props());
+        final NamedTopologyBuilder builder1 = getNamedTopologyBuilder1();
+        final NamedTopologyBuilder builder2 = getNamedTopologyBuilder2();
+
+        streamsNamedTopologyWrapper.start(asList(builder1.build(), builder2.build()));
+        waitForApplicationState(singletonList(streamsNamedTopologyWrapper), State.RUNNING, STARTUP_TIMEOUT);
+
+        produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+        produceToInputTopics(INPUT_STREAM_2, STANDARD_INPUT_DATA);
+
+        awaitOutput(OUTPUT_STREAM_1, 3, COUNT_OUTPUT_DATA);
+        awaitOutput(OUTPUT_STREAM_2, 3, COUNT_OUTPUT_DATA);
+        assertTopicSize(OUTPUT_STREAM_1, 3);
+        assertTopicSize(OUTPUT_STREAM_2, 3);
+
+        streamsNamedTopologyWrapper.pauseNamedTopology(TOPOLOGY1);
+        assertTrue(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY1));
+        assertFalse(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY2));
+        assertFalse(streamsNamedTopologyWrapper.isPaused());
+
+        produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+        produceToInputTopics(INPUT_STREAM_2, STANDARD_INPUT_DATA);
+
+        assertNoLag(streamsNamedTopologyWrapper);
+
+        awaitOutput(OUTPUT_STREAM_2, 3, COUNT_OUTPUT_DATA2);
+        assertTopicSize(OUTPUT_STREAM_1, 3);
+        assertTopicSize(OUTPUT_STREAM_2, 6);
+
+        streamsNamedTopologyWrapper.resumeNamedTopology(TOPOLOGY1);
+        assertFalse(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY1));
+        awaitOutput(OUTPUT_STREAM_1, 3, COUNT_OUTPUT_DATA2);
+    }
+
+    @Test
+    public void shouldPauseAndResumeAllKafkaStreamsWithNamedTopologies() throws Exception {
+        streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props());
+        final NamedTopologyBuilder builder1 = getNamedTopologyBuilder1();
+        final NamedTopologyBuilder builder2 = getNamedTopologyBuilder2();
+
+        streamsNamedTopologyWrapper.start(asList(builder1.build(), builder2.build()));
+        waitForApplicationState(singletonList(streamsNamedTopologyWrapper), State.RUNNING, STARTUP_TIMEOUT);
+
+        produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+        produceToInputTopics(INPUT_STREAM_2, STANDARD_INPUT_DATA);
+        awaitOutput(OUTPUT_STREAM_1, 3, COUNT_OUTPUT_DATA);
+        awaitOutput(OUTPUT_STREAM_2, 3, COUNT_OUTPUT_DATA);
+
+        streamsNamedTopologyWrapper.pause();
+        assertTrue(streamsNamedTopologyWrapper.isPaused());
+        assertTrue(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY1));
+        assertTrue(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY2));
+
+        produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+        produceToInputTopics(INPUT_STREAM_2, STANDARD_INPUT_DATA);
+
+        assertNoLag(streamsNamedTopologyWrapper);
+
+        waitUntilStreamsHasPolled(streamsNamedTopologyWrapper, 2);
+        assertTopicSize(OUTPUT_STREAM_1, 3);
+        assertTopicSize(OUTPUT_STREAM_2, 3);
+
+        streamsNamedTopologyWrapper.resumeNamedTopology(TOPOLOGY1);
+        assertFalse(streamsNamedTopologyWrapper.isPaused());
+        assertFalse(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY1));
+        assertTrue(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY2));
+        awaitOutput(OUTPUT_STREAM_1, 3, COUNT_OUTPUT_DATA2);
+        assertTopicSize(OUTPUT_STREAM_1, 6);
+        assertTopicSize(OUTPUT_STREAM_2, 3);
+    }
+
+    @Test
+    public void shouldAllowForNamedTopologiesToStartPaused() throws Exception {
+        streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props());
+        final NamedTopologyBuilder builder1 = getNamedTopologyBuilder1();
+        final NamedTopologyBuilder builder2 = getNamedTopologyBuilder2();
+
+        streamsNamedTopologyWrapper.pauseNamedTopology(TOPOLOGY1);
+        streamsNamedTopologyWrapper.start(asList(builder1.build(), builder2.build()));
+        waitForApplicationState(singletonList(streamsNamedTopologyWrapper), State.RUNNING, STARTUP_TIMEOUT);
+
+        assertFalse(streamsNamedTopologyWrapper.isPaused());
+        assertTrue(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY1));
+        assertFalse(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY2));
+
+        produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+        produceToInputTopics(INPUT_STREAM_2, STANDARD_INPUT_DATA);
+
+        awaitOutput(OUTPUT_STREAM_2, 3, COUNT_OUTPUT_DATA);
+        assertTopicSize(OUTPUT_STREAM_1, 0);
+
+        streamsNamedTopologyWrapper.pause();
+        assertTrue(streamsNamedTopologyWrapper.isPaused());
+        assertTrue(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY1));
+        assertTrue(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY2));
+
+        produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+        produceToInputTopics(INPUT_STREAM_2, STANDARD_INPUT_DATA);
+
+        assertNoLag(streamsNamedTopologyWrapper);
+
+        waitUntilStreamsHasPolled(streamsNamedTopologyWrapper, 2);
+        assertTopicSize(OUTPUT_STREAM_1, 0);
+        assertTopicSize(OUTPUT_STREAM_2, 3);
+
+        streamsNamedTopologyWrapper.resumeNamedTopology(TOPOLOGY1);
+        assertFalse(streamsNamedTopologyWrapper.isPaused());
+        assertFalse(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY1));
+        assertTrue(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY2));
+
+        awaitOutput(OUTPUT_STREAM_1, 3, COUNT_OUTPUT_DATA2);
+        assertTopicSize(OUTPUT_STREAM_1, 3);
+        assertTopicSize(OUTPUT_STREAM_2, 3);
+    }
+
+    @Test
+    public void pauseResumeShouldWorkAcrossInstances() throws Exception {
+        produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+
+        kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1);
+        kafkaStreams.pause();
+        kafkaStreams.start();
+
+        waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, STARTUP_TIMEOUT);
+        assertTrue(kafkaStreams.isPaused());
+
+        kafkaStreams2 = buildKafkaStreams(OUTPUT_STREAM_2);
+        kafkaStreams2.pause();
+        kafkaStreams2.start();
+        assertTrue(kafkaStreams2.isPaused());
+
+        waitUntilStreamsHasPolled(kafkaStreams, 2);
+        waitUntilStreamsHasPolled(kafkaStreams2, 2);
+        assertTopicSize(OUTPUT_STREAM_1, 0);
+
+        kafkaStreams2.close();
+        kafkaStreams2.cleanUp();
+        waitForApplicationState(singletonList(kafkaStreams2), State.NOT_RUNNING, STARTUP_TIMEOUT);
+
+        kafkaStreams.resume();
+        waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, STARTUP_TIMEOUT);
+
+        awaitOutput(OUTPUT_STREAM_1, 3, COUNT_OUTPUT_DATA);
+    }
+

Review Comment:
   Could you please also add a test that verifies that active tasks in restoration and standby tasks are paused? Something like this: Start two Streams clients with 1 standby. If you have one partition, one client should get the active stateful task and the other should get the standby task. The clients process some data and write some data into their states. Then shut down the Streams clients and wipe out the local state. Finally, start both clients paused and verify that the lag of the local stores stays constant and greater than zero for a couple of poll-loop iterations. A rough sketch follows.
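   Just a sketch of what I mean (it assumes `props()` sets `StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG` to 1 and reuses the helpers of this test class; the exact wiring is up to you):
   
   ```java
   // both clients run the same stateful topology; with one partition and one standby
   // replica, one client gets the active task and the other the standby task
   kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1);
   kafkaStreams2 = buildKafkaStreams(OUTPUT_STREAM_1);
   kafkaStreams.start();
   kafkaStreams2.start();
   waitForApplicationState(asList(kafkaStreams, kafkaStreams2), State.RUNNING, STARTUP_TIMEOUT);
   
   produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
   awaitOutput(OUTPUT_STREAM_1, 3, COUNT_OUTPUT_DATA);
   
   // shut down both clients and wipe out the local state
   kafkaStreams.close();
   kafkaStreams2.close();
   kafkaStreams.cleanUp();
   kafkaStreams2.cleanUp();
   
   // restart both clients paused; neither restoration nor standby updating should progress
   kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1);
   kafkaStreams.pause();
   kafkaStreams.start();
   kafkaStreams2 = buildKafkaStreams(OUTPUT_STREAM_1);
   kafkaStreams2.pause();
   kafkaStreams2.start();
   
   // over a couple of poll-loop iterations the lag of the local stores should stay
   // constant and greater than zero, e.g. checked via allLocalStorePartitionLags()
   waitUntilStreamsHasPolled(kafkaStreams, 2);
   waitUntilStreamsHasPolled(kafkaStreams2, 2);
   ```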



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -479,6 +485,47 @@ public void restore(final Map<TaskId, Task> tasks) {
         }
     }
 
+    private void updateStandbyPartitions(final Map<TaskId, Task> tasks,
+        final Set<TopicPartition> restoringChangelogs) {
+        if (state == ChangelogReaderState.ACTIVE_RESTORING) {
+            updatePartitionsByType(tasks, restoringChangelogs, TaskType.ACTIVE);
+        }
+        if (state == ChangelogReaderState.STANDBY_UPDATING) {
+            updatePartitionsByType(tasks, restoringChangelogs, TaskType.STANDBY);
+        }
+    }
+
+    private void updatePartitionsByType(final Map<TaskId, Task> tasks,
+                                        final Set<TopicPartition> restoringChangelogs,
+                                        final TaskType taskType) {
+        final Collection<TopicPartition> toResume =
+            restoringChangelogs.stream().filter(t -> shouldResume(tasks, t, taskType)).collect(Collectors.toList());
+        final Collection<TopicPartition> toPause =
+            restoringChangelogs.stream().filter(t -> shouldPause(tasks, t, taskType)).collect(Collectors.toList());
+        restoreConsumer.resume(toResume);
+        restoreConsumer.pause(toPause);
+    }
+
+    private boolean shouldResume(final Map<TaskId, Task> tasks, final TopicPartition partition, final TaskType taskType) {
+        final ProcessorStateManager manager = changelogs.get(partition).stateManager;
+        final TaskId taskId = manager.taskId();
+        final Task task = tasks.get(taskId);
+        if (manager.taskType() == taskType) {
+            return task != null;
+        }
+        return false;
+    }
+
+    private boolean shouldPause(final Map<TaskId, Task> tasks, final TopicPartition partition, final TaskType taskType) {
+        final ProcessorStateManager manager = changelogs.get(partition).stateManager;
+        final TaskId taskId = manager.taskId();
+        final Task task = tasks.get(taskId);
+        if (manager.taskType() == taskType) {
+            return task == null;
+        }
+        return false;
+    }

Review Comment:
   Why not check the task type earlier? If the task type does not match, you do not need to do anything else.
   ```suggestion
           final ProcessorStateManager manager = changelogs.get(partition).stateManager;
           if (manager.taskType() == taskType) {
               final TaskId taskId = manager.taskId();
               final Task task = tasks.get(taskId);
               return task == null;
           }
           return false;
       }
   ```
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -1686,8 +1685,74 @@ public void shouldUpdateStandbyTask() throws Exception {
         final String storeName2 = "table-two";
         final String changelogName1 = APPLICATION_ID + "-" + storeName1 + "-changelog";
         final String changelogName2 = APPLICATION_ID + "-" + storeName2 + "-changelog";
+        final StreamThread thread = createStreamThread(CLIENT_ID, config, false);
+        final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
+
+        setupThread(storeName1, storeName2, changelogName1, changelogName2, thread, restoreConsumer, false);
+
+        thread.runOnce();
+
+        final StandbyTask standbyTask1 = standbyTask(thread.taskManager(), t1p1);
+        final StandbyTask standbyTask2 = standbyTask(thread.taskManager(), t2p1);
+        assertEquals(task1, standbyTask1.id());
+        assertEquals(task3, standbyTask2.id());
+
+        final KeyValueStore<Object, Long> store1 = (KeyValueStore<Object, Long>) standbyTask1.getStore(storeName1);
+        final KeyValueStore<Object, Long> store2 = (KeyValueStore<Object, Long>) standbyTask2.getStore(storeName2);
+
+        assertEquals(0L, store1.approximateNumEntries());
+        assertEquals(0L, store2.approximateNumEntries());
+
+        addStandbyRecordsToRestoreConsumer(restoreConsumer);
+
+        thread.runOnce();
+
+        assertEquals(10L, store1.approximateNumEntries());
+        assertEquals(4L, store2.approximateNumEntries());
+
+        thread.taskManager().shutdown(true);
+    }
+
+    private void addActiveRecordsToRestoreConsumer(final MockConsumer<byte[], byte[]> restoreConsumer) {
+        for (long i = 0L; i < 10L; i++) {
+            restoreConsumer.addRecord(new ConsumerRecord<>(
+                "stream-thread-test-count-one-changelog",
+                2,
+                i,
+                ("K" + i).getBytes(),
+                ("V" + i).getBytes()));
+        }
+    }
+
+    private void addStandbyRecordsToRestoreConsumer(final MockConsumer<byte[], byte[]> restoreConsumer) {
+        // let the store1 be restored from 0 to 10; store2 be restored from 5 (checkpointed) to 10
+        for (long i = 0L; i < 10L; i++) {
+            restoreConsumer.addRecord(new ConsumerRecord<>(
+                "stream-thread-test-count-one-changelog",
+                1,
+                i,
+                ("K" + i).getBytes(),
+                ("V" + i).getBytes()));
+            restoreConsumer.addRecord(new ConsumerRecord<>(
+                "stream-thread-test-table-two-changelog",
+                1,
+                i,
+                ("K" + i).getBytes(),
+                ("V" + i).getBytes()));
+        }
+    }
+
+    private void setupThread(final String storeName1,
+        final String storeName2,
+        final String changelogName1,
+        final String changelogName2,
+        final StreamThread thread,
+        final MockConsumer<byte[], byte[]> restoreConsumer,
+        final boolean addActiveTask) throws IOException {
+        final TopicPartition activePartition = new TopicPartition(changelogName1, 2);
         final TopicPartition partition1 = new TopicPartition(changelogName1, 1);
         final TopicPartition partition2 = new TopicPartition(changelogName2, 1);

Review Comment:
   ```suggestion
       private void setupThread(final String storeName1,
                                final String storeName2,
                                final String changelogName1,
                                final String changelogName2,
                                final StreamThread thread,
                                final MockConsumer<byte[], byte[]> restoreConsumer,
                                final boolean addActiveTask) throws IOException {
           final TopicPartition activePartition = new TopicPartition(changelogName1, 2);
           final TopicPartition partition1 = new TopicPartition(changelogName1, 1);
           final TopicPartition partition2 = new TopicPartition(changelogName2, 1);
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -479,6 +485,47 @@ public void restore(final Map<TaskId, Task> tasks) {
         }
     }
 
+    private void updateStandbyPartitions(final Map<TaskId, Task> tasks,

Review Comment:
   After your changes that include the restoration of active tasks, the name of the method no longer reflects what it does. Please rename the method to something more appropriate.
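   For example (just a suggestion, any clearer name works): something like `updatePausedPartitions` or `resumeOrPausePartitions`.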



##########
streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java:
##########
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.KeyValue.pair;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getTopicSize;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilStreamsHasPolled;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.State;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.LagInfo;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
+import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyBuilder;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.hamcrest.CoreMatchers;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({IntegrationTest.class})
+public class PauseResumeIntegrationTest {
+    private static final Duration STARTUP_TIMEOUT = Duration.ofSeconds(45);
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+    private static Properties producerConfig;
+    private static Properties consumerConfig;
+
+    private static final Materialized<Object, Long, KeyValueStore<Bytes, byte[]>> IN_MEMORY_STORE =
+        Materialized.as(Stores.inMemoryKeyValueStore("store"));
+
+    private static final String INPUT_STREAM_1 = "input-stream-1";
+    private static final String INPUT_STREAM_2 = "input-stream-2";
+    private static final String OUTPUT_STREAM_1 = "output-stream-1";
+    private static final String OUTPUT_STREAM_2 = "output-stream-2";
+    private static final String TOPOLOGY1 = "topology1";
+    private static final String TOPOLOGY2 = "topology2";
+
+    private static final List<KeyValue<String, Long>> STANDARD_INPUT_DATA =
+        asList(pair("A", 100L), pair("B", 200L), pair("A", 300L), pair("C", 
400L), pair("C", -50L));
+    private static final List<KeyValue<String, Long>> COUNT_OUTPUT_DATA =
+        asList(pair("B", 1L), pair("A", 2L), pair("C", 2L));
+    private static final List<KeyValue<String, Long>> COUNT_OUTPUT_DATA2 =
+        asList(pair("B", 2L), pair("A", 4L), pair("C", 4L));
+
+    private String appId;
+    private KafkaStreams kafkaStreams, kafkaStreams2;
+    private KafkaStreamsNamedTopologyWrapper streamsNamedTopologyWrapper;
+
+    @Rule
+    public final TestName testName = new TestName();
+
+    @BeforeClass
+    public static void startCluster() throws Exception {
+        CLUSTER.start();
+        producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+            StringSerializer.class, LongSerializer.class);
+        consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(),
+            StringDeserializer.class, LongDeserializer.class);
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @Before
+    public void createTopics() throws InterruptedException {
+        cleanStateBeforeTest(CLUSTER, 2, INPUT_STREAM_1, INPUT_STREAM_2, OUTPUT_STREAM_1, OUTPUT_STREAM_2);
+        appId = safeUniqueTestName(PauseResumeIntegrationTest.class, testName);
+    }
+
+    private Properties props() {
+        final Properties properties = new Properties();
+        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
+        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        properties.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath());
+        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+        properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
+        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
+        properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
+        return properties;
+    }
+
+    @After
+    public void shutdown() throws InterruptedException {
+        for (final KafkaStreams streams : Arrays.asList(kafkaStreams, kafkaStreams2, streamsNamedTopologyWrapper)) {
+            if (streams != null) {
+                streams.close(Duration.ofSeconds(30));
+            }
+        }
+    }
+
+    private static void produceToInputTopics(final String topic, final Collection<KeyValue<String, Long>> records) {
+        IntegrationTestUtils.produceKeyValuesSynchronously(topic, records, producerConfig, CLUSTER.time);
+    }
+
+    @Test
+    public void shouldPauseAndResumeKafkaStreams() throws Exception {
+        kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1);
+        kafkaStreams.start();
+        waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, STARTUP_TIMEOUT);
+
+        produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+        awaitOutput(OUTPUT_STREAM_1, 3, COUNT_OUTPUT_DATA);
+
+        kafkaStreams.pause();
+        assertTrue(kafkaStreams.isPaused());
+
+        produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+        assertNoLag(kafkaStreams);

Review Comment:
   I just realized that this method computes the lag of the store regarding the changelog partition and not regarding the input partitions. Was this intentional? I think the lag of the store regarding the changelog topic will always be zero in this case, because in normal mode the Streams client writes to the changelog topic and so it is always up-to-date.
   I thought that this method verified that the main consumer does not have any lag regarding the input partitions, which would tell us that the main consumer polled data although it was paused, which is the expected behavior. This method together with `waitUntilStreamsHasPolled(kafkaStreams, 2);` and `assertTopicSize(OUTPUT_STREAM_1, <same size as before the call to pause>);` would tell us that although the Streams clients polled data and went through the poll loop, no data was produced to the output topic. For that you should also set the cache size to 0 with `STATESTORE_CACHE_MAX_BYTES_CONFIG`.
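   I.e., something like this in `props()` (just a sketch):
   
   ```java
   // disable the record cache so updates are forwarded downstream immediately
   properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0L);
   ```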



##########
streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java:
##########
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.KeyValue.pair;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getTopicSize;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilStreamsHasPolled;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.State;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.LagInfo;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
+import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyBuilder;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.hamcrest.CoreMatchers;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({IntegrationTest.class})
+public class PauseResumeIntegrationTest {
+    private static final Duration STARTUP_TIMEOUT = Duration.ofSeconds(45);
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+    private static Properties producerConfig;
+    private static Properties consumerConfig;
+
+    private static final Materialized<Object, Long, KeyValueStore<Bytes, byte[]>> IN_MEMORY_STORE =
+        Materialized.as(Stores.inMemoryKeyValueStore("store"));
+
+    private static final String INPUT_STREAM_1 = "input-stream-1";
+    private static final String INPUT_STREAM_2 = "input-stream-2";
+    private static final String OUTPUT_STREAM_1 = "output-stream-1";
+    private static final String OUTPUT_STREAM_2 = "output-stream-2";
+    private static final String TOPOLOGY1 = "topology1";
+    private static final String TOPOLOGY2 = "topology2";
+
+    private static final List<KeyValue<String, Long>> STANDARD_INPUT_DATA =
+        asList(pair("A", 100L), pair("B", 200L), pair("A", 300L), pair("C", 
400L), pair("C", -50L));
+    private static final List<KeyValue<String, Long>> COUNT_OUTPUT_DATA =
+        asList(pair("B", 1L), pair("A", 2L), pair("C", 2L));
+    private static final List<KeyValue<String, Long>> COUNT_OUTPUT_DATA2 =
+        asList(pair("B", 2L), pair("A", 4L), pair("C", 4L));
+
+    private String appId;
+    private KafkaStreams kafkaStreams, kafkaStreams2;
+    private KafkaStreamsNamedTopologyWrapper streamsNamedTopologyWrapper;
+
+    @Rule
+    public final TestName testName = new TestName();
+
+    @BeforeClass
+    public static void startCluster() throws Exception {
+        CLUSTER.start();
+        producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+            StringSerializer.class, LongSerializer.class);
+        consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(),
+            StringDeserializer.class, LongDeserializer.class);
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @Before
+    public void createTopics() throws InterruptedException {
+        cleanStateBeforeTest(CLUSTER, 2, INPUT_STREAM_1, INPUT_STREAM_2, OUTPUT_STREAM_1, OUTPUT_STREAM_2);
+        appId = safeUniqueTestName(PauseResumeIntegrationTest.class, testName);
+    }
+
+    private Properties props() {
+        final Properties properties = new Properties();
+        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
+        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        properties.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath());
+        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+        properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
+        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
+        properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
+        return properties;
+    }
+
+    @After
+    public void shutdown() throws InterruptedException {
+        for (final KafkaStreams streams : Arrays.asList(kafkaStreams, kafkaStreams2, streamsNamedTopologyWrapper)) {
+            if (streams != null) {
+                streams.close(Duration.ofSeconds(30));
+            }
+        }
+    }
+
+    private static void produceToInputTopics(final String topic, final Collection<KeyValue<String, Long>> records) {
+        IntegrationTestUtils.produceKeyValuesSynchronously(topic, records, producerConfig, CLUSTER.time);
+    }
+
+    @Test
+    public void shouldPauseAndResumeKafkaStreams() throws Exception {
+        kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1);
+        kafkaStreams.start();
+        waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, STARTUP_TIMEOUT);
+
+        produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+        awaitOutput(OUTPUT_STREAM_1, 3, COUNT_OUTPUT_DATA);
+
+        kafkaStreams.pause();
+        assertTrue(kafkaStreams.isPaused());
+
+        produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+        assertNoLag(kafkaStreams);
+
+        waitUntilStreamsHasPolled(kafkaStreams, 2);
+        assertTopicSize(OUTPUT_STREAM_1, 3);
+
+        kafkaStreams.resume();
+        assertFalse(kafkaStreams.isPaused());
+
+        awaitOutput(OUTPUT_STREAM_1, 3, COUNT_OUTPUT_DATA2);
+        assertTopicSize(OUTPUT_STREAM_1, 6);
+    }
+
+    @Test
+    public void shouldAllowForTopologiesToStartPaused() throws Exception {
+        kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1);
+        kafkaStreams.pause();
+        kafkaStreams.start();
+        waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, STARTUP_TIMEOUT);
+        assertTrue(kafkaStreams.isPaused());
+
+        produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+        assertNoLag(kafkaStreams);
+
+        waitUntilStreamsHasPolled(kafkaStreams, 2);
+        assertTopicSize(OUTPUT_STREAM_1, 0);
+
+        kafkaStreams.resume();
+        assertFalse(kafkaStreams.isPaused());
+        awaitOutput(OUTPUT_STREAM_1, 3, COUNT_OUTPUT_DATA);
+        assertTopicSize(OUTPUT_STREAM_1, 3);
+    }
+
+    @Test
+    public void shouldPauseAndResumeKafkaStreamsWithNamedTopologies() throws Exception {
+        streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props());
+        final NamedTopologyBuilder builder1 = getNamedTopologyBuilder1();
+        final NamedTopologyBuilder builder2 = getNamedTopologyBuilder2();
+
+        streamsNamedTopologyWrapper.start(asList(builder1.build(), builder2.build()));
+        waitForApplicationState(singletonList(streamsNamedTopologyWrapper), State.RUNNING, STARTUP_TIMEOUT);
+
+        produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+        produceToInputTopics(INPUT_STREAM_2, STANDARD_INPUT_DATA);
+
+        awaitOutput(OUTPUT_STREAM_1, 3, COUNT_OUTPUT_DATA);
+        awaitOutput(OUTPUT_STREAM_2, 3, COUNT_OUTPUT_DATA);
+        assertTopicSize(OUTPUT_STREAM_1, 3);
+        assertTopicSize(OUTPUT_STREAM_2, 3);
+
+        streamsNamedTopologyWrapper.pauseNamedTopology(TOPOLOGY1);
+        assertTrue(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY1));
+        assertFalse(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY2));
+        assertFalse(streamsNamedTopologyWrapper.isPaused());
+
+        produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+        produceToInputTopics(INPUT_STREAM_2, STANDARD_INPUT_DATA);
+
+        assertNoLag(streamsNamedTopologyWrapper);
+
+        awaitOutput(OUTPUT_STREAM_2, 3, COUNT_OUTPUT_DATA2);
+        assertTopicSize(OUTPUT_STREAM_1, 3);
+        assertTopicSize(OUTPUT_STREAM_2, 6);
+
+        streamsNamedTopologyWrapper.resumeNamedTopology(TOPOLOGY1);
+        assertFalse(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY1));
+        awaitOutput(OUTPUT_STREAM_1, 3, COUNT_OUTPUT_DATA2);
+    }
+
+    @Test
+    public void shouldPauseAndResumeAllKafkaStreamsWithNamedTopologies() throws Exception {
+        streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props());
+        final NamedTopologyBuilder builder1 = getNamedTopologyBuilder1();
+        final NamedTopologyBuilder builder2 = getNamedTopologyBuilder2();
+
+        streamsNamedTopologyWrapper.start(asList(builder1.build(), builder2.build()));
+        waitForApplicationState(singletonList(streamsNamedTopologyWrapper), State.RUNNING, STARTUP_TIMEOUT);
+
+        produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+        produceToInputTopics(INPUT_STREAM_2, STANDARD_INPUT_DATA);
+        awaitOutput(OUTPUT_STREAM_1, 3, COUNT_OUTPUT_DATA);
+        awaitOutput(OUTPUT_STREAM_2, 3, COUNT_OUTPUT_DATA);
+
+        streamsNamedTopologyWrapper.pause();
+        assertTrue(streamsNamedTopologyWrapper.isPaused());
+        assertTrue(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY1));
+        assertTrue(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY2));
+
+        produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+        produceToInputTopics(INPUT_STREAM_2, STANDARD_INPUT_DATA);
+
+        assertNoLag(streamsNamedTopologyWrapper);
+
+        waitUntilStreamsHasPolled(streamsNamedTopologyWrapper, 2);
+        assertTopicSize(OUTPUT_STREAM_1, 3);
+        assertTopicSize(OUTPUT_STREAM_2, 3);
+
+        streamsNamedTopologyWrapper.resumeNamedTopology(TOPOLOGY1);
+        assertFalse(streamsNamedTopologyWrapper.isPaused());
+        assertFalse(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY1));
+        assertTrue(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY2));
+        awaitOutput(OUTPUT_STREAM_1, 3, COUNT_OUTPUT_DATA2);
+        assertTopicSize(OUTPUT_STREAM_1, 6);
+        assertTopicSize(OUTPUT_STREAM_2, 3);
+    }
+
+    @Test
+    public void shouldAllowForNamedTopologiesToStartPaused() throws Exception {
+        streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props());
+        final NamedTopologyBuilder builder1 = getNamedTopologyBuilder1();
+        final NamedTopologyBuilder builder2 = getNamedTopologyBuilder2();
+
+        streamsNamedTopologyWrapper.pauseNamedTopology(TOPOLOGY1);
+        streamsNamedTopologyWrapper.start(asList(builder1.build(), builder2.build()));
+        waitForApplicationState(singletonList(streamsNamedTopologyWrapper), State.RUNNING, STARTUP_TIMEOUT);
+
+        assertFalse(streamsNamedTopologyWrapper.isPaused());
+        assertTrue(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY1));
+        assertFalse(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY2));
+
+        produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+        produceToInputTopics(INPUT_STREAM_2, STANDARD_INPUT_DATA);
+
+        awaitOutput(OUTPUT_STREAM_2, 3, COUNT_OUTPUT_DATA);
+        assertTopicSize(OUTPUT_STREAM_1, 0);
+
+        streamsNamedTopologyWrapper.pause();
+        assertTrue(streamsNamedTopologyWrapper.isPaused());
+        assertTrue(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY1));
+        assertTrue(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY2));
+
+        produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+        produceToInputTopics(INPUT_STREAM_2, STANDARD_INPUT_DATA);
+
+        assertNoLag(streamsNamedTopologyWrapper);
+
+        waitUntilStreamsHasPolled(streamsNamedTopologyWrapper, 2);
+        assertTopicSize(OUTPUT_STREAM_1, 0);
+        assertTopicSize(OUTPUT_STREAM_2, 3);
+
+        streamsNamedTopologyWrapper.resumeNamedTopology(TOPOLOGY1);
+        assertFalse(streamsNamedTopologyWrapper.isPaused());
+        assertFalse(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY1));
+        assertTrue(streamsNamedTopologyWrapper.isNamedTopologyPaused(TOPOLOGY2));
+
+        awaitOutput(OUTPUT_STREAM_1, 3, COUNT_OUTPUT_DATA2);
+        assertTopicSize(OUTPUT_STREAM_1, 3);
+        assertTopicSize(OUTPUT_STREAM_2, 3);
+    }
+
+    @Test
+    public void pauseResumehouldWorkAcrossInstances() throws Exception {

Review Comment:
   Typo: `pauseResumehouldWorkAcrossInstances` -> `pauseResumeShouldWorkAcrossInstances`


