[GitHub] [kafka] vvcephei commented on a change in pull request #9221: KAFKA-10436: Implement KIP-478 Topology changes

2020-09-09 Thread GitBox


vvcephei commented on a change in pull request #9221:
URL: https://github.com/apache/kafka/pull/9221#discussion_r485742765



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java
##
@@ -99,5 +99,14 @@ public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
 }
 }
 
+// temporary hack until KIP-478 is fully implemented
+final org.apache.kafka.streams.processor.ProcessorSupplier oldProcessorSupplier =
+processorParameters().oldProcessorSupplier();
+if (oldProcessorSupplier != null && oldProcessorSupplier.stores() != null) {
+for (final StoreBuilder storeBuilder : oldProcessorSupplier.stores()) {
+topologyBuilder.addStateStore(storeBuilder, processorName);
+}
+}
+

Review comment:
   It definitely looks that way, but I've just double-checked, and I think it's safe. Only an implementation of ProcessorSupplier (either the new or the old one) can override the ConnectedStoreProvider#stores method. Unlike Processor and ProcessorContext, we haven't added adapters for ProcessorSupplier that could delegate the `stores` method from the new API to the old one, so only a direct implementation of the new-API ProcessorSupplier can return a non-null result from `processorSupplier.stores()` on L96. Likewise, `oldProcessorSupplier` itself is non-null only when the provided supplier is an old-API ProcessorSupplier.
   
   So either L96-99 will add stores or L102-109 will (or neither), but never both. Does that reasoning seem legit to you?
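The mutual-exclusivity argument can be checked with a small model. This is a sketch under simplifying assumptions: `StoreProvider`, `OldSupplier`, `NewSupplier`, `Params`, and `StoreWiring` are hypothetical stand-ins (not Kafka classes) for `ConnectedStoreProvider`, the two `ProcessorSupplier` interfaces, `ProcessorParameters`, and the two store-wiring blocks in `writeToTopology`, with stores reduced to plain names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for ConnectedStoreProvider: stores() defaults to null.
interface StoreProvider {
    default Set<String> stores() {
        return null;
    }
}

// Hypothetical stand-ins for the old- and new-API ProcessorSupplier.
interface OldSupplier extends StoreProvider {
    Object get();
}

interface NewSupplier extends StoreProvider {
    Object get();
}

// Hypothetical stand-in for ProcessorParameters.
final class Params {
    final OldSupplier oldSupplier; // non-null only when built from the old API
    final NewSupplier supplier;    // always non-null

    Params(final OldSupplier old) {
        this.oldSupplier = old;
        // The wrapping lambda only implements get(); it inherits the default
        // stores(), so it never exposes the old supplier's stores.
        this.supplier = () -> old.get();
    }

    Params(final NewSupplier supplier) {
        this.oldSupplier = null;
        this.supplier = supplier;
    }
}

final class StoreWiring {
    // Mirrors the two blocks in writeToTopology: new-API stores, then old-API stores.
    static List<String> storesToAdd(final Params p) {
        final List<String> added = new ArrayList<>();
        if (p.supplier.stores() != null) {
            added.addAll(p.supplier.stores());
        }
        if (p.oldSupplier != null && p.oldSupplier.stores() != null) {
            added.addAll(p.oldSupplier.stores());
        }
        return added;
    }
}
```

In this model, an old-API supplier contributes stores only through the second branch and a new-API supplier only through the first, so no store is registered twice.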





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #9221: KAFKA-10436: Implement KIP-478 Topology changes

2020-09-09 Thread GitBox


vvcephei commented on a change in pull request #9221:
URL: https://github.com/apache/kafka/pull/9221#discussion_r485735152



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java
##
@@ -26,22 +30,57 @@
  * Used by the Join nodes as there are several parameters, this abstraction helps
  * keep the number of arguments more reasonable.
  */
-public class ProcessorParameters {
+public class ProcessorParameters {
 
-private final ProcessorSupplier processorSupplier;
+// During the transition to KIP-478, we capture arguments passed from the old API to simplify
+// the performance of casts that we still need to perform. This will eventually be removed.
+private final org.apache.kafka.streams.processor.ProcessorSupplier oldProcessorSupplier;
+private final ProcessorSupplier processorSupplier;
 private final String processorName;
 
-public ProcessorParameters(final ProcessorSupplier processorSupplier,
+public ProcessorParameters(final org.apache.kafka.streams.processor.ProcessorSupplier processorSupplier,
final String processorName) {
+oldProcessorSupplier = processorSupplier;
+this.processorSupplier = () -> ProcessorAdapter.adapt(processorSupplier.get());
+this.processorName = processorName;
+}
 
+public ProcessorParameters(final ProcessorSupplier processorSupplier,
+   final String processorName) {
+oldProcessorSupplier = null;
 this.processorSupplier = processorSupplier;
 this.processorName = processorName;
 }
 
-public ProcessorSupplier processorSupplier() {
+public ProcessorSupplier processorSupplier() {
 return processorSupplier;
 }
 
+public org.apache.kafka.streams.processor.ProcessorSupplier oldProcessorSupplier() {
+return oldProcessorSupplier;
+}
+
+@SuppressWarnings("unchecked")
+KTableSource kTableSourceSupplier() {
+// This cast always works because KTableSource hasn't been converted yet.
+return oldProcessorSupplier == null
+? null
+: !(oldProcessorSupplier instanceof KTableSource)
+  ? null
+  : (KTableSource) oldProcessorSupplier;
+}

Review comment:
   Thanks; yes, let's revisit it after the dust settles from KIP-478. These 
methods are for the most part temporary, since it's a real pain to do the cast 
when you have to deal with the current "dual interface" state in which 
processors might be old-style or new-style.
   
   I have a feeling I'll be able to eliminate these methods completely when I 
convert the relevant processors to the new API again.
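While these helpers remain, the nested ternary in `kTableSourceSupplier()` could be written more compactly, because `x instanceof T` is already `false` when `x` is `null`. A minimal sketch of that equivalence, using hypothetical stand-ins `LegacySupplier` (for the old-API `ProcessorSupplier`), `TableSourceStub` (for `KTableSource`), and `CastingParams` (for `ProcessorParameters`):

```java
// Hypothetical stand-in for the old-API ProcessorSupplier.
interface LegacySupplier {
    Object get();
}

// Hypothetical stand-in for KTableSource.
final class TableSourceStub implements LegacySupplier {
    @Override
    public Object get() {
        return "table-source-processor";
    }
}

// Hypothetical stand-in for ProcessorParameters.
final class CastingParams {
    private final LegacySupplier oldSupplier;

    CastingParams(final LegacySupplier oldSupplier) {
        this.oldSupplier = oldSupplier;
    }

    // Same result as `old == null ? null : !(old instanceof T) ? null : (T) old`:
    // instanceof is false for null, so one check covers both cases.
    TableSourceStub tableSourceSupplier() {
        return oldSupplier instanceof TableSourceStub ? (TableSourceStub) oldSupplier : null;
    }
}
```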









[GitHub] [kafka] vvcephei commented on a change in pull request #9221: KAFKA-10436: Implement KIP-478 Topology changes

2020-09-09 Thread GitBox


vvcephei commented on a change in pull request #9221:
URL: https://github.com/apache/kafka/pull/9221#discussion_r485732127



##
File path: streams/src/main/java/org/apache/kafka/streams/Topology.java
##
@@ -772,6 +806,94 @@ public synchronized Topology addStateStore(final StoreBuilder storeBuilder,
 return this;
 }
 
+/**
+ * Adds a global {@link StateStore} to the topology.
+ * The {@link StateStore} sources its data from all partitions of the provided input topic.
+ * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
+ * 
+ * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
+ * of the input topic.
+ * 
+ * The provided {@link ProcessorSupplier} will be used to create a {@link ProcessorNode} that will receive all
+ * records forwarded from the {@link SourceNode}.
+ * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
+ * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+ *
+ * @param storeBuilder        user defined state store builder
+ * @param sourceName          name of the {@link SourceNode} that will be automatically added
+ * @param keyDeserializer     the {@link Deserializer} to deserialize keys with
+ * @param valueDeserializer   the {@link Deserializer} to deserialize values with
+ * @param topic               the topic to source the data from
+ * @param processorName       the name of the {@link ProcessorSupplier}
+ * @param stateUpdateSupplier the instance of {@link ProcessorSupplier}
+ * @return itself
+ * @throws TopologyException if the processor of state is already registered
+ */
+public synchronized  Topology addGlobalStore(final StoreBuilder storeBuilder,
+   final String sourceName,
+   final Deserializer keyDeserializer,
+   final Deserializer valueDeserializer,
+   final String topic,
+   final String processorName,
+   final ProcessorSupplier stateUpdateSupplier) {
+internalTopologyBuilder.addGlobalStore(
+storeBuilder,
+sourceName,
+null,
+keyDeserializer,
+valueDeserializer,
+topic,
+processorName,
+stateUpdateSupplier
+);
+return this;
+}
+
+/**
+ * Adds a global {@link StateStore} to the topology.
+ * The {@link StateStore} sources its data from all partitions of the provided input topic.
+ * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
+ * 
+ * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
+ * of the input topic.
+ * 
+ * The provided {@link ProcessorSupplier} will be used to create a {@link ProcessorNode} that will receive all
+ * records forwarded from the {@link SourceNode}.
+ * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
+ *
+ * @param storeBuilder        user defined key value store builder
+ * @param sourceName          name of the {@link SourceNode} that will be automatically added
+ * @param timestampExtractor  the stateless timestamp extractor used for this source,
+ *                            if not specified the default extractor defined in the configs will be used
+ * @param keyDeserializer     the {@link Deserializer} to deserialize keys with
+ * @param valueDeserializer   the {@link Deserializer} to deserialize values with
+ * @param topic               the topic to source the data from
+ * @param processorName       the name of the {@link ProcessorSupplier}
+ * @param stateUpdateSupplier the instance of {@link ProcessorSupplier}
+ * @return itself
+ * @throws TopologyException if the processor of state is already registered
+ */
+public synchronized  Topology addGlobalStore(final StoreBuilder storeBuilder,
+   final String sourceName,
+   final TimestampExtractor timestampExtractor,
+   final Deserializer keyDeserializer,
+   final Deserializer valueDeserializer,
+   final String topic,
+   
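
The `addGlobalStore` Javadoc describes a contract: one store per Kafka Streams instance, fed by every partition of the input topic, and kept current by the state-update processor. This plain-Java sketch models only that contract; `GlobalRecord`, `GlobalStoreUpdater`, and `GlobalStoreDemo` are hypothetical, the store is a `HashMap`, and treating a `null` value as a delete is an assumption of the sketch rather than something the Javadoc specifies.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical record read from one partition of the global topic.
final class GlobalRecord {
    final int partition; // carried only to show records come from different partitions
    final String key;
    final String value;

    GlobalRecord(final int partition, final String key, final String value) {
        this.partition = partition;
        this.key = key;
        this.value = value;
    }
}

// Hypothetical state-update processor: it only mirrors records into the store.
final class GlobalStoreUpdater {
    private final Map<String, String> store;

    GlobalStoreUpdater(final Map<String, String> store) {
        this.store = store;
    }

    void process(final GlobalRecord record) {
        if (record.value == null) {
            store.remove(record.key); // assumption: null value means delete
        } else {
            store.put(record.key, record.value);
        }
    }
}

final class GlobalStoreDemo {
    // One store per (simulated) Streams instance, fed by records from all partitions.
    static Map<String, String> load(final List<GlobalRecord> recordsFromAllPartitions) {
        final Map<String, String> store = new HashMap<>();
        final GlobalStoreUpdater updater = new GlobalStoreUpdater(store);
        for (final GlobalRecord record : recordsFromAllPartitions) {
            updater.process(record);
        }
        return store;
    }
}
```

Keeping the updater a pure write-through is what lets the store be rebuilt from the topic alone.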

[GitHub] [kafka] vvcephei commented on a change in pull request #9221: KAFKA-10436: Implement KIP-478 Topology changes

2020-08-25 Thread GitBox


vvcephei commented on a change in pull request #9221:
URL: https://github.com/apache/kafka/pull/9221#discussion_r476953793



##
File path: 
streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
##
@@ -145,24 +145,24 @@ public void shouldPunctuateIfWallClockTimeAdvances() {
 assertThat(outputTopic.isEmpty(), is(true));
 }
 
-public static class CustomMaxAggregatorSupplier implements ProcessorSupplier {
+public static class CustomMaxAggregatorSupplier implements ProcessorSupplier {
 @Override
-public Processor get() {
+public Processor get() {
 return new CustomMaxAggregator();
 }
 }
 
-public static class CustomMaxAggregator implements Processor {
-ProcessorContext context;
+public static class CustomMaxAggregator implements Processor {
+ProcessorContext context;
 private KeyValueStore store;
 
 @SuppressWarnings("unchecked")
 @Override
-public void init(final ProcessorContext context) {
+public void init(final ProcessorContext context) {
 this.context = context;
 context.schedule(Duration.ofSeconds(60), PunctuationType.WALL_CLOCK_TIME, time -> flushStore());
 context.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, time -> flushStore());
-store = (KeyValueStore) context.getStateStore("aggStore");
+store = context.getStateStore("aggStore");

Review comment:
   This is a small improvement I noticed; I'll mention this on the KIP 
discussion if you like it. I've changed the ProcessorContext getStateStore 
method so that we don't have to cast the store type anymore. The generic 
parameters to the method take care of casting now.
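The effect of moving the cast into a generic method can be sketched as follows. `DemoStore`, `DemoKeyValueStore`, `DemoWindowStore`, and `DemoContext` are hypothetical stand-ins for `StateStore`, a key-value store, a different store type, and `ProcessorContext`; the method shape `<S extends ...> S getStateStore(String name)` is an assumption about how the generic parameter is declared. The caller's assignment target fixes `S`, so no explicit cast is written; the unchecked cast still exists, and a mismatched target type fails with a `ClassCastException` at the call site.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for StateStore.
interface DemoStore {
    String name();
}

// Hypothetical key-value store.
final class DemoKeyValueStore implements DemoStore {
    private final Map<String, Long> data = new HashMap<>();

    @Override
    public String name() {
        return "aggStore";
    }

    Long get(final String key) {
        return data.get(key);
    }

    void put(final String key, final Long value) {
        data.put(key, value);
    }
}

// Hypothetical store of a different type, used to show a mismatched lookup.
final class DemoWindowStore implements DemoStore {
    @Override
    public String name() {
        return "windowStore";
    }
}

// Hypothetical stand-in for ProcessorContext.
final class DemoContext {
    private final Map<String, DemoStore> stores = new HashMap<>();

    void register(final DemoStore store) {
        stores.put(store.name(), store);
    }

    // S is inferred from the caller's assignment target, so callers write no cast.
    // This cast is unchecked (S erases to DemoStore); the compiler inserts the
    // checked cast at each call site instead.
    @SuppressWarnings("unchecked")
    <S extends DemoStore> S getStateStore(final String name) {
        return (S) stores.get(name);
    }
}
```

A caller then writes `final DemoKeyValueStore store = context.getStateStore("aggStore");`, matching the shape of the changed line in `CustomMaxAggregator.init`.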

##
File path: streams/src/main/java/org/apache/kafka/streams/Topology.java
##
@@ -658,8 +658,42 @@ public synchronized Topology addSink(final String name,
  */
 @SuppressWarnings("rawtypes")
 public synchronized Topology addProcessor(final String name,
-  final ProcessorSupplier supplier,
+  final org.apache.kafka.streams.processor.ProcessorSupplier supplier,
   final String... parentNames) {
+return addProcessor(
+name,
+new ProcessorSupplier() {
+@Override
+public Set<StoreBuilder<?>> stores() {
+return supplier.stores();
+}
+
+@Override
+public org.apache.kafka.streams.processor.api.Processor get() {
+return ProcessorAdapter.adaptRaw(supplier.get());
+}
+},
+parentNames
+);
+}

Review comment:
   
   
   As in previous changes, this delegates the old API to the new one.
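The delegation in this `addProcessor` overload wraps an old-API supplier in a new-API supplier that forwards `stores()` and adapts each processor returned by `get()`. A sketch of that pattern with simplified interfaces; everything here is hypothetical: `LegacyProcessor`/`LegacyProcessorSupplier` stand in for the old API, `ModernProcessor`/`ModernProcessorSupplier` and `KeyValueRecord` for the new one, and `LegacyAdapters` for `ProcessorAdapter`. The real interfaces also have generics and lifecycle methods (`init`, `close`) that this sketch omits.

```java
import java.util.Set;

// Hypothetical old-style API: processors receive key and value separately.
interface LegacyProcessor {
    void process(String key, String value);
}

interface LegacyProcessorSupplier {
    LegacyProcessor get();

    default Set<String> stores() {
        return null;
    }
}

// Hypothetical new-style API: processors receive a single record object.
final class KeyValueRecord {
    final String key;
    final String value;

    KeyValueRecord(final String key, final String value) {
        this.key = key;
        this.value = value;
    }
}

interface ModernProcessor {
    void process(KeyValueRecord record);
}

interface ModernProcessorSupplier {
    ModernProcessor get();

    default Set<String> stores() {
        return null;
    }
}

// Hypothetical stand-in for ProcessorAdapter.
final class LegacyAdapters {
    // Adapts one old-style processor to the new interface.
    static ModernProcessor adaptProcessor(final LegacyProcessor legacy) {
        return record -> legacy.process(record.key, record.value);
    }

    // Wraps an old-style supplier: stores() is forwarded unchanged, and every
    // processor created by get() is adapted.
    static ModernProcessorSupplier adaptSupplier(final LegacyProcessorSupplier supplier) {
        return new ModernProcessorSupplier() {
            @Override
            public Set<String> stores() {
                return supplier.stores();
            }

            @Override
            public ModernProcessor get() {
                return adaptProcessor(supplier.get());
            }
        };
    }
}
```

Forwarding `stores()` in the wrapper matters: without it, stores connected through the old supplier would be silently dropped once the topology only sees the new-API supplier.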
   

##
File path: 
streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
##
@@ -145,24 +145,24 @@ public void shouldPunctuateIfWallClockTimeAdvances() {
 assertThat(outputTopic.isEmpty(), is(true));
 }
 
-public static class CustomMaxAggregatorSupplier implements ProcessorSupplier {
+public static class CustomMaxAggregatorSupplier implements ProcessorSupplier {

Review comment:
   Since the new public API change is small, I also converted almost all of 
the usages of the old API to the new one.

##
File path: streams/src/main/java/org/apache/kafka/streams/Topology.java
##
@@ -772,6 +806,94 @@ public synchronized Topology addStateStore(final StoreBuilder storeBuilder,
 return this;
 }
 
+/**
+ * Adds a global {@link StateStore} to the topology.
+ * The {@link StateStore} sources its data from all partitions of the provided input topic.
+ * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
+ * 
+ * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
+ * of the input topic.
+ * 
+ * The provided {@link ProcessorSupplier} will be used to create a {@link ProcessorNode} that will receive all
+ * records forwarded from the {@link SourceNode}.
+ * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
+ * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+ *
+ * @param storeBuilder        user defined state store builder
+ * @param sourceName          name of the {@link SourceNode} that will be automatically added
+ * @param keyDeserializer     the {@link Deserializer} to deserialize keys with
+ * @param valueDeserializer   the {@link Deserializer} to deserialize values with
+ * @param topic               the topic to source the dat