bbejeck commented on a change in pull request #9221:
URL: https://github.com/apache/kafka/pull/9221#discussion_r485691111



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java
##########
@@ -99,5 +99,14 @@ public void writeToTopology(final InternalTopologyBuilder 
topologyBuilder) {
             }
         }
 
+        // temporary hack until KIP-478 is fully implemented
+        final org.apache.kafka.streams.processor.ProcessorSupplier<K, V> 
oldProcessorSupplier =
+            processorParameters().oldProcessorSupplier();
+        if (oldProcessorSupplier != null && oldProcessorSupplier.stores() != 
null) {
+            for (final StoreBuilder<?> storeBuilder : 
oldProcessorSupplier.stores()) {
+                topologyBuilder.addStateStore(storeBuilder, processorName);
+            }
+        }
+

Review comment:
       Could this cause a problem with lines 96-99 above? I could be missing 
something, but it looks like we could be attempting to add the same stores 
twice, which _**I think**_  will result in a runtime error building the 
topology.

##########
File path: streams/src/main/java/org/apache/kafka/streams/Topology.java
##########
@@ -772,6 +806,94 @@ public synchronized Topology addStateStore(final 
StoreBuilder<?> storeBuilder,
         return this;
     }
 
+    /**
+     * Adds a global {@link StateStore} to the topology.
+     * The {@link StateStore} sources its data from all partitions of the 
provided input topic.
+     * There will be exactly one instance of this {@link StateStore} per Kafka 
Streams instance.
+     * <p>
+     * A {@link SourceNode} with the provided sourceName will be added to 
consume the data arriving from the partitions
+     * of the input topic.
+     * <p>
+     * The provided {@link ProcessorSupplier} will be used to create an {@link 
ProcessorNode} that will receive all
+     * records forwarded from the {@link SourceNode}.
+     * This {@link ProcessorNode} should be used to keep the {@link 
StateStore} up-to-date.
+     * The default {@link TimestampExtractor} as specified in the {@link 
StreamsConfig config} is used.
+     *
+     * @param storeBuilder          user defined state store builder
+     * @param sourceName            name of the {@link SourceNode} that will 
be automatically added
+     * @param keyDeserializer       the {@link Deserializer} to deserialize 
keys with
+     * @param valueDeserializer     the {@link Deserializer} to deserialize 
values with
+     * @param topic                 the topic to source the data from
+     * @param processorName         the name of the {@link ProcessorSupplier}
+     * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
+     * @return itself
+     * @throws TopologyException if the processor of state is already 
registered
+     */
+    public synchronized <KIn, VIn> Topology addGlobalStore(final 
StoreBuilder<?> storeBuilder,
+                                                           final String 
sourceName,
+                                                           final 
Deserializer<KIn> keyDeserializer,
+                                                           final 
Deserializer<VIn> valueDeserializer,
+                                                           final String topic,
+                                                           final String 
processorName,
+                                                           final 
ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) {
+        internalTopologyBuilder.addGlobalStore(
+            storeBuilder,
+            sourceName,
+            null,
+            keyDeserializer,
+            valueDeserializer,
+            topic,
+            processorName,
+            stateUpdateSupplier
+        );
+        return this;
+    }
+
+    /**
+     * Adds a global {@link StateStore} to the topology.
+     * The {@link StateStore} sources its data from all partitions of the 
provided input topic.
+     * There will be exactly one instance of this {@link StateStore} per Kafka 
Streams instance.
+     * <p>
+     * A {@link SourceNode} with the provided sourceName will be added to 
consume the data arriving from the partitions
+     * of the input topic.
+     * <p>
+     * The provided {@link ProcessorSupplier} will be used to create an {@link 
ProcessorNode} that will receive all
+     * records forwarded from the {@link SourceNode}.
+     * This {@link ProcessorNode} should be used to keep the {@link 
StateStore} up-to-date.
+     *
+     * @param storeBuilder          user defined key value store builder
+     * @param sourceName            name of the {@link SourceNode} that will 
be automatically added
+     * @param timestampExtractor    the stateless timestamp extractor used for 
this source,
+     *                              if not specified the default extractor 
defined in the configs will be used
+     * @param keyDeserializer       the {@link Deserializer} to deserialize 
keys with
+     * @param valueDeserializer     the {@link Deserializer} to deserialize 
values with
+     * @param topic                 the topic to source the data from
+     * @param processorName         the name of the {@link ProcessorSupplier}
+     * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
+     * @return itself
+     * @throws TopologyException if the processor of state is already 
registered
+     */
+    public synchronized <KIn, VIn> Topology addGlobalStore(final 
StoreBuilder<?> storeBuilder,
+                                                           final String 
sourceName,
+                                                           final 
TimestampExtractor timestampExtractor,
+                                                           final 
Deserializer<KIn> keyDeserializer,
+                                                           final 
Deserializer<VIn> valueDeserializer,
+                                                           final String topic,
+                                                           final String 
processorName,
+                                                           final 
ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) {

Review comment:
       For consideration - maybe provide a builder for use as parameter.  
   Something like 
   ```java
   
GlobalStoreBuilder.builder().addSource().addTimestampextractor().addTopic()...
   ```
   Just a thought, whenever I see long parameter lists I tend to look if 
there's a way to include a builder instead

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java
##########
@@ -26,22 +30,57 @@
  * Used by the Join nodes as there are several parameters, this abstraction 
helps
  * keep the number of arguments more reasonable.
  */
-public class ProcessorParameters<K, V> {
+public class ProcessorParameters<KIn, VIn, KOut, VOut> {
 
-    private final ProcessorSupplier<K, V> processorSupplier;
+    // During the transition to KIP-478, we capture arguments passed from the 
old API to simplify
+    // the performance of casts that we still need to perform. This will 
eventually be removed.
+    private final org.apache.kafka.streams.processor.ProcessorSupplier<KIn, 
VIn> oldProcessorSupplier;
+    private final ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier;
     private final String processorName;
 
-    public ProcessorParameters(final ProcessorSupplier<K, V> processorSupplier,
+    public ProcessorParameters(final 
org.apache.kafka.streams.processor.ProcessorSupplier<KIn, VIn> 
processorSupplier,
                                final String processorName) {
+        oldProcessorSupplier = processorSupplier;
+        this.processorSupplier = () -> 
ProcessorAdapter.adapt(processorSupplier.get());
+        this.processorName = processorName;
+    }
 
+    public ProcessorParameters(final ProcessorSupplier<KIn, VIn, KOut, VOut> 
processorSupplier,
+                               final String processorName) {
+        oldProcessorSupplier = null;
         this.processorSupplier = processorSupplier;
         this.processorName = processorName;
     }
 
-    public ProcessorSupplier<K, V> processorSupplier() {
+    public ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier() {
         return processorSupplier;
     }
 
+    public org.apache.kafka.streams.processor.ProcessorSupplier<KIn, VIn> 
oldProcessorSupplier() {
+        return oldProcessorSupplier;
+    }
+
+    @SuppressWarnings("unchecked")
+    KTableSource<KIn, VIn> kTableSourceSupplier() {
+        // This cast always works because KTableSource hasn't been converted 
yet.
+        return oldProcessorSupplier == null
+            ? null
+            : !(oldProcessorSupplier instanceof KTableSource)
+              ? null
+              : (KTableSource<KIn, VIn>) oldProcessorSupplier;
+    }

Review comment:
       Just a minor thought here.  I'm wondering if these `ktableX` methods 
should go in a separate class, I feel like this is "leaks" a little bit. But I 
don't have a better idea ATM, so maybe we can revisit later.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to