This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5243fb9a7d7 KAFKA-18026: migrate KTableSource to use 
ProcesserSupplier#stores (#17903)
5243fb9a7d7 is described below

commit 5243fb9a7d717521e64fa57bcdf895825df37e85
Author: Almog Gavra <[email protected]>
AuthorDate: Wed Nov 27 14:04:27 2024 -0800

    KAFKA-18026: migrate KTableSource to use ProcesserSupplier#stores (#17903)
    
    This PR is part of the implementation for KIP-1112 (KAFKA-18026). In order 
to have DSL operators be properly wrapped by the interface suggestion in 1112, 
we need to make sure they all use the ConnectedStoreProvider#stores method to 
connect stores instead of manually calling addStateStore.
    
    This is a refactor only, there is no new behaviors.
    
    Reviewers: Anna Sophie Blee-Goldman <[email protected]>
---
 .../org/apache/kafka/streams/StreamsBuilder.java   |   8 +-
 .../java/org/apache/kafka/streams/Topology.java    |   8 +-
 .../kstream/internals/InternalStreamsBuilder.java  |   9 +-
 .../streams/kstream/internals/KStreamImpl.java     |   7 +-
 .../streams/kstream/internals/KStreamImplJoin.java |   2 +-
 .../streams/kstream/internals/KTableImpl.java      |   7 +-
 .../streams/kstream/internals/KTableSource.java    |  24 ++++-
 .../kstream/internals/MaterializedInternal.java    |  11 ++-
 .../kstream/internals/graph/GlobalStoreNode.java   |  12 ++-
 .../kstream/internals/graph/TableSourceNode.java   |  34 ++-----
 .../internals/InternalTopologyBuilder.java         |  41 +++++----
 .../processor/internals/StoreBuilderWrapper.java   |  10 +-
 .../StoreDelegatingProcessorSupplier.java          |  47 ++++++++++
 .../streams/processor/internals/StoreFactory.java  |  76 ++++++++++++++++
 .../apache/kafka/streams/StreamsBuilderTest.java   |  79 +++++++++++-----
 .../apache/kafka/streams/StreamsConfigTest.java    |   6 +-
 .../org/apache/kafka/streams/TopologyTest.java     |  13 +--
 .../internals/graph/TableSourceNodeTest.java       |  17 +++-
 .../internals/GlobalStreamThreadTest.java          |   7 +-
 .../internals/InternalTopologyBuilderTest.java     | 101 +++++++++++----------
 .../org/apache/kafka/streams/utils/TestUtils.java  |  16 ++--
 .../kafka/test/MockKeyValueStoreBuilder.java       |   2 +-
 22 files changed, 361 insertions(+), 176 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index 2879436e500..b9cc75b9fde 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -384,7 +384,9 @@ public class StreamsBuilder {
         final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> 
materializedInternal =
             new MaterializedInternal<>(
                 Materialized.with(consumedInternal.keySerde(), 
consumedInternal.valueSerde()),
-                internalStreamsBuilder, topic + "-");
+                internalStreamsBuilder,
+                topic + "-",
+                true /* force materializing global tables */);
 
         return internalStreamsBuilder.globalTable(topic, consumedInternal, 
materializedInternal);
     }
@@ -517,7 +519,7 @@ public class StreamsBuilder {
      */
     public synchronized StreamsBuilder addStateStore(final StoreBuilder<?> 
builder) {
         Objects.requireNonNull(builder, "builder can't be null");
-        internalStreamsBuilder.addStateStore(new StoreBuilderWrapper(builder));
+        
internalStreamsBuilder.addStateStore(StoreBuilderWrapper.wrapStoreBuilder(builder));
         return this;
     }
 
@@ -556,7 +558,7 @@ public class StreamsBuilder {
         Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
         Objects.requireNonNull(consumed, "consumed can't be null");
         internalStreamsBuilder.addGlobalStore(
-            new StoreBuilderWrapper(storeBuilder),
+            StoreBuilderWrapper.wrapStoreBuilder(storeBuilder),
             topic,
             new ConsumedInternal<>(consumed),
             stateUpdateSupplier,
diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java 
b/streams/src/main/java/org/apache/kafka/streams/Topology.java
index 8fd34a47327..35fe13faa38 100644
--- a/streams/src/main/java/org/apache/kafka/streams/Topology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java
@@ -34,7 +34,7 @@ import 
org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.SinkNode;
 import org.apache.kafka.streams.processor.internals.SourceNode;
-import org.apache.kafka.streams.processor.internals.StoreBuilderWrapper;
+import 
org.apache.kafka.streams.processor.internals.StoreDelegatingProcessorSupplier;
 import org.apache.kafka.streams.state.StoreBuilder;
 
 import java.util.Set;
@@ -853,14 +853,13 @@ public class Topology {
                                                            final String 
processorName,
                                                            final 
ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) {
         internalTopologyBuilder.addGlobalStore(
-            new StoreBuilderWrapper(storeBuilder),
             sourceName,
             null,
             keyDeserializer,
             valueDeserializer,
             topic,
             processorName,
-            stateUpdateSupplier,
+            new StoreDelegatingProcessorSupplier<>(stateUpdateSupplier, 
Set.of(storeBuilder)),
             true
         );
         return this;
@@ -899,14 +898,13 @@ public class Topology {
                                                            final String 
processorName,
                                                            final 
ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) {
         internalTopologyBuilder.addGlobalStore(
-            new StoreBuilderWrapper(storeBuilder),
             sourceName,
             timestampExtractor,
             keyDeserializer,
             valueDeserializer,
             topic,
             processorName,
-            stateUpdateSupplier,
+            new StoreDelegatingProcessorSupplier<>(stateUpdateSupplier, 
Set.of(storeBuilder)),
             true
         );
         return this;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 92dde06e9c0..1e148ac047c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -140,7 +140,7 @@ public class InternalStreamsBuilder implements 
InternalNameProvider {
         final String tableSourceName = named
             .orElseGenerateWithPrefix(this, KTableImpl.SOURCE_NAME);
 
-        final KTableSource<K, V> tableSource = new 
KTableSource<>(materialized.storeName(), materialized.queryableStoreName());
+        final KTableSource<K, V> tableSource = new 
KTableSource<>(materialized);
         final ProcessorParameters<K, V, ?, ?> processorParameters = new 
ProcessorParameters<>(tableSource, tableSourceName);
 
         final TableSourceNode<K, V> tableSourceNode = TableSourceNode.<K, 
V>tableSourceNodeBuilder()
@@ -148,7 +148,6 @@ public class InternalStreamsBuilder implements 
InternalNameProvider {
             .withSourceName(sourceName)
             .withNodeName(tableSourceName)
             .withConsumedInternal(consumed)
-            .withMaterializedInternal(materialized)
             .withProcessorParameters(processorParameters)
             .build();
         tableSourceNode.setOutputVersioned(materialized.storeSupplier() 
instanceof VersionedBytesStoreSupplier);
@@ -186,9 +185,7 @@ public class InternalStreamsBuilder implements 
InternalNameProvider {
         final String processorName = named
                 .orElseGenerateWithPrefix(this, KTableImpl.SOURCE_NAME);
 
-        // enforce store name as queryable name to always materialize global 
table stores
-        final String storeName = materialized.storeName();
-        final KTableSource<K, V> tableSource = new KTableSource<>(storeName, 
storeName);
+        final KTableSource<K, V> tableSource = new 
KTableSource<>(materialized);
 
         final ProcessorParameters<K, V, ?, ?> processorParameters = new 
ProcessorParameters<>(tableSource, processorName);
 
@@ -197,12 +194,12 @@ public class InternalStreamsBuilder implements 
InternalNameProvider {
             .isGlobalKTable(true)
             .withSourceName(sourceName)
             .withConsumedInternal(consumed)
-            .withMaterializedInternal(materialized)
             .withProcessorParameters(processorParameters)
             .build();
 
         addGraphNode(root, tableSourceNode);
 
+        final String storeName = materialized.storeName();
         return new GlobalKTableImpl<>(new 
KTableSourceValueGetterSupplier<>(storeName), 
materialized.queryableStoreName());
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index ec2fd211efb..b650724055b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -658,10 +658,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, 
V> implements KStream<K
             subTopologySourceNodes = this.subTopologySourceNodes;
         }
 
-        final KTableSource<K, V> tableSource = new KTableSource<>(
-            materializedInternal.storeName(),
-            materializedInternal.queryableStoreName()
-        );
+        final KTableSource<K, V> tableSource = new 
KTableSource<>(materializedInternal);
         final ProcessorParameters<K, V, ?, ?> processorParameters = new 
ProcessorParameters<>(tableSource, name);
         final GraphNode tableNode = new StreamToTableNode<>(
             name,
@@ -1171,7 +1168,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, 
V> implements KStream<K
             bufferStoreName = Optional.of(name + "-Buffer");
             final RocksDBTimeOrderedKeyValueBuffer.Builder<Object, Object> 
storeBuilder =
                     new 
RocksDBTimeOrderedKeyValueBuffer.Builder<>(bufferStoreName.get(), 
joinedInternal.gracePeriod(), name);
-            builder.addStateStore(new StoreBuilderWrapper(storeBuilder));
+            
builder.addStateStore(StoreBuilderWrapper.wrapStoreBuilder(storeBuilder));
         }
 
         final ProcessorSupplier<K, V, K, ? extends VR> processorSupplier = new 
KStreamKTableJoin<>(
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
index 394c1300588..12bb6c19db8 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
@@ -262,7 +262,7 @@ class KStreamImplJoin {
     private static <K, V> StoreFactory 
joinWindowStoreBuilderFromSupplier(final WindowBytesStoreSupplier storeSupplier,
                                                                           
final Serde<K> keySerde,
                                                                           
final Serde<V> valueSerde) {
-        return new StoreBuilderWrapper(Stores.windowStoreBuilder(
+        return StoreBuilderWrapper.wrapStoreBuilder(Stores.windowStoreBuilder(
             storeSupplier,
             keySerde,
             valueSerde
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 09efdb78006..2c75167f019 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -590,7 +590,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, 
V> implements KTable<
         final ProcessorGraphNode<K, Change<V>> node = new TableSuppressNode<>(
             name,
             new ProcessorParameters<>(suppressionSupplier, name),
-            new StoreBuilderWrapper(storeBuilder)
+            StoreBuilderWrapper.wrapStoreBuilder(storeBuilder)
         );
         node.setOutputVersioned(false);
 
@@ -1227,10 +1227,7 @@ public class KTableImpl<K, S, V> extends 
AbstractStream<K, V> implements KTable<
             materializedInternal.withKeySerde(keySerde);
         }
 
-        final KTableSource<K, VR> resultProcessorSupplier = new KTableSource<>(
-            materializedInternal.storeName(),
-            materializedInternal.queryableStoreName()
-        );
+        final KTableSource<K, VR> resultProcessorSupplier = new 
KTableSource<>(materializedInternal);
 
         final StoreFactory resultStore =
             new KeyValueStoreMaterializer<>(materializedInternal);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
index b29a4fa51f1..e41f2bf06dd 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
@@ -17,12 +17,16 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.processor.internals.StoreFactory;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;
 
@@ -30,6 +34,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Objects;
+import java.util.Set;
 
 import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
 import static 
org.apache.kafka.streams.state.VersionedKeyValueStore.PUT_RETURN_CODE_NOT_PUT;
@@ -40,15 +45,17 @@ public class KTableSource<KIn, VIn> implements 
ProcessorSupplier<KIn, VIn, KIn,
     private static final Logger LOG = 
LoggerFactory.getLogger(KTableSource.class);
 
     private final String storeName;
+    private final StoreFactory storeFactory;
     private String queryableName;
     private boolean sendOldValues;
 
-    public KTableSource(final String storeName, final String queryableName) {
+    public KTableSource(
+            final MaterializedInternal<KIn, VIn, KeyValueStore<Bytes, byte[]>> 
materialized) {
+        this.storeName = materialized.storeName();
         Objects.requireNonNull(storeName, "storeName can't be null");
-
-        this.storeName = storeName;
-        this.queryableName = queryableName;
+        this.queryableName = materialized.queryableStoreName();
         this.sendOldValues = false;
+        this.storeFactory = new KeyValueStoreMaterializer<>(materialized);
     }
 
     public String queryableName() {
@@ -60,6 +67,15 @@ public class KTableSource<KIn, VIn> implements 
ProcessorSupplier<KIn, VIn, KIn,
         return new KTableSourceProcessor();
     }
 
+    @Override
+    public Set<StoreBuilder<?>> stores() {
+        if (materialized()) {
+            return Set.of(new 
StoreFactory.FactoryWrappingStoreBuilder<>(storeFactory));
+        } else {
+            return null;
+        }
+    }
+
     // when source ktable requires sending old values, we just
     // need to set the queryable name as the store name to enforce 
materialization
     public void enableSendingOldValues() {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
index cf6ce76f8d5..d6cd130ba6d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
@@ -39,12 +39,19 @@ public final class MaterializedInternal<K, V, S extends 
StateStore> extends Mate
     public MaterializedInternal(final Materialized<K, V, S> materialized,
                                 final InternalNameProvider nameProvider,
                                 final String generatedStorePrefix) {
+        this(materialized, nameProvider, generatedStorePrefix, false);
+    }
+
+    public MaterializedInternal(final Materialized<K, V, S> materialized,
+                                final InternalNameProvider nameProvider,
+                                final String generatedStorePrefix,
+                                final boolean forceQueryable) {
         super(materialized);
 
         // if storeName is not provided, the corresponding KTable would never 
be queryable;
         // but we still need to provide an internal name for it in case we 
materialize.
-        queryable = storeName() != null;
-        if (!queryable && nameProvider != null) {
+        queryable = forceQueryable || storeName() != null;
+        if (storeName() == null && nameProvider != null) {
             storeName = nameProvider.newStoreName(generatedStorePrefix);
         }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GlobalStoreNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GlobalStoreNode.java
index df6e7c263e6..a9093ad4770 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GlobalStoreNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GlobalStoreNode.java
@@ -20,8 +20,11 @@ import 
org.apache.kafka.streams.kstream.internals.ConsumedInternal;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+import 
org.apache.kafka.streams.processor.internals.StoreDelegatingProcessorSupplier;
 import org.apache.kafka.streams.processor.internals.StoreFactory;
 
+import java.util.Set;
+
 public class GlobalStoreNode<KIn, VIn, S extends StateStore> extends 
StateStoreNode<S> {
 
     private final String sourceName;
@@ -52,15 +55,16 @@ public class GlobalStoreNode<KIn, VIn, S extends 
StateStore> extends StateStoreN
     @Override
     public void writeToTopology(final InternalTopologyBuilder topologyBuilder) 
{
         storeBuilder.withLoggingDisabled();
-        topologyBuilder.addGlobalStore(storeBuilder,
-                                       sourceName,
+        topologyBuilder.addGlobalStore(sourceName,
                                        consumed.timestampExtractor(),
                                        consumed.keyDeserializer(),
                                        consumed.valueDeserializer(),
                                        topic,
                                        processorName,
-                                       stateUpdateSupplier,
-                                       reprocessOnRestore);
+                                       new StoreDelegatingProcessorSupplier<>(
+                                               stateUpdateSupplier,
+                                               Set.of(new 
StoreFactory.FactoryWrappingStoreBuilder<>(storeBuilder))
+                                       ), reprocessOnRestore);
 
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
index f0f8e0dcb4a..5e776a5c733 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
@@ -17,15 +17,10 @@
 
 package org.apache.kafka.streams.kstream.internals.graph;
 
-import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
 import org.apache.kafka.streams.kstream.internals.KTableSource;
-import org.apache.kafka.streams.kstream.internals.KeyValueStoreMaterializer;
-import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
-import org.apache.kafka.streams.processor.internals.StoreFactory;
-import org.apache.kafka.streams.state.KeyValueStore;
 
 import java.util.Collections;
 import java.util.Iterator;
@@ -36,7 +31,6 @@ import java.util.Iterator;
  */
 public class TableSourceNode<K, V> extends SourceGraphNode<K, V> {
 
-    private final MaterializedInternal<K, V, ?> materializedInternal;
     private final ProcessorParameters<K, V, ?, ?> processorParameters;
     private final String sourceName;
     private final boolean isGlobalKTable;
@@ -46,7 +40,6 @@ public class TableSourceNode<K, V> extends SourceGraphNode<K, 
V> {
                             final String sourceName,
                             final String topic,
                             final ConsumedInternal<K, V> consumedInternal,
-                            final MaterializedInternal<K, V, ?> 
materializedInternal,
                             final ProcessorParameters<K, V, ?, ?> 
processorParameters,
                             final boolean isGlobalKTable) {
 
@@ -57,7 +50,6 @@ public class TableSourceNode<K, V> extends SourceGraphNode<K, 
V> {
         this.sourceName = sourceName;
         this.isGlobalKTable = isGlobalKTable;
         this.processorParameters = processorParameters;
-        this.materializedInternal = materializedInternal;
     }
 
 
@@ -68,7 +60,6 @@ public class TableSourceNode<K, V> extends SourceGraphNode<K, 
V> {
     @Override
     public String toString() {
         return "TableSourceNode{" +
-               "materializedInternal=" + materializedInternal +
                ", processorParameters=" + processorParameters +
                ", sourceName='" + sourceName + '\'' +
                ", isGlobalKTable=" + isGlobalKTable +
@@ -93,12 +84,8 @@ public class TableSourceNode<K, V> extends 
SourceGraphNode<K, V> {
             throw new IllegalStateException("A table source node must have a 
single topic as input");
         }
 
-        final StoreFactory storeFactory =
-            new KeyValueStoreMaterializer<>((MaterializedInternal<K, V, 
KeyValueStore<Bytes, byte[]>>) materializedInternal);
-
         if (isGlobalKTable) {
             topologyBuilder.addGlobalStore(
-                storeFactory,
                 sourceName,
                 consumedInternal().timestampExtractor(),
                 consumedInternal().keyDeserializer(),
@@ -116,16 +103,16 @@ public class TableSourceNode<K, V> extends 
SourceGraphNode<K, V> {
                                       consumedInternal().valueDeserializer(),
                                       topicName);
 
-            topologyBuilder.addProcessor(processorParameters.processorName(), 
processorParameters.processorSupplier(), sourceName);
+            processorParameters.addProcessorTo(topologyBuilder, new String[] 
{sourceName});
 
-            // only add state store if the source KTable should be materialized
+            // if the KTableSource should not be materialized, stores will be 
null or empty
             final KTableSource<K, V> tableSource = (KTableSource<K, V>) 
processorParameters.processorSupplier();
-            if (tableSource.materialized()) {
-                topologyBuilder.addStateStore(storeFactory, nodeName());
-
+            if (tableSource.stores() != null) {
                 if (shouldReuseSourceTopicForChangelog) {
-                    storeFactory.withLoggingDisabled();
-                    
topologyBuilder.connectSourceStoreAndTopic(storeFactory.name(), topicName);
+                    tableSource.stores().forEach(store -> {
+                        store.withLoggingDisabled();
+                        
topologyBuilder.connectSourceStoreAndTopic(store.name(), topicName);
+                    });
                 }
             }
         }
@@ -138,7 +125,6 @@ public class TableSourceNode<K, V> extends 
SourceGraphNode<K, V> {
         private String sourceName;
         private String topic;
         private ConsumedInternal<K, V> consumedInternal;
-        private MaterializedInternal<K, V, ?> materializedInternal;
         private ProcessorParameters<K, V, ?, ?> processorParameters;
         private boolean isGlobalKTable = false;
 
@@ -155,11 +141,6 @@ public class TableSourceNode<K, V> extends 
SourceGraphNode<K, V> {
             return this;
         }
 
-        public TableSourceNodeBuilder<K, V> withMaterializedInternal(final 
MaterializedInternal<K, V, ?> materializedInternal) {
-            this.materializedInternal = materializedInternal;
-            return this;
-        }
-
         public TableSourceNodeBuilder<K, V> withConsumedInternal(final 
ConsumedInternal<K, V> consumedInternal) {
             this.consumedInternal = consumedInternal;
             return this;
@@ -185,7 +166,6 @@ public class TableSourceNode<K, V> extends 
SourceGraphNode<K, V> {
                                          sourceName,
                                          topic,
                                          consumedInternal,
-                                         materializedInternal,
                                          processorParameters,
                                          isGlobalKTable);
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 2c5e798b62d..9f65a415d95 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyConfig;
+import org.apache.kafka.streams.TopologyDescription;
 import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.internals.ApiUtils;
 import org.apache.kafka.streams.processor.StateStore;
@@ -607,7 +608,7 @@ public class InternalTopologyBuilder {
 
     public final void addStateStore(final StoreBuilder<?> storeBuilder,
                                     final String... processorNames) {
-        addStateStore(new StoreBuilderWrapper(storeBuilder), false, 
processorNames);
+        addStateStore(StoreBuilderWrapper.wrapStoreBuilder(storeBuilder), 
false, processorNames);
     }
 
     public final void addStateStore(final StoreFactory storeFactory,
@@ -638,8 +639,7 @@ public class InternalTopologyBuilder {
         nodeGroups = null;
     }
 
-    public final <KIn, VIn> void addGlobalStore(final StoreFactory 
storeFactory,
-                                                final String sourceName,
+    public final <KIn, VIn> void addGlobalStore(final String sourceName,
                                                 final TimestampExtractor 
timestampExtractor,
                                                 final Deserializer<KIn> 
keyDeserializer,
                                                 final Deserializer<VIn> 
valueDeserializer,
@@ -647,8 +647,15 @@ public class InternalTopologyBuilder {
                                                 final String processorName,
                                                 final ProcessorSupplier<KIn, 
VIn, Void, Void> stateUpdateSupplier,
                                                 final boolean 
reprocessOnRestore) {
-        Objects.requireNonNull(storeFactory, "store builder must not be null");
         ApiUtils.checkSupplier(stateUpdateSupplier);
+        final Set<StoreBuilder<?>> stores = stateUpdateSupplier.stores();
+        if (stores == null || stores.size() != 1) {
+            throw new IllegalArgumentException(
+                    "Global stores must pass in suppliers with exactly one 
store but got " +
+                            (stores != null ? stores.size() : 0));
+        }
+        final StoreFactory storeFactory =
+                StoreBuilderWrapper.wrapStoreBuilder(stores.iterator().next());
         validateGlobalStoreArguments(sourceName,
                                      topic,
                                      processorName,
@@ -2105,8 +2112,8 @@ public class InternalTopologyBuilder {
     private static final SubtopologyComparator SUBTOPOLOGY_COMPARATOR = new 
SubtopologyComparator();
 
     public static final class TopologyDescription implements 
org.apache.kafka.streams.TopologyDescription {
-        private final TreeSet<TopologyDescription.Subtopology> subtopologies = 
new TreeSet<>(SUBTOPOLOGY_COMPARATOR);
-        private final TreeSet<TopologyDescription.GlobalStore> globalStores = 
new TreeSet<>(GLOBALSTORE_COMPARATOR);
+        private final TreeSet<Subtopology> subtopologies = new 
TreeSet<>(SUBTOPOLOGY_COMPARATOR);
+        private final TreeSet<GlobalStore> globalStores = new 
TreeSet<>(GLOBALSTORE_COMPARATOR);
         private final String namedTopology;
 
         public TopologyDescription() {
@@ -2117,21 +2124,21 @@ public class InternalTopologyBuilder {
             this.namedTopology = namedTopology;
         }
 
-        public void addSubtopology(final TopologyDescription.Subtopology 
subtopology) {
+        public void addSubtopology(final Subtopology subtopology) {
             subtopologies.add(subtopology);
         }
 
-        public void addGlobalStore(final TopologyDescription.GlobalStore 
globalStore) {
+        public void addGlobalStore(final GlobalStore globalStore) {
             globalStores.add(globalStore);
         }
 
         @Override
-        public Set<TopologyDescription.Subtopology> subtopologies() {
+        public Set<Subtopology> subtopologies() {
             return Collections.unmodifiableSet(subtopologies);
         }
 
         @Override
-        public Set<TopologyDescription.GlobalStore> globalStores() {
+        public Set<GlobalStore> globalStores() {
             return Collections.unmodifiableSet(globalStores);
         }
 
@@ -2144,17 +2151,17 @@ public class InternalTopologyBuilder {
             } else {
                 sb.append("Topology: ").append(namedTopology).append(":\n ");
             }
-            final TopologyDescription.Subtopology[] sortedSubtopologies =
-                subtopologies.descendingSet().toArray(new 
TopologyDescription.Subtopology[0]);
-            final TopologyDescription.GlobalStore[] sortedGlobalStores =
+            final Subtopology[] sortedSubtopologies =
+                subtopologies.descendingSet().toArray(new Subtopology[0]);
+            final GlobalStore[] sortedGlobalStores =
                 globalStores.descendingSet().toArray(new GlobalStore[0]);
             int expectedId = 0;
             int subtopologiesIndex = sortedSubtopologies.length - 1;
             int globalStoresIndex = sortedGlobalStores.length - 1;
             while (subtopologiesIndex != -1 && globalStoresIndex != -1) {
                 sb.append("  ");
-                final TopologyDescription.Subtopology subtopology = 
sortedSubtopologies[subtopologiesIndex];
-                final TopologyDescription.GlobalStore globalStore = 
sortedGlobalStores[globalStoresIndex];
+                final Subtopology subtopology = 
sortedSubtopologies[subtopologiesIndex];
+                final GlobalStore globalStore = 
sortedGlobalStores[globalStoresIndex];
                 if (subtopology.id() == expectedId) {
                     sb.append(subtopology);
                     subtopologiesIndex--;
@@ -2165,13 +2172,13 @@ public class InternalTopologyBuilder {
                 expectedId++;
             }
             while (subtopologiesIndex != -1) {
-                final TopologyDescription.Subtopology subtopology = 
sortedSubtopologies[subtopologiesIndex];
+                final Subtopology subtopology = 
sortedSubtopologies[subtopologiesIndex];
                 sb.append("  ");
                 sb.append(subtopology);
                 subtopologiesIndex--;
             }
             while (globalStoresIndex != -1) {
-                final TopologyDescription.GlobalStore globalStore = 
sortedGlobalStores[globalStoresIndex];
+                final GlobalStore globalStore = 
sortedGlobalStores[globalStoresIndex];
                 sb.append("  ");
                 sb.append(globalStore);
                 globalStoresIndex--;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreBuilderWrapper.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreBuilderWrapper.java
index b8522b8e2cd..4648533af1d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreBuilderWrapper.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreBuilderWrapper.java
@@ -38,7 +38,15 @@ public class StoreBuilderWrapper implements StoreFactory {
     private final StoreBuilder<?> builder;
     private final Set<String> connectedProcessorNames = new HashSet<>();
 
-    public StoreBuilderWrapper(final StoreBuilder<?> builder) {
+    public static StoreFactory wrapStoreBuilder(final StoreBuilder<?> builder) 
{
+        if (builder instanceof FactoryWrappingStoreBuilder) {
+            return ((FactoryWrappingStoreBuilder<?>) builder).storeFactory();
+        } else {
+            return new StoreBuilderWrapper(builder);
+        }
+    }
+
+    private StoreBuilderWrapper(final StoreBuilder<?> builder) {
         this.builder = builder;
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreDelegatingProcessorSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreDelegatingProcessorSupplier.java
new file mode 100644
index 00000000000..cce8281e15e
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreDelegatingProcessorSupplier.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.state.StoreBuilder;
+
+import java.util.Set;
+
+public class StoreDelegatingProcessorSupplier<KIn, VIn, KOut, VOut> implements 
ProcessorSupplier<KIn, VIn, KOut, VOut> {
+
+    private final ProcessorSupplier<KIn, VIn, KOut, VOut> delegate;
+    private final Set<StoreBuilder<?>> stores;
+
+    public StoreDelegatingProcessorSupplier(
+            final ProcessorSupplier<KIn, VIn, KOut, VOut> delegate,
+            final Set<StoreBuilder<?>> stores
+    ) {
+        this.delegate = delegate;
+        this.stores = stores;
+    }
+
+    @Override
+    public Set<StoreBuilder<?>> stores() {
+        return stores;
+    }
+
+    @Override
+    public Processor<KIn, VIn, KOut, VOut> get() {
+        return delegate.get();
+    }
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreFactory.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreFactory.java
index b05c334c27f..7542f4c5bd8 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreFactory.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreFactory.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyConfig;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.StoreBuilder;
 
 import java.util.Map;
 import java.util.Set;
@@ -75,4 +76,79 @@ public interface StoreFactory {
 
     boolean isCompatibleWith(StoreFactory storeFactory);
 
+    class FactoryWrappingStoreBuilder<T extends StateStore> implements 
StoreBuilder<T> {
+
+        private final StoreFactory storeFactory;
+
+        public FactoryWrappingStoreBuilder(final StoreFactory storeFactory) {
+            this.storeFactory = storeFactory;
+        }
+
+        public StoreFactory storeFactory() {
+            return storeFactory;
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+
+            final FactoryWrappingStoreBuilder<?> that = 
(FactoryWrappingStoreBuilder<?>) o;
+
+            return storeFactory.isCompatibleWith(that.storeFactory);
+        }
+
+        @Override
+        public int hashCode() {
+            return storeFactory.hashCode();
+        }
+
+        @Override
+        public StoreBuilder<T> withCachingEnabled() {
+            throw new IllegalStateException("Should not try to modify 
StoreBuilder wrapper");
+        }
+
+        @Override
+        public StoreBuilder<T> withCachingDisabled() {
+            storeFactory.withCachingDisabled();
+            return this;
+        }
+
+        @Override
+        public StoreBuilder<T> withLoggingEnabled(final Map<String, String> 
config) {
+            throw new IllegalStateException("Should not try to modify 
StoreBuilder wrapper");
+        }
+
+        @Override
+        public StoreBuilder<T> withLoggingDisabled() {
+            storeFactory.withLoggingDisabled();
+            return this;
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public T build() {
+            return (T) storeFactory.build();
+        }
+
+        @Override
+        public Map<String, String> logConfig() {
+            return storeFactory.logConfig();
+        }
+
+        @Override
+        public boolean loggingEnabled() {
+            return storeFactory.loggingEnabled();
+        }
+
+        @Override
+        public String name() {
+            return storeFactory.name();
+        }
+    }
+
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java 
b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index fc601d5b737..5210dd0b3c6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -55,7 +55,7 @@ import 
org.apache.kafka.streams.state.internals.InMemoryWindowStore;
 import org.apache.kafka.streams.state.internals.RocksDBStore;
 import org.apache.kafka.streams.state.internals.RocksDBWindowStore;
 import org.apache.kafka.streams.state.internals.WrappedStateStore;
-import org.apache.kafka.streams.utils.TestUtils.CountingProcessorWrapper;
+import org.apache.kafka.streams.utils.TestUtils.RecordingProcessorWrapper;
 import org.apache.kafka.test.MockApiProcessorSupplier;
 import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.MockPredicate;
@@ -65,6 +65,7 @@ import org.apache.kafka.test.NoopValueTransformerWithKey;
 import org.apache.kafka.test.StreamsTestUtils;
 
 import org.hamcrest.CoreMatchers;
+import org.hamcrest.Matchers;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
@@ -75,12 +76,12 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 
 import static java.util.Arrays.asList;
@@ -1417,10 +1418,10 @@ public class StreamsBuilderTest {
     @Test
     public void shouldWrapProcessorsForProcess() {
         final Map<Object, Object> props = dummyStreamsConfigMap();
-        props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, 
CountingProcessorWrapper.class);
+        props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, 
RecordingProcessorWrapper.class);
 
-        final AtomicInteger wrappedProcessorCount = new AtomicInteger();
-        props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, wrappedProcessorCount);
+        final Set<String> wrappedProcessors = Collections.synchronizedSet(new 
HashSet<>());
+        props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, wrappedProcessors);
 
         final StreamsBuilder builder = new StreamsBuilder(new 
TopologyConfig(new StreamsConfig(props)));
 
@@ -1430,56 +1431,84 @@ public class StreamsBuilderTest {
         final Random random = new Random();
 
         builder.stream("input")
-            .process((ProcessorSupplier<Object, Object, Object, Object>) () -> 
record -> System.out.println("Processing: " + random.nextInt()))
-            .processValues(() -> record -> System.out.println("Processing: " + 
random.nextInt()))
+            .process((ProcessorSupplier<Object, Object, Object, Object>) () -> 
record -> System.out.println("Processing: " + random.nextInt()), 
Named.as("processor1"))
+            .processValues(() -> record -> System.out.println("Processing: " + 
random.nextInt()), Named.as("processor2"))
             .to("output");
 
         builder.build();
-        assertThat(wrappedProcessorCount.get(), CoreMatchers.is(2));
+        assertThat(wrappedProcessors.size(), CoreMatchers.is(2));
+        assertThat(wrappedProcessors, 
Matchers.containsInAnyOrder("processor1", "processor2"));
     }
 
     @Test
     public void shouldWrapProcessorsForAggregationOperators() {
         final Map<Object, Object> props = dummyStreamsConfigMap();
-        props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, 
CountingProcessorWrapper.class);
+        props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, 
RecordingProcessorWrapper.class);
 
-        final AtomicInteger wrappedProcessorCount = new AtomicInteger();
-        props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, wrappedProcessorCount);
+        final Set<String> wrappedProcessors = Collections.synchronizedSet(new 
HashSet<>());
+        props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, wrappedProcessors);
 
         final StreamsBuilder builder = new StreamsBuilder(new 
TopologyConfig(new StreamsConfig(props)));
 
         builder.stream("input")
             .groupByKey()
-            .count() // wrapped 1
-            .toStream()// wrapped 2
+            .count(Named.as("count")) // wrapped 1
+            .toStream(Named.as("toStream"))// wrapped 2
             .to("output");
 
         builder.build();
-        assertThat(wrappedProcessorCount.get(), CoreMatchers.is(2));
+        assertThat(wrappedProcessors.size(), CoreMatchers.is(2));
+        assertThat(wrappedProcessors, Matchers.containsInAnyOrder("count", 
"toStream"));
     }
 
     @Test
     public void shouldWrapProcessorsForStatelessOperators() {
         final Map<Object, Object> props = dummyStreamsConfigMap();
-        props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, 
CountingProcessorWrapper.class);
+        props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, 
RecordingProcessorWrapper.class);
 
-        final AtomicInteger wrappedProcessorCount = new AtomicInteger();
-        props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, wrappedProcessorCount);
+        final Set<String> wrappedProcessors = Collections.synchronizedSet(new 
HashSet<>());
+        props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, wrappedProcessors);
 
         final StreamsBuilder builder = new StreamsBuilder(new 
TopologyConfig(new StreamsConfig(props)));
 
         builder.stream("input")
-            .filter((k, v) -> true) // wrapped 1
-            .map(KeyValue::new) // wrapped 2
-            .selectKey((k, v) -> k) // wrapped 3
-            .peek((k, v) -> { }) // wrapped 4
-            .flatMapValues(e -> new ArrayList<>()) // wrapped 5
-            .toTable() // wrapped 6
-            .toStream() // wrapped 7
+            .filter((k, v) -> true, Named.as("filter")) // wrapped 1
+            .map(KeyValue::new, Named.as("map")) // wrapped 2
+            .selectKey((k, v) -> k, Named.as("selectKey")) // wrapped 3
+            .peek((k, v) -> { }, Named.as("peek")) // wrapped 4
+            .flatMapValues(e -> new ArrayList<>(), Named.as("flatMap")) // 
wrapped 5
+            .toTable(Named.as("toTable")) // wrapped 6 (note named as 
toTable-repartition-filter)
+            .toStream(Named.as("toStream")) // wrapped 7
             .to("output");
 
         builder.build();
-        assertThat(wrappedProcessorCount.get(), CoreMatchers.is(7));
+        assertThat(wrappedProcessors.size(), CoreMatchers.is(7));
+        assertThat(wrappedProcessors, Matchers.containsInAnyOrder(
+                "filter", "map", "selectKey", "peek", "flatMap", 
"toTable-repartition-filter",
+                "toStream"
+        ));
+    }
+
+    @Test
+    public void shouldWrapProcessorsForTableSource() {
+        final Map<Object, Object> props = dummyStreamsConfigMap();
+        props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, 
RecordingProcessorWrapper.class);
+
+        final Set<String> wrappedProcessors = Collections.synchronizedSet(new 
HashSet<>());
+        props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, wrappedProcessors);
+
+        final StreamsBuilder builder = new StreamsBuilder(new 
TopologyConfig(new StreamsConfig(props)));
+
+        builder.table("input") // wrapped 1 (named KTABLE_SOURCE-0000000002)
+                .toStream(Named.as("toStream")) // wrapped 2
+                .to("output");
+
+        builder.build();
+        assertThat(wrappedProcessors.size(), CoreMatchers.is(2));
+        assertThat(wrappedProcessors, Matchers.containsInAnyOrder(
+                "KTABLE-SOURCE-0000000002",
+                "toStream"
+        ));
     }
 
     @Test
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java 
b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index b001c98868f..4467e252b92 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -42,7 +42,7 @@ import 
org.apache.kafka.streams.processor.internals.NoOpProcessorWrapper;
 import org.apache.kafka.streams.processor.internals.RecordCollectorTest;
 import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
 import org.apache.kafka.streams.state.BuiltInDslStoreSuppliers;
-import org.apache.kafka.streams.utils.TestUtils.CountingProcessorWrapper;
+import org.apache.kafka.streams.utils.TestUtils.RecordingProcessorWrapper;
 
 import org.apache.log4j.Level;
 import org.junit.jupiter.api.BeforeEach;
@@ -1230,13 +1230,13 @@ public class StreamsConfigTest {
 
     @Test
     public void shouldAllowConfiguringProcessorWrapperWithClass() {
-        props.put(StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG, 
CountingProcessorWrapper.class);
+        props.put(StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG, 
RecordingProcessorWrapper.class);
         new StreamsConfig(props);
     }
 
     @Test
     public void shouldAllowConfiguringProcessorWrapperWithClassName() {
-        props.put(StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG, 
CountingProcessorWrapper.class.getName());
+        props.put(StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG, 
RecordingProcessorWrapper.class.getName());
         new StreamsConfig(props);
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java 
b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index 461aa1a2921..833eada8d5e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -45,13 +45,14 @@ import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
-import org.apache.kafka.streams.utils.TestUtils.CountingProcessorWrapper;
+import org.apache.kafka.streams.utils.TestUtils.RecordingProcessorWrapper;
 import org.apache.kafka.test.MockApiProcessorSupplier;
 import org.apache.kafka.test.MockKeyValueStore;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.StreamsTestUtils;
 
+import org.hamcrest.Matchers;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
@@ -71,7 +72,6 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 
 import static java.time.Duration.ofMillis;
@@ -2425,10 +2425,10 @@ public class TopologyTest {
     @Test
     public void shouldWrapProcessors() {
         final Map<Object, Object> props = dummyStreamsConfigMap();
-        props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, 
CountingProcessorWrapper.class);
+        props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, 
RecordingProcessorWrapper.class);
 
-        final AtomicInteger wrappedProcessorCount = new AtomicInteger();
-        props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, wrappedProcessorCount);
+        final Set<String> wrappedProcessors = Collections.synchronizedSet(new 
HashSet<>());
+        props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, wrappedProcessors);
 
         final Topology topology = new Topology(new TopologyConfig(new 
StreamsConfig(props)));
 
@@ -2453,7 +2453,8 @@ public class TopologyTest {
             () -> (Processor<Object, Object, Object, Object>) record -> 
System.out.println("Processing: " + random.nextInt()),
             "p2"
         );
-        assertThat(wrappedProcessorCount.get(), is(3));
+        assertThat(wrappedProcessors.size(), is(3));
+        assertThat(wrappedProcessors, Matchers.containsInAnyOrder("p1", "p2", 
"p3"));
     }
 
     @SuppressWarnings("deprecation")
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNodeTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNodeTest.java
index 2988e14e720..bf70d476839 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNodeTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNodeTest.java
@@ -16,23 +16,29 @@
  */
 package org.apache.kafka.streams.kstream.internals.graph;
 
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
 import org.apache.kafka.streams.kstream.internals.KTableSource;
 import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
 import 
org.apache.kafka.streams.kstream.internals.graph.TableSourceNode.TableSourceNodeBuilder;
+import org.apache.kafka.streams.processor.api.ProcessorWrapper;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+import org.apache.kafka.streams.state.KeyValueStore;
 
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.mockito.junit.jupiter.MockitoSettings;
 import org.mockito.quality.Strictness;
 
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 @ExtendWith(MockitoExtension.class)
 @MockitoSettings(strictness = Strictness.STRICT_STUBS)
@@ -43,6 +49,12 @@ public class TableSourceNodeTest {
 
     private InternalTopologyBuilder topologyBuilder = 
mock(InternalTopologyBuilder.class);
 
+    @BeforeEach
+    public void before() {
+        when(topologyBuilder.wrapProcessorSupplier(any(), any()))
+                .thenAnswer(iom -> 
ProcessorWrapper.asWrapped(iom.getArgument(1)));
+    }
+
     @Test
     public void 
shouldConnectStateStoreToInputTopicIfInputTopicIsUsedAsChangelog() {
         final boolean shouldReuseSourceTopicForChangelog = true;
@@ -59,12 +71,13 @@ public class TableSourceNodeTest {
 
     private void buildTableSourceNode(final boolean 
shouldReuseSourceTopicForChangelog) {
         final TableSourceNodeBuilder<String, String> tableSourceNodeBuilder = 
TableSourceNode.tableSourceNodeBuilder();
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>>
+                materializedInternal = new 
MaterializedInternal<>(Materialized.as(STORE_NAME));
         final TableSourceNode<String, String> tableSourceNode = 
tableSourceNodeBuilder
             .withTopic(TOPIC)
-            .withMaterializedInternal(new 
MaterializedInternal<>(Materialized.as(STORE_NAME)))
             .withConsumedInternal(new 
ConsumedInternal<>(Consumed.as("node-name")))
             .withProcessorParameters(
-                new ProcessorParameters<>(new KTableSource<>(STORE_NAME, 
STORE_NAME), null))
+                    new ProcessorParameters<>(new 
KTableSource<>(materializedInternal), null))
             .build();
         
tableSourceNode.reuseSourceTopicForChangeLog(shouldReuseSourceTopicForChangelog);
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
index 244a246bd20..e4f78c900d1 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
@@ -41,6 +41,7 @@ import 
org.apache.kafka.streams.processor.api.ProcessorSupplier;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.test.MockStateRestoreListener;
 import org.apache.kafka.test.TestUtils;
 
@@ -108,15 +109,17 @@ public class GlobalStreamThreadTest {
                 }
             };
 
+        final StoreFactory storeFactory =
+                new 
KeyValueStoreMaterializer<>(materialized).withLoggingDisabled();
+        final StoreBuilder<?> storeBuilder = new 
StoreFactory.FactoryWrappingStoreBuilder<>(storeFactory);
         builder.addGlobalStore(
-            new 
KeyValueStoreMaterializer<>(materialized).withLoggingDisabled(),
             "sourceName",
             null,
             null,
             null,
             GLOBAL_STORE_TOPIC_NAME,
             "processorName",
-            processorSupplier,
+            new StoreDelegatingProcessorSupplier<>(processorSupplier, 
Set.of(storeBuilder)),
             false
         );
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index 9d46569c27b..e3add9755ae 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -95,7 +95,8 @@ public class InternalTopologyBuilderTest {
 
     private final Serde<String> stringSerde = Serdes.String();
     private final InternalTopologyBuilder builder = new 
InternalTopologyBuilder();
-    private final StoreFactory storeBuilder = new 
MockKeyValueStoreBuilder("testStore", false).asFactory();
+    private final StoreBuilder<?> storeBuilder = new 
MockKeyValueStoreBuilder("testStore", false);
+    private final StoreFactory storeFactory = new 
MockKeyValueStoreBuilder("testStore", false).asFactory();
 
     @Test
     public void shouldAddSourceWithOffsetReset() {
@@ -225,7 +226,6 @@ public class InternalTopologyBuilderTest {
         final IllegalArgumentException exception = assertThrows(
                 IllegalArgumentException.class,
             () -> builder.addGlobalStore(
-                        new MockKeyValueStoreBuilder("global-store", 
false).asFactory().withLoggingDisabled(),
                         "globalSource",
                         null,
                         null,
@@ -331,18 +331,20 @@ public class InternalTopologyBuilderTest {
 
     @Test
     public void testPatternSourceTopicsWithGlobalTopics() {
+        final StoreBuilder<?> storeBuilder =
+                new MockKeyValueStoreBuilder("global-store", false)
+                        .withLoggingDisabled();
         builder.setApplicationId("X");
         builder.addSource(null, "source-1", null, null, null, 
Pattern.compile("topic-1"));
         builder.addSource(null, "source-2", null, null, null, 
Pattern.compile("topic-2"));
         builder.addGlobalStore(
-            new MockKeyValueStoreBuilder("global-store", 
false).asFactory().withLoggingDisabled(),
             "globalSource",
             null,
             null,
             null,
             "globalTopic",
             "global-processor",
-            new MockApiProcessorSupplier<>(),
+            new StoreDelegatingProcessorSupplier<>(new 
MockApiProcessorSupplier<>(), Set.of(storeBuilder)),
             false
         );
         builder.initializeSubscription();
@@ -356,18 +358,20 @@ public class InternalTopologyBuilderTest {
 
     @Test
     public void testNameSourceTopicsWithGlobalTopics() {
+        final StoreBuilder<?> storeBuilder =
+                new MockKeyValueStoreBuilder("global-store", false)
+                        .withLoggingDisabled();
         builder.setApplicationId("X");
         builder.addSource(null, "source-1", null, null, null, "topic-1");
         builder.addSource(null, "source-2", null, null, null, "topic-2");
         builder.addGlobalStore(
-            new MockKeyValueStoreBuilder("global-store", 
false).asFactory().withLoggingDisabled(),
             "globalSource",
             null,
             null,
             null,
             "globalTopic",
             "global-processor",
-            new MockApiProcessorSupplier<>(),
+            new StoreDelegatingProcessorSupplier<>(new 
MockApiProcessorSupplier<>(), Set.of(storeBuilder)),
             false
         );
         builder.initializeSubscription();
@@ -427,14 +431,14 @@ public class InternalTopologyBuilderTest {
 
     @Test
     public void testAddStateStoreWithNonExistingProcessor() {
-        assertThrows(TopologyException.class, () -> 
builder.addStateStore(storeBuilder, "no-such-processor"));
+        assertThrows(TopologyException.class, () -> 
builder.addStateStore(storeFactory, "no-such-processor"));
     }
 
     @Test
     public void testAddStateStoreWithSource() {
         builder.addSource(null, "source-1", null, null, null, "topic-1");
         try {
-            builder.addStateStore(storeBuilder, "source-1");
+            builder.addStateStore(storeFactory, "source-1");
             fail("Should throw TopologyException with store cannot be added to 
source");
         } catch (final TopologyException expected) { /* ok */ }
     }
@@ -444,7 +448,7 @@ public class InternalTopologyBuilderTest {
         builder.addSource(null, "source-1", null, null, null, "topic-1");
         builder.addSink("sink-1", "topic-1", null, null, null, "source-1");
         try {
-            builder.addStateStore(storeBuilder, "sink-1");
+            builder.addStateStore(storeFactory, "sink-1");
             fail("Should throw TopologyException with store cannot be added to 
sink");
         } catch (final TopologyException expected) { /* ok */ }
     }
@@ -454,7 +458,7 @@ public class InternalTopologyBuilderTest {
         final StoreBuilder<KeyValueStore<Object, Object>> otherBuilder =
             new MockKeyValueStoreBuilder("testStore", false);
 
-        builder.addStateStore(storeBuilder);
+        builder.addStateStore(storeFactory);
 
         final TopologyException exception = assertThrows(
             TopologyException.class,
@@ -469,24 +473,23 @@ public class InternalTopologyBuilderTest {
 
     @Test
     public void shouldNotAllowToAddStoresWithSameNameWhenFirstStoreIsGlobal() {
-        final StoreFactory globalBuilder =
-            new MockKeyValueStoreBuilder("testStore", 
false).asFactory().withLoggingDisabled();
+        final StoreBuilder<?> globalBuilder =
+            new MockKeyValueStoreBuilder("testStore", 
false).withLoggingDisabled();
 
         builder.addGlobalStore(
-            globalBuilder,
             "global-store",
             null,
             null,
             null,
             "global-topic",
             "global-processor",
-            new MockApiProcessorSupplier<>(),
+            new StoreDelegatingProcessorSupplier<>(new 
MockApiProcessorSupplier<>(), Set.of(globalBuilder)),
             false
         );
 
         final TopologyException exception = assertThrows(
             TopologyException.class,
-            () -> builder.addStateStore(storeBuilder)
+            () -> builder.addStateStore(storeFactory)
         );
 
         assertThat(
@@ -497,22 +500,21 @@ public class InternalTopologyBuilderTest {
 
     @Test
     public void shouldNotAllowToAddStoresWithSameNameWhenSecondStoreIsGlobal() 
{
-        final StoreFactory globalBuilder =
-            new MockKeyValueStoreBuilder("testStore", 
false).asFactory().withLoggingDisabled();
+        final StoreBuilder<?> globalBuilder =
+            new MockKeyValueStoreBuilder("testStore", 
false).withLoggingDisabled();
 
-        builder.addStateStore(storeBuilder);
+        builder.addStateStore(storeFactory);
 
         final TopologyException exception = assertThrows(
             TopologyException.class,
             () -> builder.addGlobalStore(
-                globalBuilder,
                 "global-store",
                 null,
                 null,
                 null,
                 "global-topic",
                 "global-processor",
-                new MockApiProcessorSupplier<>(),
+                new StoreDelegatingProcessorSupplier<>(new 
MockApiProcessorSupplier<>(), Set.of(globalBuilder)),
                 false
             )
         );
@@ -525,34 +527,32 @@ public class InternalTopologyBuilderTest {
 
     @Test
     public void shouldNotAllowToAddGlobalStoresWithSameName() {
-        final StoreFactory firstGlobalBuilder =
-            new MockKeyValueStoreBuilder("testStore", 
false).asFactory().withLoggingDisabled();
-        final StoreFactory secondGlobalBuilder =
-            new MockKeyValueStoreBuilder("testStore", 
false).asFactory().withLoggingDisabled();
+        final StoreBuilder<KeyValueStore<Object, Object>> firstGlobalBuilder =
+            new MockKeyValueStoreBuilder("testStore", 
false).withLoggingDisabled();
+        final StoreBuilder<KeyValueStore<Object, Object>> secondGlobalBuilder =
+            new MockKeyValueStoreBuilder("testStore", 
false).withLoggingDisabled();
 
         builder.addGlobalStore(
-            firstGlobalBuilder,
             "global-store",
             null,
             null,
             null,
             "global-topic",
             "global-processor",
-            new MockApiProcessorSupplier<>(),
+            new StoreDelegatingProcessorSupplier<>(new 
MockApiProcessorSupplier<>(), Set.of(firstGlobalBuilder)),
             false
         );
 
         final TopologyException exception = assertThrows(
             TopologyException.class,
             () -> builder.addGlobalStore(
-                secondGlobalBuilder,
                 "global-store-2",
                 null,
                 null,
                 null,
                 "global-topic",
                 "global-processor-2",
-                new MockApiProcessorSupplier<>(),
+                new StoreDelegatingProcessorSupplier<>(new 
MockApiProcessorSupplier<>(), Set.of(secondGlobalBuilder)),
                 false
             )
         );
@@ -565,35 +565,35 @@ public class InternalTopologyBuilderTest {
 
     @Test
     public void testAddStateStore() {
-        builder.addStateStore(storeBuilder);
+        builder.addStateStore(storeFactory);
         builder.setApplicationId("X");
         builder.addSource(null, "source-1", null, null, null, "topic-1");
         builder.addProcessor("processor-1", new MockApiProcessorSupplier<>(), 
"source-1");
 
         assertEquals(0, builder.buildTopology().stateStores().size());
 
-        builder.connectProcessorAndStateStores("processor-1", 
storeBuilder.name());
+        builder.connectProcessorAndStateStores("processor-1", 
storeFactory.name());
 
         final List<StateStore> suppliers = 
builder.buildTopology().stateStores();
         assertEquals(1, suppliers.size());
-        assertEquals(storeBuilder.name(), suppliers.get(0).name());
+        assertEquals(storeFactory.name(), suppliers.get(0).name());
     }
 
     @Test
     public void testStateStoreNamesForSubtopology() {
-        builder.addStateStore(storeBuilder);
+        builder.addStateStore(storeFactory);
         builder.setApplicationId("X");
 
         builder.addSource(null, "source-1", null, null, null, "topic-1");
         builder.addProcessor("processor-1", new MockApiProcessorSupplier<>(), 
"source-1");
-        builder.connectProcessorAndStateStores("processor-1", 
storeBuilder.name());
+        builder.connectProcessorAndStateStores("processor-1", 
storeFactory.name());
 
         builder.addSource(null, "source-2", null, null, null, "topic-2");
         builder.addProcessor("processor-2", new MockApiProcessorSupplier<>(), 
"source-2");
 
         builder.buildTopology();
         final Set<String> stateStoreNames = 
builder.stateStoreNamesForSubtopology(0);
-        assertThat(stateStoreNames, equalTo(Set.of(storeBuilder.name())));
+        assertThat(stateStoreNames, equalTo(Set.of(storeFactory.name())));
 
         final Set<String> emptyStoreNames = 
builder.stateStoreNamesForSubtopology(1);
         assertThat(emptyStoreNames, equalTo(Set.of()));
@@ -607,13 +607,13 @@ public class InternalTopologyBuilderTest {
         builder.setApplicationId("X");
         builder.addSource(null, "source-1", null, null, null, "topic-1");
 
-        builder.addStateStore(storeBuilder);
+        builder.addStateStore(storeFactory);
         builder.addProcessor("processor-1", new MockApiProcessorSupplier<>(), 
"source-1");
-        builder.connectProcessorAndStateStores("processor-1", 
storeBuilder.name());
+        builder.connectProcessorAndStateStores("processor-1", 
storeFactory.name());
 
-        builder.addStateStore(storeBuilder);
+        builder.addStateStore(storeFactory);
         builder.addProcessor("processor-2", new MockApiProcessorSupplier<>(), 
"source-1");
-        builder.connectProcessorAndStateStores("processor-2", 
storeBuilder.name());
+        builder.connectProcessorAndStateStores("processor-2", 
storeFactory.name());
 
         assertEquals(1, builder.buildTopology().stateStores().size());
     }
@@ -763,15 +763,16 @@ public class InternalTopologyBuilderTest {
         assertNotEquals(oldNodeGroups, newNodeGroups);
 
         oldNodeGroups = newNodeGroups;
+
+        final StoreBuilder<?> globalBuilder = new 
MockKeyValueStoreBuilder("global-store", false).withLoggingDisabled();
         builder.addGlobalStore(
-            new MockKeyValueStoreBuilder("global-store", 
false).asFactory().withLoggingDisabled(),
             "globalSource",
             null,
             null,
             null,
             "globalTopic",
             "global-processor",
-            new MockApiProcessorSupplier<>(),
+            new StoreDelegatingProcessorSupplier<>(new 
MockApiProcessorSupplier<>(), Set.of(globalBuilder)),
             false
         );
         newNodeGroups = builder.nodeGroups();
@@ -879,7 +880,7 @@ public class InternalTopologyBuilderTest {
     public void 
shouldAssociateStateStoreNameWhenStateStoreSupplierIsInternal() {
         builder.addSource(null, "source", null, null, null, "topic");
         builder.addProcessor("processor", new MockApiProcessorSupplier<>(), 
"source");
-        builder.addStateStore(storeBuilder, "processor");
+        builder.addStateStore(storeFactory, "processor");
         final Map<String, List<String>> stateStoreNameToSourceTopic = 
builder.stateStoreNameToFullSourceTopicNames();
         assertEquals(1, stateStoreNameToSourceTopic.size());
         assertEquals(Collections.singletonList("topic"), 
stateStoreNameToSourceTopic.get("testStore"));
@@ -889,7 +890,7 @@ public class InternalTopologyBuilderTest {
     public void 
shouldAssociateStateStoreNameWhenStateStoreSupplierIsExternal() {
         builder.addSource(null, "source", null, null, null, "topic");
         builder.addProcessor("processor", new MockApiProcessorSupplier<>(), 
"source");
-        builder.addStateStore(storeBuilder, "processor");
+        builder.addStateStore(storeFactory, "processor");
         final Map<String, List<String>> stateStoreNameToSourceTopic = 
builder.stateStoreNameToFullSourceTopicNames();
         assertEquals(1, stateStoreNameToSourceTopic.size());
         assertEquals(Collections.singletonList("topic"), 
stateStoreNameToSourceTopic.get("testStore"));
@@ -901,7 +902,7 @@ public class InternalTopologyBuilderTest {
         builder.addInternalTopic("internal-topic", 
InternalTopicProperties.empty());
         builder.addSource(null, "source", null, null, null, "internal-topic");
         builder.addProcessor("processor", new MockApiProcessorSupplier<>(), 
"source");
-        builder.addStateStore(storeBuilder, "processor");
+        builder.addStateStore(storeFactory, "processor");
         final Map<String, List<String>> stateStoreNameToSourceTopic = 
builder.stateStoreNameToFullSourceTopicNames();
         assertEquals(1, stateStoreNameToSourceTopic.size());
         assertEquals(Collections.singletonList("appId-internal-topic"), 
stateStoreNameToSourceTopic.get("testStore"));
@@ -975,7 +976,7 @@ public class InternalTopologyBuilderTest {
         builder.setApplicationId("appId");
         builder.addSource(null, "source", null, null, null, "topic");
         builder.addProcessor("processor", new MockApiProcessorSupplier<>(), 
"source");
-        builder.addStateStore(storeBuilder, "processor");
+        builder.addStateStore(storeFactory, "processor");
         builder.buildTopology();
         final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> topicGroups 
= builder.subtopologyToTopicsInfo();
         final InternalTopologyBuilder.TopicsInfo topicsInfo = 
topicGroups.values().iterator().next();
@@ -1183,7 +1184,7 @@ public class InternalTopologyBuilderTest {
     public void shouldConnectRegexMatchedTopicsToStateStore() {
         builder.addSource(null, "ingest", null, null, null, 
Pattern.compile("topic-\\d+"));
         builder.addProcessor("my-processor", new MockApiProcessorSupplier<>(), 
"ingest");
-        builder.addStateStore(storeBuilder, "my-processor");
+        builder.addStateStore(storeFactory, "my-processor");
 
         final Set<String> updatedTopics = new HashSet<>();
 
@@ -1195,7 +1196,7 @@ public class InternalTopologyBuilderTest {
         builder.setApplicationId("test-app");
 
         final Map<String, List<String>> stateStoreAndTopics = 
builder.stateStoreNameToFullSourceTopicNames();
-        final List<String> topics = 
stateStoreAndTopics.get(storeBuilder.name());
+        final List<String> topics = 
stateStoreAndTopics.get(storeFactory.name());
 
         assertEquals(2, topics.size(), "Expected to contain two topics");
 
@@ -1208,14 +1209,13 @@ public class InternalTopologyBuilderTest {
     public void 
shouldNotAllowToAddGlobalStoreWithSourceNameEqualsProcessorName() {
         final String sameNameForSourceAndProcessor = "sameName";
         assertThrows(TopologyException.class, () -> builder.addGlobalStore(
-            storeBuilder,
             sameNameForSourceAndProcessor,
             null,
             null,
             null,
             "anyTopicName",
             sameNameForSourceAndProcessor,
-            new MockApiProcessorSupplier<>(),
+            new StoreDelegatingProcessorSupplier<>(new 
MockApiProcessorSupplier<>(), Set.of(storeBuilder)),
             false
         ));
     }
@@ -1351,16 +1351,17 @@ public class InternalTopologyBuilderTest {
     public void shouldConnectGlobalStateStoreToInputTopic() {
         final String globalStoreName = "global-store";
         final String globalTopic = "global-topic";
+        final StoreBuilder<?> storeBuilder =
+                new MockKeyValueStoreBuilder(globalStoreName, 
false).withLoggingDisabled();
         builder.setApplicationId("X");
         builder.addGlobalStore(
-            new MockKeyValueStoreBuilder(globalStoreName, 
false).asFactory().withLoggingDisabled(),
             "globalSource",
             null,
             null,
             null,
             globalTopic,
             "global-processor",
-            new MockApiProcessorSupplier<>(),
+            new StoreDelegatingProcessorSupplier<>(new 
MockApiProcessorSupplier<>(), Set.of(storeBuilder)),
             false
         );
         builder.initializeSubscription();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java 
b/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java
index 0f0bcf90698..24ac2f2306d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java
@@ -28,10 +28,12 @@ import org.junit.jupiter.api.TestInfo;
 
 import java.lang.reflect.Method;
 import java.time.Duration;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.kafka.streams.StreamsConfig.APPLICATION_ID_CONFIG;
@@ -115,30 +117,30 @@ public class TestUtils {
      * To retrieve the current count, pass an instance of AtomicInteger into 
the configs
      * alongside the wrapper itself. Use the config key defined with {@link 
#PROCESSOR_WRAPPER_COUNTER_CONFIG}
      */
-    public static class CountingProcessorWrapper implements ProcessorWrapper {
+    public static class RecordingProcessorWrapper implements ProcessorWrapper {
 
-        private AtomicInteger wrappedProcessorCount;
+        private Set<String> wrappedProcessorNames;
 
         @Override
         public void configure(final Map<String, ?> configs) {
             if (configs.containsKey(PROCESSOR_WRAPPER_COUNTER_CONFIG)) {
-                wrappedProcessorCount = (AtomicInteger) 
configs.get(PROCESSOR_WRAPPER_COUNTER_CONFIG);
+                wrappedProcessorNames = (Set<String>) 
configs.get(PROCESSOR_WRAPPER_COUNTER_CONFIG);
             } else {
-                wrappedProcessorCount = new AtomicInteger();
+                wrappedProcessorNames = Collections.synchronizedSet(new 
HashSet<>());
             }
         }
 
         @Override
         public <KIn, VIn, KOut, VOut> WrappedProcessorSupplier<KIn, VIn, KOut, 
VOut> wrapProcessorSupplier(final String processorName,
                                                                                
                            final ProcessorSupplier<KIn, VIn, KOut, VOut> 
processorSupplier) {
-            wrappedProcessorCount.incrementAndGet();
+            wrappedProcessorNames.add(processorName);
             return ProcessorWrapper.asWrapped(processorSupplier);
         }
 
         @Override
         public <KIn, VIn, VOut> WrappedFixedKeyProcessorSupplier<KIn, VIn, 
VOut> wrapFixedKeyProcessorSupplier(final String processorName,
                                                                                
                                final FixedKeyProcessorSupplier<KIn, VIn, VOut> 
processorSupplier) {
-            wrappedProcessorCount.incrementAndGet();
+            wrappedProcessorNames.add(processorName);
             return ProcessorWrapper.asWrappedFixedKey(processorSupplier);
         }
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/test/MockKeyValueStoreBuilder.java 
b/streams/src/test/java/org/apache/kafka/test/MockKeyValueStoreBuilder.java
index 2faf89b1622..15c896ad076 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockKeyValueStoreBuilder.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockKeyValueStoreBuilder.java
@@ -39,6 +39,6 @@ public class MockKeyValueStoreBuilder extends 
AbstractStoreBuilder<Integer, byte
     }
 
     public StoreFactory asFactory() {
-        return new StoreBuilderWrapper(this);
+        return StoreBuilderWrapper.wrapStoreBuilder(this);
     }
 }

Reply via email to