[ 
https://issues.apache.org/jira/browse/KAFKA-6998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16584145#comment-16584145
 ] 

ASF GitHub Bot commented on KAFKA-6998:
---------------------------------------

guozhangwang closed pull request #5488: KAFKA-6998: Disable Caching when 
max.cache.bytes are zero.
URL: https://github.com/apache/kafka/pull/5488
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is reproduced below
for the sake of provenance:

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 9071c9db2bf..33e037c3ced 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1749,7 +1749,7 @@ public OffsetAndMetadata committed(TopicPartition 
partition, final Duration time
             if (!parts.isEmpty())
                 return parts;
 
-            Timer timer = time.timer(requestTimeoutMs);
+            Timer timer = time.timer(timeout);
             Map<String, List<PartitionInfo>> topicMetadata = 
fetcher.getTopicMetadata(
                     new 
MetadataRequest.Builder(Collections.singletonList(topic), true), timer);
             return topicMetadata.get(topic);
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java 
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 2cefb35905c..82323d9081d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -635,10 +635,6 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
         this.config = config;
         this.time = time;
 
-        // adjust the topology if optimization is turned on.
-        // TODO: to be removed post 2.0
-        internalTopologyBuilder.adjust(config);
-
         // The application ID is a required config and hence should always 
have value
         processId = UUID.randomUUID();
         final String userClientId = 
config.getString(StreamsConfig.CLIENT_ID_CONFIG);
@@ -667,7 +663,8 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
         reporters.add(new JmxReporter(JMX_PREFIX));
         metrics = new Metrics(metricConfig, reporters, time);
 
-        internalTopologyBuilder.setApplicationId(applicationId);
+        // re-write the physical topology according to the config
+        internalTopologyBuilder.rewriteTopology(config);
 
         // sanity check to fail-fast in case we cannot build a 
ProcessorTopology due to an exception
         internalTopologyBuilder.build();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 99d616f60e4..edff470a41e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -66,7 +66,10 @@
     // state factories
     private final Map<String, StateStoreFactory> stateFactories = new 
HashMap<>();
 
-    // global state factories
+    // global state store builders
+    private final Map<String, StoreBuilder> globalStateBuilders = new 
LinkedHashMap<>();
+
+    // built global state stores
     private final Map<String, StateStore> globalStateStores = new 
LinkedHashMap<>();
 
     // all topics subscribed from source processors (without application-id 
prefix for internal topics)
@@ -326,6 +329,7 @@ Sink describe() {
         }
     }
 
+    // public for testing only
     public synchronized final InternalTopologyBuilder setApplicationId(final 
String applicationId) {
         Objects.requireNonNull(applicationId, "applicationId can't be null");
         this.applicationId = applicationId;
@@ -333,6 +337,35 @@ public synchronized final InternalTopologyBuilder 
setApplicationId(final String
         return this;
     }
 
+    public synchronized final InternalTopologyBuilder rewriteTopology(final 
StreamsConfig config) {
+        Objects.requireNonNull(config, "config can't be null");
+
+        // set application id
+        
setApplicationId(config.getString(StreamsConfig.APPLICATION_ID_CONFIG));
+
+        // maybe strip out caching layers
+        if (config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) == 
0L) {
+            for (final StateStoreFactory storeFactory : 
stateFactories.values()) {
+                storeFactory.builder.withCachingDisabled();
+            }
+
+            for (final StoreBuilder storeBuilder : 
globalStateBuilders.values()) {
+                storeBuilder.withCachingDisabled();
+            }
+        }
+
+        // build global state stores
+        for (final StoreBuilder storeBuilder : globalStateBuilders.values()) {
+            globalStateStores.put(storeBuilder.name(), storeBuilder.build());
+        }
+
+        // adjust the topology if optimization is turned on.
+        // TODO: to be removed post 2.0
+        adjust(config);
+
+        return this;
+    }
+
     public final void addSource(final Topology.AutoOffsetReset offsetReset,
                                 final String name,
                                 final TimestampExtractor timestampExtractor,
@@ -524,7 +557,7 @@ public final void addGlobalStore(final 
StoreBuilder<KeyValueStore> storeBuilder,
                        processorName,
                        stateUpdateSupplier,
                        storeBuilder.name(),
-                       storeBuilder.build());
+                       storeBuilder);
     }
 
     private void validateTopicNotAlreadyRegistered(final String topic) {
@@ -552,8 +585,8 @@ public final void connectProcessorAndStateStores(final 
String processorName,
         }
     }
 
-    public final void connectSourceStoreAndTopic(final String sourceStoreName,
-                                                 final String topic) {
+    private void connectSourceStoreAndTopic(final String sourceStoreName,
+                                            final String topic) {
         if (storeToChangelogTopic.containsKey(sourceStoreName)) {
             throw new TopologyException("Source store " + sourceStoreName + " 
is already added.");
         }
@@ -593,7 +626,7 @@ private void validateGlobalStoreArguments(final String 
sourceName,
         if (nodeFactories.containsKey(processorName)) {
             throw new TopologyException("Processor " + processorName + " is 
already added.");
         }
-        if (stateFactories.containsKey(storeName) || 
globalStateStores.containsKey(storeName)) {
+        if (stateFactories.containsKey(storeName) || 
globalStateBuilders.containsKey(storeName)) {
             throw new TopologyException("StateStore " + storeName + " is 
already added.");
         }
         if (loggingEnabled) {
@@ -612,7 +645,7 @@ private void addGlobalStore(final String sourceName,
                                 final String processorName,
                                 final ProcessorSupplier stateUpdateSupplier,
                                 final String name,
-                                final KeyValueStore store) {
+                                final StoreBuilder<KeyValueStore> 
storeBuilder) {
         final String[] topics = {topic};
         final String[] predecessors = {sourceName};
         final ProcessorNodeFactory nodeFactory = new 
ProcessorNodeFactory(processorName,
@@ -631,13 +664,13 @@ private void addGlobalStore(final String sourceName,
         nodeFactories.put(processorName, nodeFactory);
         nodeGrouper.add(processorName);
         nodeGrouper.unite(processorName, predecessors);
-        globalStateStores.put(name, store);
+        globalStateBuilders.put(name, storeBuilder);
         connectSourceStoreAndTopic(name, topic);
     }
 
     private void connectProcessorAndStateStore(final String processorName,
                                                final String stateStoreName) {
-        if (globalStateStores.containsKey(stateStoreName)) {
+        if (globalStateBuilders.containsKey(stateStoreName)) {
             throw new TopologyException("Global StateStore " + stateStoreName +
                     " can be used by a Processor without being specified; it 
should not be explicitly passed.");
         }
@@ -804,6 +837,8 @@ public synchronized ProcessorTopology build(final Integer 
topicGroupId) {
      * @return ProcessorTopology
      */
     public synchronized ProcessorTopology buildGlobalStateTopology() {
+        Objects.requireNonNull(applicationId, "topology has not completed 
optimization");
+
         final Set<String> globalGroups = globalNodeGroups();
         if (globalGroups.isEmpty()) {
             return null;
@@ -825,6 +860,8 @@ public synchronized ProcessorTopology 
buildGlobalStateTopology() {
     }
 
     private ProcessorTopology build(final Set<String> nodeGroup) {
+        Objects.requireNonNull(applicationId, "topology has not completed 
optimization");
+
         final Map<String, ProcessorNode> processorMap = new LinkedHashMap<>();
         final Map<String, SourceNode> topicSourceMap = new HashMap<>();
         final Map<String, SinkNode> topicSinkMap = new HashMap<>();
@@ -950,10 +987,14 @@ private void buildProcessorNode(final Map<String, 
ProcessorNode> processorMap,
      * @return map containing all global {@link StateStore}s
      */
     public Map<String, StateStore> globalStateStores() {
+        Objects.requireNonNull(applicationId, "topology has not completed 
optimization");
+
         return Collections.unmodifiableMap(globalStateStores);
     }
 
     public Set<String> allStateStoreName() {
+        Objects.requireNonNull(applicationId, "topology has not completed 
optimization");
+
         final Set<String> allNames = new HashSet<>(stateFactories.keySet());
         allNames.addAll(globalStateStores.keySet());
         return Collections.unmodifiableSet(allNames);
@@ -1036,7 +1077,7 @@ private void buildProcessorNode(final Map<String, 
ProcessorNode> processorMap,
 
     // Adjust the generated topology based on the configs.
     // Not exposed as public API and should be removed post 2.0
-    public void adjust(final StreamsConfig config) {
+    private void adjust(final StreamsConfig config) {
         final boolean enableOptimization20 = 
config.getString(StreamsConfig.TOPOLOGY_OPTIMIZATION).equals(StreamsConfig.OPTIMIZE);
 
         if (enableOptimization20) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/StoreBuilder.java 
b/streams/src/main/java/org/apache/kafka/streams/state/StoreBuilder.java
index 2d1b241e820..a930468d0f3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/StoreBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/StoreBuilder.java
@@ -32,6 +32,12 @@
      */
     StoreBuilder<T> withCachingEnabled();
 
+    /**
+     * Disable caching on the store.
+     * @return  this
+     */
+    StoreBuilder<T> withCachingDisabled();
+
     /**
      * Maintain a changelog for any changes made to the store.
      * Use the provided config to set the config of the changelog topic.
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreBuilder.java
index fdcd2e72d64..898db9e482c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreBuilder.java
@@ -52,6 +52,12 @@ public AbstractStoreBuilder(final String name,
         return this;
     }
 
+    @Override
+    public StoreBuilder<T> withCachingDisabled() {
+        enableCaching = false;
+        return this;
+    }
+
     @Override
     public StoreBuilder<T> withLoggingEnabled(final Map<String, String> 
config) {
         Objects.requireNonNull(config, "config can't be null");
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java 
b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 28e3d552c26..481c8602c86 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -81,11 +81,12 @@
     @Before
     public void before() {
         props = new Properties();
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
-        props.setProperty(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, 
MockMetricsReporter.class.getName());
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath());
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
+        props.put(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, 
MockMetricsReporter.class.getName());
+        props.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath());
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS);
+        props.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
         streams = new KafkaStreams(builder.build(), props);
     }
 
@@ -238,10 +239,10 @@ public boolean conditionMet() {
     @Test
     public void 
globalThreadShouldTimeoutWhenBrokerConnectionCannotBeEstablished() {
         final Properties props = new Properties();
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:1");
-        props.setProperty(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, 
MockMetricsReporter.class.getName());
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath());
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:1");
+        props.put(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, 
MockMetricsReporter.class.getName());
+        props.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath());
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS);
 
         props.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 200);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java 
b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 56e6a6dc1c9..4fab4ff87f1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -53,7 +53,7 @@
 public class StreamsBuilderTest {
 
     private final StreamsBuilder builder = new StreamsBuilder();
-    private final Properties props = 
StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
+    private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
 
     @Test
     public void shouldAllowJoinUnmaterializedFilteredKTable() {
@@ -61,7 +61,7 @@ public void shouldAllowJoinUnmaterializedFilteredKTable() {
         builder.<Bytes, String>stream("stream-topic").join(filteredKTable, 
MockValueJoiner.TOSTRING_JOINER);
         builder.build();
 
-        final ProcessorTopology topology = 
builder.internalTopologyBuilder.build();
+        final ProcessorTopology topology = 
builder.internalTopologyBuilder.rewriteTopology(new 
StreamsConfig(props)).build();
 
         assertThat(topology.stateStores().size(), equalTo(1));
         
assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"), 
equalTo(Collections.singleton(topology.stateStores().get(0).name())));
@@ -75,7 +75,7 @@ public void shouldAllowJoinMaterializedFilteredKTable() {
         builder.<Bytes, String>stream("stream-topic").join(filteredKTable, 
MockValueJoiner.TOSTRING_JOINER);
         builder.build();
 
-        final ProcessorTopology topology = 
builder.internalTopologyBuilder.build();
+        final ProcessorTopology topology = 
builder.internalTopologyBuilder.rewriteTopology(new 
StreamsConfig(props)).build();
 
         assertThat(topology.stateStores().size(), equalTo(2));
         
assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"), 
equalTo(Collections.singleton("store")));
@@ -88,7 +88,7 @@ public void shouldAllowJoinUnmaterializedMapValuedKTable() {
         builder.<Bytes, String>stream("stream-topic").join(mappedKTable, 
MockValueJoiner.TOSTRING_JOINER);
         builder.build();
 
-        final ProcessorTopology topology = 
builder.internalTopologyBuilder.build();
+        final ProcessorTopology topology = 
builder.internalTopologyBuilder.rewriteTopology(new 
StreamsConfig(props)).build();
 
         assertThat(topology.stateStores().size(), equalTo(1));
         
assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"), 
equalTo(Collections.singleton(topology.stateStores().get(0).name())));
@@ -102,7 +102,7 @@ public void shouldAllowJoinMaterializedMapValuedKTable() {
         builder.<Bytes, String>stream("stream-topic").join(mappedKTable, 
MockValueJoiner.TOSTRING_JOINER);
         builder.build();
 
-        final ProcessorTopology topology = 
builder.internalTopologyBuilder.build();
+        final ProcessorTopology topology = 
builder.internalTopologyBuilder.rewriteTopology(new 
StreamsConfig(props)).build();
 
         assertThat(topology.stateStores().size(), equalTo(2));
         
assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"), 
equalTo(Collections.singleton("store")));
@@ -116,7 +116,7 @@ public void shouldAllowJoinUnmaterializedJoinedKTable() {
         builder.<Bytes, String>stream("stream-topic").join(table1.join(table2, 
MockValueJoiner.TOSTRING_JOINER), MockValueJoiner.TOSTRING_JOINER);
         builder.build();
 
-        final ProcessorTopology topology = 
builder.internalTopologyBuilder.build();
+        final ProcessorTopology topology = 
builder.internalTopologyBuilder.rewriteTopology(new 
StreamsConfig(props)).build();
 
         assertThat(topology.stateStores().size(), equalTo(2));
         
assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000010"), 
equalTo(Utils.mkSet(topology.stateStores().get(0).name(), 
topology.stateStores().get(1).name())));
@@ -130,7 +130,7 @@ public void shouldAllowJoinMaterializedJoinedKTable() {
         builder.<Bytes, String>stream("stream-topic").join(table1.join(table2, 
MockValueJoiner.TOSTRING_JOINER, Materialized.as("store")), 
MockValueJoiner.TOSTRING_JOINER);
         builder.build();
 
-        final ProcessorTopology topology = 
builder.internalTopologyBuilder.build();
+        final ProcessorTopology topology = 
builder.internalTopologyBuilder.rewriteTopology(new 
StreamsConfig(props)).build();
 
         assertThat(topology.stateStores().size(), equalTo(3));
         
assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000010"), 
equalTo(Collections.singleton("store")));
@@ -143,7 +143,7 @@ public void shouldAllowJoinMaterializedSourceKTable() {
         builder.<Bytes, String>stream("stream-topic").join(table, 
MockValueJoiner.TOSTRING_JOINER);
         builder.build();
 
-        final ProcessorTopology topology = 
builder.internalTopologyBuilder.build();
+        final ProcessorTopology topology = 
builder.internalTopologyBuilder.rewriteTopology(new 
StreamsConfig(props)).build();
 
         assertThat(topology.stateStores().size(), equalTo(1));
         
assertThat(topology.processorConnectedStateStores("KTABLE-SOURCE-0000000002"), 
equalTo(Collections.singleton(topology.stateStores().get(0).name())));
@@ -284,11 +284,11 @@ public void 
shouldReuseSourceTopicAsChangelogsWithOptimization20() {
         final String topic = "topic";
         builder.table(topic, Materialized.<Long, String, KeyValueStore<Bytes, 
byte[]>>as("store"));
         final Topology topology = builder.build();
-        final Properties props = StreamsTestUtils.minimalStreamsConfig();
+        final Properties props = StreamsTestUtils.getStreamsConfig();
         props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
 
         final InternalTopologyBuilder internalTopologyBuilder = 
TopologyWrapper.getInternalTopologyBuilder(topology);
-        internalTopologyBuilder.adjust(new StreamsConfig(props));
+        internalTopologyBuilder.rewriteTopology(new StreamsConfig(props));
 
         assertThat(internalTopologyBuilder.build().storeToChangelogTopic(), 
equalTo(Collections.singletonMap("store", "topic")));
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java 
b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 1791672ec4c..83279cc4ddc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -49,7 +49,7 @@
 import static org.apache.kafka.streams.StreamsConfig.adminClientPrefix;
 import static org.apache.kafka.streams.StreamsConfig.consumerPrefix;
 import static org.apache.kafka.streams.StreamsConfig.producerPrefix;
-import static org.apache.kafka.test.StreamsTestUtils.minimalStreamsConfig;
+import static org.apache.kafka.test.StreamsTestUtils.getStreamsConfig;
 import static org.hamcrest.core.IsEqual.equalTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
@@ -523,7 +523,7 @@ public void 
shouldNotOverrideUserConfigCommitIntervalMsIfExactlyOnceEnabled() {
 
     @Test
     public void shouldUseNewConfigsWhenPresent() {
-        final Properties props = minimalStreamsConfig();
+        final Properties props = getStreamsConfig();
         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.Long().getClass());
         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.Long().getClass());
         props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, 
MockTimestampExtractor.class);
@@ -536,7 +536,7 @@ public void shouldUseNewConfigsWhenPresent() {
 
     @Test
     public void shouldUseCorrectDefaultsWhenNoneSpecified() {
-        final StreamsConfig config = new StreamsConfig(minimalStreamsConfig());
+        final StreamsConfig config = new StreamsConfig(getStreamsConfig());
         assertTrue(config.defaultKeySerde() instanceof Serdes.ByteArraySerde);
         assertTrue(config.defaultValueSerde() instanceof 
Serdes.ByteArraySerde);
         assertTrue(config.defaultTimestampExtractor() instanceof 
FailOnInvalidTimestamp);
@@ -544,7 +544,7 @@ public void shouldUseCorrectDefaultsWhenNoneSpecified() {
 
     @Test
     public void shouldSpecifyCorrectKeySerdeClassOnError() {
-        final Properties props = minimalStreamsConfig();
+        final Properties props = getStreamsConfig();
         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
MisconfiguredSerde.class);
         final StreamsConfig config = new StreamsConfig(props);
         try {
@@ -558,7 +558,7 @@ public void shouldSpecifyCorrectKeySerdeClassOnError() {
     @SuppressWarnings("deprecation")
     @Test
     public void shouldSpecifyCorrectValueSerdeClassOnError() {
-        final Properties props = minimalStreamsConfig();
+        final Properties props = getStreamsConfig();
         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
MisconfiguredSerde.class);
         final StreamsConfig config = new StreamsConfig(props);
         try {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/TopologyWrapper.java 
b/streams/src/test/java/org/apache/kafka/streams/TopologyWrapper.java
index 940ec0ec70b..e1c7c1170ab 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyWrapper.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyWrapper.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams;
 
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+import org.apache.kafka.test.StreamsTestUtils;
 
 /**
  *  This class allows to access the {@link InternalTopologyBuilder} a {@link 
Topology} object.
@@ -25,14 +26,14 @@
 public class TopologyWrapper extends Topology {
 
     static public InternalTopologyBuilder getInternalTopologyBuilder(final 
Topology topology) {
-        return topology.internalTopologyBuilder;
+        return topology.internalTopologyBuilder.rewriteTopology(new 
StreamsConfig(StreamsTestUtils.getStreamsConfig()));
     }
 
     public InternalTopologyBuilder getInternalBuilder() {
-        return internalTopologyBuilder;
+        return internalTopologyBuilder.rewriteTopology(new 
StreamsConfig(StreamsTestUtils.getStreamsConfig()));
     }
 
-    public void setApplicationId(final String applicationId) {
-        internalTopologyBuilder.setApplicationId(applicationId);
+    public InternalTopologyBuilder getInternalBuilder(final String 
applicationId) {
+        return internalTopologyBuilder.rewriteTopology(new 
StreamsConfig(StreamsTestUtils.getStreamsConfig(applicationId)));
     }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index 770f579ad98..9aa07729c20 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -163,8 +163,13 @@ private void runSimpleCopyTest(final int numberOfRestarts,
                     Serdes.LongSerde.class.getName(),
                     new Properties() {
                         {
-                            
put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1);
                             put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
StreamsConfig.EXACTLY_ONCE);
+                            
put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+                            put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+                            
put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1);
+                            
put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), 
"1000");
+                            
put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), 
"earliest");
+                            
put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
                         }
                     }));
 
@@ -248,6 +253,10 @@ public void shouldBeAbleToPerformMultipleTransactions() 
throws Exception {
                 new Properties() {
                     {
                         put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
StreamsConfig.EXACTLY_ONCE);
+                        put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+                        put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+                        put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
+                        put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
                     }
                 }));
 
@@ -669,6 +678,8 @@ public void close() { }
                         put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
StreamsConfig.EXACTLY_ONCE);
                         put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 
numberOfStreamsThreads);
                         put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, -1);
+                        
put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), 
"1000");
+                        
put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), 
"earliest");
                         
put(StreamsConfig.consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), 5 * 
1000);
                         
put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 5 * 
1000 - 1);
                         
put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), 
MAX_POLL_INTERVAL_MS);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
index 08ab1200a94..ac5a41837fa 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
@@ -140,6 +140,9 @@ public static void startKafkaCluster() throws 
InterruptedException {
     public void setUp() throws IOException {
 
         final Properties props = new Properties();
+        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+        props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         props.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
 
@@ -277,6 +280,9 @@ public void shouldThrowExceptionOverlappingTopic() {
     @Test
     public void shouldThrowStreamsExceptionNoResetSpecified() throws 
InterruptedException {
         final Properties props = new Properties();
+        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+        props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
 
         final Properties localConfig = StreamsTestUtils.getStreamsConfig(
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index fe7ee266657..153c5a1c508 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -27,6 +27,7 @@
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
@@ -35,9 +36,7 @@
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.TimeWindows;
-import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
-import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.MockMapper;
@@ -89,10 +88,10 @@ public void before() {
         streamsProp.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.String().getClass().getName());
         streamsProp.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass().getName());
         streamsProp.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath());
-        streamsProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        streamsProp.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, 
true);
         streamsProp.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
         streamsProp.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsProp.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, 
true);
     }
 
     @After
@@ -147,14 +146,9 @@ public void 
shouldCompactTopicsForKeyValueStoreChangelogs() throws Exception {
         final StreamsBuilder builder = new StreamsBuilder();
         final KStream<String, String> textLines = 
builder.stream(DEFAULT_INPUT_TOPIC);
 
-        textLines.flatMapValues(new ValueMapper<String, Iterable<String>>() {
-            @Override
-            public Iterable<String> apply(final String value) {
-                return 
Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
-            }
-        })
-                .groupBy(MockMapper.<String, String>selectValueMapper())
-                .count(Materialized.<String, Long, 
KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>>as("Counts"));
+        textLines.flatMapValues(value -> 
Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
+            .groupBy(MockMapper.selectValueMapper())
+            .count(Materialized.as("Counts"));
 
         final KafkaStreams streams = new KafkaStreams(builder.build(), 
streamsProp);
         streams.start();
@@ -191,19 +185,10 @@ public void 
shouldCompactAndDeleteTopicsForWindowStoreChangelogs() throws Except
 
         final int durationMs = 2000;
 
-        textLines.flatMapValues(new ValueMapper<String, Iterable<String>>() {
-            @Override
-            public Iterable<String> apply(final String value) {
-                return 
Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
-            }
-        })
-            .groupBy(MockMapper.<String, String>selectValueMapper())
+        textLines.flatMapValues(value -> 
Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
+            .groupBy(MockMapper.selectValueMapper())
             .windowedBy(TimeWindows.of(1000).grace(0L))
-            .count(
-                Materialized
-                    .<String, Long, 
WindowStore<org.apache.kafka.common.utils.Bytes, byte[]>>as("CountWindows")
-                    .withRetention(2_000L)
-            );
+            .count(Materialized.<String, Long, WindowStore<Bytes, 
byte[]>>as("CountWindows").withRetention(2_000L));
 
         final KafkaStreams streams = new KafkaStreams(builder.build(), 
streamsProp);
         streams.start();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
index 29f077fc52c..2269a5dd88a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
@@ -151,6 +151,7 @@ public void setup() {
 
         final Properties streamsConfiguration = new Properties();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, 
APPLICATION_ID);
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 
purgeIntervalMs);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.Integer().getClass());
         
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.Integer().getClass());
@@ -158,7 +159,7 @@ public void setup() {
         
streamsConfiguration.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_MS_CONFIG),
 purgeIntervalMs);
         
streamsConfiguration.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG),
 purgeSegmentBytes);
         
streamsConfiguration.put(StreamsConfig.producerPrefix(ProducerConfig.BATCH_SIZE_CONFIG),
 purgeSegmentBytes / 2);    // we cannot allow batch size larger than segment 
size
-        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 
purgeIntervalMs);
+        
streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, 
true);
 
         final StreamsBuilder builder = new StreamsBuilder();
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index d3a7aee6523..7cfde6122c8 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -18,6 +18,7 @@
 
 import kafka.utils.MockTime;
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
@@ -29,6 +30,7 @@
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
@@ -115,7 +117,12 @@ public void setUp() throws Exception {
         CLUSTER.deleteAndRecreateTopics(DEFAULT_OUTPUT_TOPIC);
 
         final Properties properties = new Properties();
+        properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+        properties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
+        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         properties.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, 
true);
+
         streamsConfiguration = 
StreamsTestUtils.getStreamsConfig("regex-source-integration-test",
                                                                  
CLUSTER.bootstrapServers(),
                                                                  
STRING_SERDE_CLASSNAME,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.java
index 200062e0fb9..58d903a50ea 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.java
@@ -165,10 +165,10 @@ private void runIntegrationTest(final String 
optimizationConfig,
         streams.start();
 
         final List<KeyValue<String, Long>> expectedCountKeyValues = 
Arrays.asList(KeyValue.pair("A", 6L), KeyValue.pair("B", 6L), 
KeyValue.pair("C", 6L));
-        final List<KeyValue<String, Long>> receivedCountKeyValues = 
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig1, 
COUNT_TOPIC, expectedCountKeyValues.size());
+        final List<KeyValue<String, Long>> receivedCountKeyValues = 
IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig1, 
COUNT_TOPIC, expectedCountKeyValues);
 
         final List<KeyValue<String, String>> expectedStringCountKeyValues = 
Arrays.asList(KeyValue.pair("A", "6"), KeyValue.pair("B", "6"), 
KeyValue.pair("C", "6"));
-        final List<KeyValue<String, String>> receivedCountStringKeyValues = 
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig2, 
COUNT_STRING_TOPIC, expectedStringCountKeyValues.size());
+        final List<KeyValue<String, String>> receivedCountStringKeyValues = 
IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig2, 
COUNT_STRING_TOPIC, expectedStringCountKeyValues);
 
         assertThat(receivedCountKeyValues, equalTo(expectedCountKeyValues));
         assertThat(receivedCountStringKeyValues, 
equalTo(expectedStringCountKeyValues));
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index c1d07dcdcca..5eb4fc7e5b4 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -106,6 +106,7 @@ private Properties props(final String applicationId) {
         
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.Integer().getClass());
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 
1000);
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
+        
streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, 
true);
         return streamsConfiguration;
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
index 6af333121d0..c43fcd80e93 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
@@ -99,7 +99,7 @@ public void shouldInnerJoinWithStream() {
 
     private void verifyJoin(final Map<String, String> expected) {
         final ConsumerRecordFactory<String, String> recordFactory = new 
ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
-        final Properties props = 
StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
+        final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
 
         try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
             // write some data to the global table
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
index c6c4996846d..2bf6971ff7f 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
@@ -18,6 +18,7 @@
 
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.KStream;
@@ -33,6 +34,7 @@
 import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
 import java.util.Collections;
@@ -60,7 +62,6 @@
     private final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> materialized = new 
MaterializedInternal<>(Materialized.as("test-store"));
 
     {
-        builder.internalTopologyBuilder.setApplicationId(APP_ID);
         materialized.generateStoreNameIfNeeded(builder, storePrefix);
     }
 
@@ -132,7 +133,9 @@ public void 
shouldStillMaterializeSourceKTableIfMaterializedIsntQueryable() {
         final KTable table1 = builder.table("topic2", consumed, 
materializedInternal);
 
         builder.buildAndOptimizeTopology();
-        final ProcessorTopology topology = 
builder.internalTopologyBuilder.build(null);
+        final ProcessorTopology topology = builder.internalTopologyBuilder
+            .rewriteTopology(new 
StreamsConfig(StreamsTestUtils.getStreamsConfig(APP_ID)))
+            .build(null);
 
         assertEquals(1, topology.stateStores().size());
         final String storeName = "prefix-STATE-STORE-0000000000";
@@ -174,7 +177,9 @@ public void shouldBuildSimpleGlobalTableTopology() {
             materializedInternal);
 
         builder.buildAndOptimizeTopology();
-        final ProcessorTopology topology = 
builder.internalTopologyBuilder.buildGlobalStateTopology();
+        final ProcessorTopology topology = builder.internalTopologyBuilder
+            .rewriteTopology(new 
StreamsConfig(StreamsTestUtils.getStreamsConfig(APP_ID)))
+            .buildGlobalStateTopology();
         final List<StateStore> stateStores = topology.globalStateStores();
 
         assertEquals(1, stateStores.size());
@@ -182,7 +187,9 @@ public void shouldBuildSimpleGlobalTableTopology() {
     }
 
     private void doBuildGlobalTopologyWithAllGlobalTables() {
-        final ProcessorTopology topology = 
builder.internalTopologyBuilder.buildGlobalStateTopology();
+        final ProcessorTopology topology = builder.internalTopologyBuilder
+            .rewriteTopology(new 
StreamsConfig(StreamsTestUtils.getStreamsConfig(APP_ID)))
+            .buildGlobalStateTopology();
 
         final List<StateStore> stateStores = topology.globalStateStores();
         final Set<String> sourceTopics = topology.sourceTopics();
@@ -263,8 +270,9 @@ public void shouldMapStateStoresToCorrectSourceTopics() {
 
 
         final KStream<String, String> mapped = 
playEvents.map(MockMapper.<String, String>selectValueKeyValueMapper());
-        mapped.leftJoin(table, 
MockValueJoiner.TOSTRING_JOINER).groupByKey().count(Materialized.<String, Long, 
KeyValueStore<Bytes, byte[]>>as("count"));
+        mapped.leftJoin(table, 
MockValueJoiner.TOSTRING_JOINER).groupByKey().count(Materialized.as("count"));
         builder.buildAndOptimizeTopology();
+        builder.internalTopologyBuilder.rewriteTopology(new 
StreamsConfig(StreamsTestUtils.getStreamsConfig(APP_ID)));
         assertEquals(Collections.singletonList("table-topic"), 
builder.internalTopologyBuilder.stateStoreNameToSourceTopics().get("table-store"));
         assertEquals(Collections.singletonList(APP_ID + 
"-KSTREAM-MAP-0000000003-repartition"), 
builder.internalTopologyBuilder.stateStoreNameToSourceTopics().get("count"));
     }
@@ -359,6 +367,7 @@ public void 
shouldAddRegexTopicToLatestAutoOffsetResetList() {
     public void shouldHaveNullTimestampExtractorWhenNoneSupplied() {
         builder.stream(Collections.singleton("topic"), consumed);
         builder.buildAndOptimizeTopology();
+        builder.internalTopologyBuilder.rewriteTopology(new 
StreamsConfig(StreamsTestUtils.getStreamsConfig(APP_ID)));
         final ProcessorTopology processorTopology = 
builder.internalTopologyBuilder.build(null);
         assertNull(processorTopology.source("topic").getTimestampExtractor());
     }
@@ -368,7 +377,9 @@ public void shouldUseProvidedTimestampExtractor() {
         final ConsumedInternal consumed = new 
ConsumedInternal<>(Consumed.with(new MockTimestampExtractor()));
         builder.stream(Collections.singleton("topic"), consumed);
         builder.buildAndOptimizeTopology();
-        final ProcessorTopology processorTopology = 
builder.internalTopologyBuilder.build(null);
+        final ProcessorTopology processorTopology = 
builder.internalTopologyBuilder
+            .rewriteTopology(new 
StreamsConfig(StreamsTestUtils.getStreamsConfig(APP_ID)))
+            .build(null);
         assertThat(processorTopology.source("topic").getTimestampExtractor(), 
instanceOf(MockTimestampExtractor.class));
     }
 
@@ -376,7 +387,9 @@ public void shouldUseProvidedTimestampExtractor() {
     public void ktableShouldHaveNullTimestampExtractorWhenNoneSupplied() {
         builder.table("topic", consumed, materialized);
         builder.buildAndOptimizeTopology();
-        final ProcessorTopology processorTopology = 
builder.internalTopologyBuilder.build(null);
+        final ProcessorTopology processorTopology = 
builder.internalTopologyBuilder
+            .rewriteTopology(new 
StreamsConfig(StreamsTestUtils.getStreamsConfig(APP_ID)))
+            .build(null);
         assertNull(processorTopology.source("topic").getTimestampExtractor());
     }
 
@@ -385,7 +398,9 @@ public void ktableShouldUseProvidedTimestampExtractor() {
         final ConsumedInternal<String, String> consumed = new 
ConsumedInternal<>(Consumed.<String, String>with(new MockTimestampExtractor()));
         builder.table("topic", consumed, materialized);
         builder.buildAndOptimizeTopology();
-        final ProcessorTopology processorTopology = 
builder.internalTopologyBuilder.build(null);
+        final ProcessorTopology processorTopology = 
builder.internalTopologyBuilder
+            .rewriteTopology(new 
StreamsConfig(StreamsTestUtils.getStreamsConfig(APP_ID)))
+            .build(null);
         assertThat(processorTopology.source("topic").getTimestampExtractor(), 
instanceOf(MockTimestampExtractor.class));
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index 1517f0e9b6b..a1f8b27f4d6 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -75,7 +75,7 @@
     private KGroupedStream<String, String> groupedStream;
 
     private final ConsumerRecordFactory<String, String> recordFactory = new 
ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
-    private final Properties props = 
StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
+    private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
 
     @Before
     public void before() {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
index 31863f28e89..662ede7d330 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
@@ -56,7 +56,7 @@
     private final StreamsBuilder builder = new StreamsBuilder();
     private static final String INVALID_STORE_NAME = "~foo bar~";
     private KGroupedTable<String, String> groupedTable;
-    private final Properties props = 
StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.Integer());
+    private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.Integer());
     private final String topic = "input";
 
     @Before
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
index b7035aa16c5..b9773469350 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
@@ -39,7 +39,7 @@
 
     private final String topicName = "topic";
     private final ConsumerRecordFactory<Integer, String> recordFactory = new 
ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
-    private final Properties props = 
StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
+    private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
 
     @SuppressWarnings("unchecked")
     @Test
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
index 6306d2e0cd7..d7e62406e6d 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
@@ -37,7 +37,7 @@
 
     private final String topicName = "topic";
     private final ConsumerRecordFactory<Integer, String> recordFactory = new 
ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
-    private final Properties props = 
StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
+    private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
 
     private final Predicate<Integer, String> isMultipleOfThree = new 
Predicate<Integer, String>() {
         @Override
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
index 8daad991a68..7feb18ffb23 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
@@ -39,7 +39,7 @@
 
     private String topicName = "topic";
     private final ConsumerRecordFactory<Integer, String> recordFactory = new 
ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
-    private final Properties props = 
StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
+    private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
 
     @Test
     public void testFlatMap() {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
index aca091136ff..0c4495e0a1a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
@@ -38,7 +38,7 @@
 
     private String topicName = "topic";
     private final ConsumerRecordFactory<Integer, Integer> recordFactory = new 
ConsumerRecordFactory<>(new IntegerSerializer(), new IntegerSerializer());
-    private final Properties props = 
StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
+    private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
 
     @Test
     public void testFlatMapValues() {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
index fbcd6db691c..feed2fbaa65 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
@@ -41,7 +41,7 @@
 
     private final String topicName = "topic";
     private final ConsumerRecordFactory<Integer, String> recordFactory = new 
ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
-    private final Properties props = 
StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
+    private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
 
     @Test
     public void testForeach() {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
index d5c5a54c0b4..a68583ec045 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
@@ -74,7 +74,7 @@ public String apply(final Integer key, final String value) {
         };
         stream.join(table, keyMapper, 
MockValueJoiner.TOSTRING_JOINER).process(supplier);
 
-        final Properties props = 
StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
+        final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
         driver = new TopologyTestDriver(builder.build(), props);
 
         processor = supplier.theCapturedProcessor();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
index 248c3eed138..aebc2a53cd6 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
@@ -76,7 +76,7 @@ public String apply(final Integer key, final String value) {
         };
         stream.leftJoin(table, keyMapper, 
MockValueJoiner.TOSTRING_JOINER).process(supplier);
 
-        final Properties props = 
StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
+        final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
         driver = new TopologyTestDriver(builder.build(), props);
 
         processor = supplier.theCapturedProcessor();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 354fa0a147d..bce7fc80a40 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -74,7 +74,7 @@
     private StreamsBuilder builder;
 
     private final ConsumerRecordFactory<String, String> recordFactory = new 
ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
-    private final Properties props = 
StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
+    private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
 
     @Before
     public void before() {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index 963b681472b..971ee62d284 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -53,7 +53,7 @@
 
     private final Consumed<Integer, String> consumed = 
Consumed.with(Serdes.Integer(), Serdes.String());
     private final ConsumerRecordFactory<Integer, String> recordFactory = new 
ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
-    private final Properties props = 
StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
+    private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
 
     @Test
     public void shouldLogAndMeterOnSkippedRecordsWithNullValue() {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index 338b96aa98a..856de3d85c8 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -48,7 +48,7 @@
 
     private final Consumed<Integer, String> consumed = 
Consumed.with(Serdes.Integer(), Serdes.String());
     private final ConsumerRecordFactory<Integer, String> recordFactory = new 
ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
-    private final Properties props = 
StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
+    private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
 
     @Test
     public void testLeftJoin() {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
index 6ffce04c837..eb6536d37a6 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
@@ -72,7 +72,7 @@ public void setUp() {
         table = builder.table(tableTopic, consumed);
         stream.join(table, MockValueJoiner.TOSTRING_JOINER).process(supplier);
 
-        final Properties props = 
StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
+        final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
         driver = new TopologyTestDriver(builder.build(), props, 0L);
 
         processor = supplier.theCapturedProcessor();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index 1c3e027ff9f..8c57038b042 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -68,7 +68,7 @@ public void setUp() {
         table = builder.table(tableTopic, consumed);
         stream.leftJoin(table, 
MockValueJoiner.TOSTRING_JOINER).process(supplier);
 
-        final Properties props = 
StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
+        final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
         driver = new TopologyTestDriver(builder.build(), props, 0L);
 
         processor = supplier.theCapturedProcessor();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
index e1851c1c04a..2692c17f021 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
@@ -38,7 +38,7 @@
 
     private String topicName = "topic";
     private final ConsumerRecordFactory<Integer, String> recordFactory = new 
ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
-    private final Properties props = 
StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
+    private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
 
     @Test
     public void testMap() {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
index 8de8a81a435..67891be18a5 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
@@ -39,7 +39,7 @@
     private String topicName = "topic";
     private final MockProcessorSupplier<Integer, Integer> supplier = new 
MockProcessorSupplier<>();
     private final ConsumerRecordFactory<Integer, String> recordFactory = new 
ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
-    private final Properties props = 
StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
+    private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
 
     @Test
     public void testFlatMapValues() {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java
index 3e4012e77af..780bcae21d1 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java
@@ -40,7 +40,7 @@
 
     private final String topicName = "topic";
     private final ConsumerRecordFactory<Integer, String> recordFactory = new 
ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
-    private final Properties props = 
StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
+    private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
 
     @Test
     public void shouldObserveStreamElements() {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
index 62f767746ea..5f804d45cf4 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
@@ -41,7 +41,7 @@
     private String topicName = "topic_key_select";
 
     private final ConsumerRecordFactory<String, Integer> recordFactory = new 
ConsumerRecordFactory<>(topicName, new StringSerializer(), new 
IntegerSerializer());
-    private final Properties props = 
StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.Integer());
+    private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.Integer());
 
     @Test
     public void testSelectKey() {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index 74cd7bdb4eb..c7fd7cdc32a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -64,24 +64,9 @@
     private static final long GAP_MS = 5 * 60 * 1000L;
     private static final String STORE_NAME = "session-store";
 
-    private final Initializer<Long> initializer = new Initializer<Long>() {
-        @Override
-        public Long apply() {
-            return 0L;
-        }
-    };
-    private final Aggregator<String, String, Long> aggregator = new 
Aggregator<String, String, Long>() {
-        @Override
-        public Long apply(final String aggKey, final String value, final Long 
aggregate) {
-            return aggregate + 1;
-        }
-    };
-    private final Merger<String, Long> sessionMerger = new Merger<String, 
Long>() {
-        @Override
-        public Long apply(final String aggKey, final Long aggOne, final Long 
aggTwo) {
-            return aggOne + aggTwo;
-        }
-    };
+    private final Initializer<Long> initializer = () -> 0L;
+    private final Aggregator<String, String, Long> aggregator = (aggKey, 
value, aggregate) -> aggregate + 1;
+    private final Merger<String, Long> sessionMerger = (aggKey, aggOne, 
aggTwo) -> aggOne + aggTwo;
     private final KStreamSessionWindowAggregate<String, String, Long> 
sessionAggregator =
         new KStreamSessionWindowAggregate<>(
             SessionWindows.with(GAP_MS),
@@ -96,7 +81,6 @@ public Long apply(final String aggKey, final Long aggOne, 
final Long aggTwo) {
     private InternalMockProcessorContext context;
     private Metrics metrics;
 
-
     @Before
     public void initializeStore() {
         final File stateDir = TestUtils.tempDirectory();
@@ -107,7 +91,7 @@ public void initializeStore() {
             Serdes.String(),
             Serdes.String(),
             metrics,
-            new StreamsConfig(StreamsTestUtils.minimalStreamsConfig()),
+            new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
             NoOpRecordCollector::new,
             new ThreadCache(new LogContext("testCache "), 100000, metrics)
         ) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
index 9a61874abbe..a8ee681cb8c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
@@ -44,7 +44,7 @@
     private String topicName = "topic";
 
     private final ConsumerRecordFactory<Integer, Integer> recordFactory = new 
ConsumerRecordFactory<>(new IntegerSerializer(), new IntegerSerializer());
-    private final Properties props = 
StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.Integer());
+    private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.Integer());
 
     @Rule
     public final KStreamTestDriver kstreamDriver = new KStreamTestDriver();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
index c3996507055..570053c0c85 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
@@ -51,7 +51,7 @@
     private String topicName = "topic";
     private final MockProcessorSupplier<Integer, Integer> supplier = new 
MockProcessorSupplier<>();
     private final ConsumerRecordFactory<Integer, Integer> recordFactory = new 
ConsumerRecordFactory<>(new IntegerSerializer(), new IntegerSerializer());
-    private final Properties props = 
StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.Integer());
+    private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.Integer());
     @Mock(MockType.NICE)
     private ProcessorContext context;
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index af7cff6d17a..5a295b8a50a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -60,7 +60,7 @@
 public class KStreamWindowAggregateTest {
 
     private final ConsumerRecordFactory<String, String> recordFactory = new 
ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
-    private final Properties props = 
StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
+    private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
 
     @Test
     public void testAggBasic() {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
index bc8ca95187a..3746ae9a322 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
@@ -48,7 +48,7 @@
 
 public class KStreamWindowReduceTest {
 
-    private final Properties props = 
StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
+    private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
     private final ConsumerRecordFactory<String, String> recordFactory = new 
ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
 
     @Test
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index 2995b234c96..3e143f5807b 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -49,7 +49,7 @@
 
     private final Consumed<String, Integer> consumed = 
Consumed.with(Serdes.String(), Serdes.Integer());
     private final ConsumerRecordFactory<String, Integer> recordFactory = new 
ConsumerRecordFactory<>(new StringSerializer(), new IntegerSerializer());
-    private final Properties props = 
StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.Integer());
+    private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.Integer());
 
     private void doTestKTable(final StreamsBuilder builder,
                               final KTable<String, Integer> table2,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index 60d7426224f..eb586e60de1 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -64,7 +64,7 @@
 
     private final Consumed<String, String> consumed = 
Consumed.with(Serdes.String(), Serdes.String());
     private final Produced<String, String> produced = 
Produced.with(Serdes.String(), Serdes.String());
-    private final Properties props = 
StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
+    private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
     private final ConsumerRecordFactory<String, String> recordFactory = new 
ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
 
     private StreamsBuilder builder;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
index bf5660a7f11..ce7de16501c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
@@ -40,7 +40,7 @@
 public class KTableMapKeysTest {
 
     private final ConsumerRecordFactory<Integer, String> recordFactory = new 
ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
-    private final Properties props = 
StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
+    private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
 
     @Test
     public void testMapKeysConvertingToStream() {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
index ddfd5a5c941..0f56043be50 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
@@ -51,7 +51,7 @@
     private final Consumed<String, String> consumed = 
Consumed.with(Serdes.String(), Serdes.String());
     private final Produced<String, String> produced = 
Produced.with(Serdes.String(), Serdes.String());
     private final ConsumerRecordFactory<String, String> recordFactory = new 
ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
-    private final Properties props = 
StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
+    private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
 
     private void doTestKTable(final StreamsBuilder builder, final String 
topic1, final MockProcessorSupplier<String, Integer> supplier) {
         try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
index 80a60ab2f20..2055f9cf319 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
@@ -48,7 +48,7 @@
 
     private final Consumed<String, String> stringConsumed = 
Consumed.with(Serdes.String(), Serdes.String());
     private final ConsumerRecordFactory<String, String> recordFactory = new 
ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
-    private final Properties props = 
StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
+    private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
 
     @Test
     public void testKTable() {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
index 825edb3eb37..34a235ac482 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
@@ -55,7 +55,7 @@
     private static final String TOPIC = "input";
     private final StreamsBuilder builder = new StreamsBuilder();
     private final ConsumerRecordFactory<String, String> recordFactory = new 
ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
-    private final Properties props = 
StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
+    private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
 
     private final Merger<String, String> sessionMerger = new Merger<String, 
String>() {
         @Override
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
index 7b885b23bf2..0e541c91acc 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
@@ -54,7 +54,7 @@
     private static final String TOPIC = "input";
     private final StreamsBuilder builder = new StreamsBuilder();
     private final ConsumerRecordFactory<String, String> recordFactory = new 
ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
-    private final Properties props = 
StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
+    private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
     private TimeWindowedKStream<String, String> windowedStream;
 
     @Before
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
index 54a927c8072..c4699ece515 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
@@ -37,7 +37,7 @@
 
 import java.util.Properties;
 
-import static org.apache.kafka.test.StreamsTestUtils.minimalStreamsConfig;
+import static org.apache.kafka.test.StreamsTestUtils.getStreamsConfig;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -182,7 +182,7 @@ public void appConfigsShouldReturnUnrecognizedValues() {
     private static class TestProcessorContext extends AbstractProcessorContext 
{
         static Properties config;
         static {
-            config = minimalStreamsConfig();
+            config = getStreamsConfig();
             // Value must be a string to test className -> class conversion
             config.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, 
RocksDBConfigSetter.class.getName());
             config.put("user.supplied.config", "user-suppplied-value");
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
index 95c6943088c..37a6fdba453 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
@@ -104,7 +104,7 @@ public String newStoreName(final String prefix) {
         properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "blah");
         properties.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getAbsolutePath());
         config = new StreamsConfig(properties);
-        globalStreamThread = new 
GlobalStreamThread(builder.buildGlobalStateTopology(),
+        globalStreamThread = new 
GlobalStreamThread(builder.rewriteTopology(config).buildGlobalStateTopology(),
                                                     config,
                                                     mockConsumer,
                                                     new StateDirectory(config, 
time),
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index 78c217d236e..7230e5fcbce 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -19,6 +19,7 @@
 import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyDescription;
 import org.apache.kafka.streams.errors.TopologyException;
@@ -32,6 +33,7 @@
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockStoreBuilder;
 import org.apache.kafka.test.MockTimestampExtractor;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
 import java.lang.reflect.Field;
@@ -626,7 +628,7 @@ public void 
shouldSetCorrectSourceNodesWithRegexUpdatedTopics() throws Exception
     @Test
     public void shouldAddTimestampExtractorPerSource() {
         builder.addSource(null, "source", new MockTimestampExtractor(), null, 
null, "topic");
-        final ProcessorTopology processorTopology = builder.build(null);
+        final ProcessorTopology processorTopology = 
builder.rewriteTopology(new 
StreamsConfig(StreamsTestUtils.getStreamsConfig())).build(null);
         assertThat(processorTopology.source("topic").getTimestampExtractor(), 
instanceOf(MockTimestampExtractor.class));
     }
 
@@ -634,7 +636,7 @@ public void shouldAddTimestampExtractorPerSource() {
     public void shouldAddTimestampExtractorWithPatternPerSource() {
         final Pattern pattern = Pattern.compile("t.*");
         builder.addSource(null, "source", new MockTimestampExtractor(), null, 
null, pattern);
-        final ProcessorTopology processorTopology = builder.build(null);
+        final ProcessorTopology processorTopology = 
builder.rewriteTopology(new 
StreamsConfig(StreamsTestUtils.getStreamsConfig())).build(null);
         
assertThat(processorTopology.source(pattern.pattern()).getTimestampExtractor(), 
instanceOf(MockTimestampExtractor.class));
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 3495e80eef6..587cae20179 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -102,8 +102,6 @@ public void cleanup() {
 
     @Test
     public void testTopologyMetadata() {
-        topology.setApplicationId("X");
-
         topology.addSource("source-1", "topic-1");
         topology.addSource("source-2", "topic-2", "topic-3");
         topology.addProcessor("processor-1", new MockProcessorSupplier<>(), 
"source-1");
@@ -111,7 +109,7 @@ public void testTopologyMetadata() {
         topology.addSink("sink-1", "topic-3", "processor-1");
         topology.addSink("sink-2", "topic-4", "processor-1", "processor-2");
 
-        final ProcessorTopology processorTopology = 
topology.getInternalBuilder().build();
+        final ProcessorTopology processorTopology = 
topology.getInternalBuilder("X").build();
 
         assertEquals(6, processorTopology.processors().size());
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
index 0cf147cec2b..5b1da16c3cb 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
@@ -53,7 +53,7 @@
 
 public class StreamsMetadataStateTest {
 
-    private StreamsMetadataState discovery;
+    private StreamsMetadataState metadataState;
     private HostInfo hostOne;
     private HostInfo hostTwo;
     private HostInfo hostThree;
@@ -121,8 +121,8 @@ public Object apply(final Object value) {
                 new PartitionInfo("topic-four", 0, null, null, null));
 
         cluster = new Cluster(null, Collections.<Node>emptyList(), 
partitionInfos, Collections.<String>emptySet(), Collections.<String>emptySet());
-        discovery = new 
StreamsMetadataState(TopologyWrapper.getInternalTopologyBuilder(builder.build()),
 hostOne);
-        discovery.onChange(hostToPartitions, cluster);
+        metadataState = new 
StreamsMetadataState(TopologyWrapper.getInternalTopologyBuilder(builder.build()),
 hostOne);
+        metadataState.onChange(hostToPartitions, cluster);
         partitioner = new StreamPartitioner<String, Object>() {
             @Override
             public Integer partition(final String topic, final String key, 
final Object value, final int numPartitions) {
@@ -145,7 +145,7 @@ public void shouldGetAllStreamInstances() {
         final StreamsMetadata three = new StreamsMetadata(hostThree, 
Utils.mkSet(globalTable, "table-three"),
                 Collections.singleton(topic3P0));
 
-        final Collection<StreamsMetadata> actual = discovery.getAllMetadata();
+        final Collection<StreamsMetadata> actual = 
metadataState.getAllMetadata();
         assertEquals(3, actual.size());
         assertTrue("expected " + actual + " to contain " + one, 
actual.contains(one));
         assertTrue("expected " + actual + " to contain " + two, 
actual.contains(two));
@@ -165,11 +165,11 @@ public boolean test(final Object key, final Object value) 
{
         final HostInfo hostFour = new HostInfo("host-four", 8080);
         hostToPartitions.put(hostFour, Utils.mkSet(tp5));
 
-        discovery.onChange(hostToPartitions, 
cluster.withPartitions(Collections.singletonMap(tp5, new 
PartitionInfo("topic-five", 1, null, null, null))));
+        metadataState.onChange(hostToPartitions, 
cluster.withPartitions(Collections.singletonMap(tp5, new 
PartitionInfo("topic-five", 1, null, null, null))));
 
         final StreamsMetadata expected = new StreamsMetadata(hostFour, 
Collections.singleton(globalTable),
                 Collections.singleton(tp5));
-        final Collection<StreamsMetadata> actual = discovery.getAllMetadata();
+        final Collection<StreamsMetadata> actual = 
metadataState.getAllMetadata();
         assertTrue("expected " + actual + " to contain " + expected, 
actual.contains(expected));
     }
 
@@ -179,7 +179,7 @@ public void shouldGetInstancesForStoreName() {
                 Utils.mkSet(topic1P0, topic2P1, topic4P0));
         final StreamsMetadata two = new StreamsMetadata(hostTwo, 
Utils.mkSet(globalTable, "table-two", "table-one", "merged-table"),
                 Utils.mkSet(topic2P0, topic1P1));
-        final Collection<StreamsMetadata> actual = 
discovery.getAllMetadataForStore("table-one");
+        final Collection<StreamsMetadata> actual = 
metadataState.getAllMetadataForStore("table-one");
         assertEquals(2, actual.size());
         assertTrue("expected " + actual + " to contain " + one, 
actual.contains(one));
         assertTrue("expected " + actual + " to contain " + two, 
actual.contains(two));
@@ -187,12 +187,12 @@ public void shouldGetInstancesForStoreName() {
 
     @Test(expected = NullPointerException.class)
     public void shouldThrowIfStoreNameIsNullOnGetAllInstancesWithStore() {
-        discovery.getAllMetadataForStore(null);
+        metadataState.getAllMetadataForStore(null);
     }
 
     @Test
     public void 
shouldReturnEmptyCollectionOnGetAllInstancesWithStoreWhenStoreDoesntExist() {
-        final Collection<StreamsMetadata> actual = 
discovery.getAllMetadataForStore("not-a-store");
+        final Collection<StreamsMetadata> actual = 
metadataState.getAllMetadataForStore("not-a-store");
         assertTrue(actual.isEmpty());
     }
 
@@ -201,12 +201,12 @@ public void shouldGetInstanceWithKey() {
         final TopicPartition tp4 = new TopicPartition("topic-three", 1);
         hostToPartitions.put(hostTwo, Utils.mkSet(topic2P0, tp4));
 
-        discovery.onChange(hostToPartitions, 
cluster.withPartitions(Collections.singletonMap(tp4, new 
PartitionInfo("topic-three", 1, null, null, null))));
+        metadataState.onChange(hostToPartitions, 
cluster.withPartitions(Collections.singletonMap(tp4, new 
PartitionInfo("topic-three", 1, null, null, null))));
 
         final StreamsMetadata expected = new StreamsMetadata(hostThree, 
Utils.mkSet(globalTable, "table-three"),
                 Collections.singleton(topic3P0));
 
-        final StreamsMetadata actual = 
discovery.getMetadataWithKey("table-three",
+        final StreamsMetadata actual = 
metadataState.getMetadataWithKey("table-three",
                                                                     "the-key",
                                                                     
Serdes.String().serializer());
 
@@ -218,19 +218,19 @@ public void 
shouldGetInstanceWithKeyAndCustomPartitioner() {
         final TopicPartition tp4 = new TopicPartition("topic-three", 1);
         hostToPartitions.put(hostTwo, Utils.mkSet(topic2P0, tp4));
 
-        discovery.onChange(hostToPartitions, 
cluster.withPartitions(Collections.singletonMap(tp4, new 
PartitionInfo("topic-three", 1, null, null, null))));
+        metadataState.onChange(hostToPartitions, 
cluster.withPartitions(Collections.singletonMap(tp4, new 
PartitionInfo("topic-three", 1, null, null, null))));
 
         final StreamsMetadata expected = new StreamsMetadata(hostTwo, 
Utils.mkSet(globalTable, "table-two", "table-three", "merged-table"),
                 Utils.mkSet(topic2P0, tp4));
 
-        final StreamsMetadata actual = 
discovery.getMetadataWithKey("table-three", "the-key", partitioner);
+        final StreamsMetadata actual = 
metadataState.getMetadataWithKey("table-three", "the-key", partitioner);
         assertEquals(expected, actual);
     }
 
     @Test
     public void shouldReturnNotAvailableWhenClusterIsEmpty() {
-        discovery.onChange(Collections.<HostInfo, 
Set<TopicPartition>>emptyMap(), Cluster.empty());
-        final StreamsMetadata result = 
discovery.getMetadataWithKey("table-one", "a", Serdes.String().serializer());
+        metadataState.onChange(Collections.<HostInfo, 
Set<TopicPartition>>emptyMap(), Cluster.empty());
+        final StreamsMetadata result = 
metadataState.getMetadataWithKey("table-one", "a", 
Serdes.String().serializer());
         assertEquals(StreamsMetadata.NOT_AVAILABLE, result);
     }
 
@@ -238,12 +238,12 @@ public void shouldReturnNotAvailableWhenClusterIsEmpty() {
     public void shouldGetInstanceWithKeyWithMergedStreams() {
         final TopicPartition topic2P2 = new TopicPartition("topic-two", 2);
         hostToPartitions.put(hostTwo, Utils.mkSet(topic2P0, topic1P1, 
topic2P2));
-        discovery.onChange(hostToPartitions, 
cluster.withPartitions(Collections.singletonMap(topic2P2, new 
PartitionInfo("topic-two", 2, null, null, null))));
+        metadataState.onChange(hostToPartitions, 
cluster.withPartitions(Collections.singletonMap(topic2P2, new 
PartitionInfo("topic-two", 2, null, null, null))));
 
         final StreamsMetadata expected = new StreamsMetadata(hostTwo, 
Utils.mkSet("global-table", "table-two", "table-one", "merged-table"),
                 Utils.mkSet(topic2P0, topic1P1, topic2P2));
 
-        final StreamsMetadata actual = 
discovery.getMetadataWithKey("merged-table", "123", new 
StreamPartitioner<String, Object>() {
+        final StreamsMetadata actual = 
metadataState.getMetadataWithKey("merged-table", "123", new 
StreamPartitioner<String, Object>() {
             @Override
             public Integer partition(final String topic, final String key, 
final Object value, final int numPartitions) {
                 return 2;
@@ -256,7 +256,7 @@ public Integer partition(final String topic, final String 
key, final Object valu
 
     @Test
     public void shouldReturnNullOnGetWithKeyWhenStoreDoesntExist() {
-        final StreamsMetadata actual = 
discovery.getMetadataWithKey("not-a-store",
+        final StreamsMetadata actual = 
metadataState.getMetadataWithKey("not-a-store",
                 "key",
                 Serdes.String().serializer());
         assertNull(actual);
@@ -264,28 +264,28 @@ public void 
shouldReturnNullOnGetWithKeyWhenStoreDoesntExist() {
 
     @Test(expected = NullPointerException.class)
     public void shouldThrowWhenKeyIsNull() {
-        discovery.getMetadataWithKey("table-three", null, 
Serdes.String().serializer());
+        metadataState.getMetadataWithKey("table-three", null, 
Serdes.String().serializer());
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldThrowWhenSerializerIsNull() {
-        discovery.getMetadataWithKey("table-three", "key", (Serializer) null);
+        metadataState.getMetadataWithKey("table-three", "key", (Serializer) 
null);
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldThrowIfStoreNameIsNull() {
-        discovery.getMetadataWithKey(null, "key", 
Serdes.String().serializer());
+        metadataState.getMetadataWithKey(null, "key", 
Serdes.String().serializer());
     }
 
     @SuppressWarnings("unchecked")
     @Test(expected = NullPointerException.class)
     public void shouldThrowIfStreamPartitionerIsNull() {
-        discovery.getMetadataWithKey(null, "key", (StreamPartitioner) null);
+        metadataState.getMetadataWithKey(null, "key", (StreamPartitioner) 
null);
     }
 
     @Test
     public void shouldHaveGlobalStoreInAllMetadata() {
-        final Collection<StreamsMetadata> metadata = 
discovery.getAllMetadataForStore(globalTable);
+        final Collection<StreamsMetadata> metadata = 
metadataState.getAllMetadataForStore(globalTable);
         assertEquals(3, metadata.size());
         for (final StreamsMetadata streamsMetadata : metadata) {
             
assertTrue(streamsMetadata.stateStoreNames().contains(globalTable));
@@ -294,7 +294,7 @@ public void shouldHaveGlobalStoreInAllMetadata() {
 
     @Test
     public void shouldGetMyMetadataForGlobalStoreWithKey() {
-        final StreamsMetadata metadata = 
discovery.getMetadataWithKey(globalTable, "key", Serdes.String().serializer());
+        final StreamsMetadata metadata = 
metadataState.getMetadataWithKey(globalTable, "key", 
Serdes.String().serializer());
         assertEquals(hostOne, metadata.hostInfo());
     }
 
@@ -307,7 +307,7 @@ public void 
shouldGetAnyHostForGlobalStoreByKeyIfMyHostUnknown() {
 
     @Test
     public void shouldGetMyMetadataForGlobalStoreWithKeyAndPartitioner() {
-        final StreamsMetadata metadata = 
discovery.getMetadataWithKey(globalTable, "key", partitioner);
+        final StreamsMetadata metadata = 
metadataState.getMetadataWithKey(globalTable, "key", partitioner);
         assertEquals(hostOne, metadata.hostInfo());
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index 19bd523a8d5..1ac6d94cfee 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -71,7 +71,7 @@ public void setUp() {
             Serdes.String(),
             Serdes.Long(),
             streamsMetrics,
-            new StreamsConfig(StreamsTestUtils.minimalStreamsConfig()),
+            new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
             new RecordCollector.Supplier() {
                 @Override
                 public RecordCollector recordCollector() {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index 113e3e1151e..5ae32eb4eff 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -63,7 +63,7 @@
 
     @Before
     public void setUp() {
-        final Properties props = StreamsTestUtils.minimalStreamsConfig();
+        final Properties props = StreamsTestUtils.getStreamsConfig();
         props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, 
MockRocksDbConfigSetter.class);
         rocksDBStore = new RocksDBStore("test");
         dir = TestUtils.tempDirectory();
@@ -131,7 +131,7 @@ public void shouldCallRocksDbConfigSetter() {
     @Test
     public void shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir() {
         final File tmpDir = TestUtils.tempDirectory();
-        final InternalMockProcessorContext tmpContext = new 
InternalMockProcessorContext(tmpDir, new 
StreamsConfig(StreamsTestUtils.minimalStreamsConfig()));
+        final InternalMockProcessorContext tmpContext = new 
InternalMockProcessorContext(tmpDir, new 
StreamsConfig(StreamsTestUtils.getStreamsConfig()));
 
         assertTrue(tmpDir.setReadOnly());
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index 75bb21943ad..e0130098b38 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -96,8 +96,7 @@ public void before() {
         configureRestoreConsumer(clientSupplier, 
"applicationId-kv-store-changelog");
         configureRestoreConsumer(clientSupplier, 
"applicationId-window-store-changelog");
 
-        topology.setApplicationId(applicationId);
-        final ProcessorTopology processorTopology = 
topology.getInternalBuilder().build();
+        final ProcessorTopology processorTopology = 
topology.getInternalBuilder(applicationId).build();
 
         tasks = new HashMap<>();
         stateDirectory = new StateDirectory(streamsConfig, new MockTime());
diff --git 
a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java 
b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index ec8d3280a4d..6d4f5e25419 100644
--- 
a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++ 
b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -68,7 +68,7 @@ public InternalMockProcessorContext() {
             null,
             null,
             new StreamsMetricsImpl(new Metrics(), "mock"),
-            new StreamsConfig(StreamsTestUtils.minimalStreamsConfig()),
+            new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
             null,
             null
         );
@@ -99,12 +99,8 @@ public InternalMockProcessorContext(final StateSerdes<?, ?> 
serdes,
             serdes.keySerde(),
             serdes.valueSerde(),
             new StreamsMetricsImpl(metrics, "mock"),
-            new StreamsConfig(StreamsTestUtils.minimalStreamsConfig()), new 
RecordCollector.Supplier() {
-                @Override
-                public RecordCollector recordCollector() {
-                    return collector;
-                }
-            },
+            new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
+            () -> collector,
             null
         );
     }
@@ -118,7 +114,7 @@ public InternalMockProcessorContext(final File stateDir,
             keySerde,
             valSerde,
             new StreamsMetricsImpl(new Metrics(), "mock"),
-            new StreamsConfig(StreamsTestUtils.minimalStreamsConfig()),
+            new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
             () -> collector,
             cache
         );
diff --git a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java 
b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
index 940651915f2..1d64316cb3c 100644
--- a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
@@ -16,10 +16,10 @@
  */
 package org.apache.kafka.test;
 
-import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
@@ -44,48 +44,37 @@ public static Properties getStreamsConfig(final String 
applicationId,
                                               final String valueSerdeClassName,
                                               final Properties additional) {
 
-        final Properties streamsConfiguration = new Properties();
-        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, 
applicationId);
-        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers);
-        streamsConfiguration.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 
"1000");
-        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
keySerdeClassName);
-        
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
valueSerdeClassName);
-        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
-        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath());
-        
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
-        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
-        streamsConfiguration.putAll(additional);
-        return streamsConfiguration;
-
-    }
-
-    public static Properties topologyTestConfig(final String applicationId,
-                                                final String bootstrapServers,
-                                                final String keyDeserializer,
-                                                final String 
valueDeserializer) {
         final Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers);
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getAbsolutePath());
-        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
keyDeserializer);
-        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
valueDeserializer);
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
keySerdeClassName);
+        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
valueSerdeClassName);
+        props.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath());
+        props.putAll(additional);
         return props;
     }
 
-    public static Properties topologyTestConfig(final Serde keyDeserializer,
-                                                final Serde valueDeserializer) 
{
-        return topologyTestConfig(
+    public static Properties getStreamsConfig(final Serde keyDeserializer,
+                                              final Serde valueDeserializer) {
+        return getStreamsConfig(
                 UUID.randomUUID().toString(),
                 "localhost:9091",
                 keyDeserializer.getClass().getName(),
-                valueDeserializer.getClass().getName());
+                valueDeserializer.getClass().getName(),
+                new Properties());
+    }
+
+    public static Properties getStreamsConfig(final String applicationId) {
+        return getStreamsConfig(
+            applicationId,
+            "localhost:9091",
+            Serdes.ByteArraySerde.class.getName(),
+            Serdes.ByteArraySerde.class.getName(),
+            new Properties());
     }
 
-    public static Properties minimalStreamsConfig() {
-        final Properties properties = new Properties();
-        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, 
UUID.randomUUID().toString());
-        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
"anyserver:9092");
-        return properties;
+    public static Properties getStreamsConfig() {
+        return getStreamsConfig(UUID.randomUUID().toString());
     }
 
     public static <K, V> List<KeyValue<K, V>> toList(final 
Iterator<KeyValue<K, V>> iterator) {
diff --git 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index d2796db6f9c..5f3f96145a3 100644
--- 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -239,7 +239,7 @@ private TopologyTestDriver(final InternalTopologyBuilder 
builder,
         mockWallClockTime = new MockTime(initialWallClockTimeMs);
 
         internalTopologyBuilder = builder;
-        
internalTopologyBuilder.setApplicationId(streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG));
+        internalTopologyBuilder.rewriteTopology(streamsConfig);
 
         processorTopology = internalTopologyBuilder.build(null);
         globalTopology = internalTopologyBuilder.buildGlobalStateTopology();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove caching wrapper stores if cache-size is configured to zero bytes
> -----------------------------------------------------------------------
>
>                 Key: KAFKA-6998
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6998
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Assignee: Guozhang Wang
>            Priority: Major
>              Labels: kip
>
> Users can disable caching globally by setting the cache size to zero in their 
> config. However, this only effectively disables the caching layer; the 
> code is still in place.
> We should consider removing the caching wrappers completely for this case. 
> The tricky part is that we insert the caching layer at compile time, i.e., 
> when calling `StreamsBuilder#build()` – at this point, we don't know the 
> configuration yet. Thus, we need to find a way to rewrite the topology after 
> it is passed to `KafkaStreams` in case the cache size is set to zero.
> KIP: [KIP-356: Add withCachingDisabled() to 
> StoreBuilder|https://cwiki.apache.org/confluence/display/KAFKA/KIP-356%3A+Add+withCachingDisabled%28%29+to+StoreBuilder?src=jira]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to