bvarghese1 commented on code in PR #12161: URL: https://github.com/apache/kafka/pull/12161#discussion_r875206101
########## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ########## @@ -1662,6 +1663,51 @@ public <T> T store(final StoreQueryParameters<T> storeQueryParameters) { return queryableStoreProvider.getStore(storeQueryParameters); } + /** + * This method pauses processing for the KafkaStreams instance. + * + * Paused topologies will only skip over a) processing, b) punctuation, and c) standby tasks. + * Notably, paused topologies will still poll Kafka consumers, and commit offsets. + * This method sets transient state that is not maintained or managed among instances. + * Note that pause() can be called before start() in order to start a KafkaStreams instance + * in a manner where the processing is paused as described, but the consumers are started up. + */ + public void pause() { + if (topologyMetadata.hasNamedTopologies()) { + for (final NamedTopology allNamedTopology : topologyMetadata.getAllNamedTopologies()) { + topologyMetadata.pauseTopology(allNamedTopology.name()); + } + } else { + topologyMetadata.pauseTopology(UNNAMED_TOPOLOGY); + } + } + + /** + * @return true when the KafkaStreams instance has its processing paused. + */ + public boolean isPaused() { + if (topologyMetadata.hasNamedTopologies()) { + return topologyMetadata.getAllNamedTopologies() + .stream().map(NamedTopology::name).allMatch(topologyMetadata::isPaused); Review Comment: Minor: Split across multiple lines for better readability. Eg: https://github.com/apache/kafka/pull/12161/files#diff-b7905c38e362cc37f6989a70b9d0cb028086fbfc79431ddbb9c64870e95482bcL230 ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java: ########## @@ -273,6 +274,12 @@ Collection<Task> allTasks() { return readOnlyTasks; } + Collection<Task> notPausedTasks() { + return new ArrayList<>(readOnlyActiveTasks).stream().filter(t -> Review Comment: Minor: Split across multiple lines for better readability. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org