[GitHub] [kafka] vvcephei commented on a change in pull request #9221: KAFKA-10436: Implement KIP-478 Topology changes
vvcephei commented on a change in pull request #9221:
URL: https://github.com/apache/kafka/pull/9221#discussion_r485742765

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java ##
@@ -99,5 +99,14 @@ public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
             }
         }
+        // temporary hack until KIP-478 is fully implemented
+        final org.apache.kafka.streams.processor.ProcessorSupplier oldProcessorSupplier =
+            processorParameters().oldProcessorSupplier();
+        if (oldProcessorSupplier != null && oldProcessorSupplier.stores() != null) {
+            for (final StoreBuilder storeBuilder : oldProcessorSupplier.stores()) {
+                topologyBuilder.addStateStore(storeBuilder, processorName);
+            }
+        }
+

Review comment:
It definitely looks that way, but I've just double-checked, and I think it's safe. Only a subclass of ProcessorSupplier (either the new one or the old one) can override the ConnectedStoreProvider#stores method. Unlike Processor and ProcessorContext, we haven't added adapters for ProcessorSupplier that could delegate the `stores` method from the new API to the old one, so only a direct implementation of the new-API ProcessorSupplier can return a non-null result from `processorSupplier.stores()` on L96. Likewise, `oldProcessorSupplier` is non-null only when the provided processor is _only_ an old-API ProcessorSupplier. So either L96-99 will add stores or L102-109 will (or neither), but never both. Does that reasoning seem legit to you?

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
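The "either one block or the other, never both" argument can be checked with a small, self-contained model. This is a sketch under stated assumptions: `StoreRegistrationSketch`, `OldSupplier`, `NewSupplier`, `adapt` and `registeredStores` are hypothetical stand-ins rather than the Kafka Streams classes, and store builders are reduced to plain store names. The adapter forwards `get()` but, as the comment describes, does not forward `stores()`, so each supplier's stores end up registered exactly once.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified stand-ins for the Kafka Streams types; NOT the real API.
final class StoreRegistrationSketch {

    // Stand-in for ConnectedStoreProvider: stores() returns null unless overridden.
    interface ConnectedStoreProvider {
        default Set<String> stores() {
            return null;
        }
    }

    // Stand-in for the old-API ProcessorSupplier (org.apache.kafka.streams.processor).
    interface OldSupplier extends ConnectedStoreProvider {
        String get();
    }

    // Stand-in for the new-API ProcessorSupplier (org.apache.kafka.streams.processor.api).
    interface NewSupplier extends ConnectedStoreProvider {
        String get();
    }

    // Adapts get() only; stores() is deliberately NOT delegated, so it stays null.
    static NewSupplier adapt(final OldSupplier old) {
        return () -> "adapted:" + old.get();
    }

    // Mirrors the two registration blocks: new-API stores first, then the old-API "hack".
    static List<String> registeredStores(final NewSupplier processorSupplier,
                                         final OldSupplier oldProcessorSupplier) {
        final List<String> registered = new ArrayList<>();
        if (processorSupplier.stores() != null) {
            registered.addAll(processorSupplier.stores());
        }
        if (oldProcessorSupplier != null && oldProcessorSupplier.stores() != null) {
            registered.addAll(oldProcessorSupplier.stores());
        }
        return registered;
    }

    public static void main(final String[] args) {
        // Old-API caller: the old supplier is kept, and the new one is only an adapter.
        final OldSupplier old = new OldSupplier() {
            @Override
            public String get() {
                return "oldProcessor";
            }

            @Override
            public Set<String> stores() {
                return Collections.singleton("oldStore");
            }
        };
        System.out.println(registeredStores(adapt(old), old)); // [oldStore]

        // New-API caller: there is no old supplier at all.
        final NewSupplier fresh = new NewSupplier() {
            @Override
            public String get() {
                return "newProcessor";
            }

            @Override
            public Set<String> stores() {
                return Collections.singleton("newStore");
            }
        };
        System.out.println(registeredStores(fresh, null)); // [newStore]
    }
}
```

If the adapter did forward `stores()`, the old-API case would list `oldStore` twice, which is exactly the double registration the reviewer was worried about.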
vvcephei commented on a change in pull request #9221:
URL: https://github.com/apache/kafka/pull/9221#discussion_r485735152

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java ##
@@ -26,22 +30,57 @@
  * Used by the Join nodes as there are several parameters, this abstraction helps
  * keep the number of arguments more reasonable.
  */
-public class ProcessorParameters {
+public class ProcessorParameters {
-    private final ProcessorSupplier processorSupplier;
+    // During the transition to KIP-478, we capture arguments passed from the old API to simplify
+    // the performance of casts that we still need to perform. This will eventually be removed.
+    private final org.apache.kafka.streams.processor.ProcessorSupplier oldProcessorSupplier;
+    private final ProcessorSupplier processorSupplier;
     private final String processorName;
-    public ProcessorParameters(final ProcessorSupplier processorSupplier,
+    public ProcessorParameters(final org.apache.kafka.streams.processor.ProcessorSupplier processorSupplier,
                                final String processorName) {
+        oldProcessorSupplier = processorSupplier;
+        this.processorSupplier = () -> ProcessorAdapter.adapt(processorSupplier.get());
+        this.processorName = processorName;
+    }
+    public ProcessorParameters(final ProcessorSupplier processorSupplier,
+                               final String processorName) {
+        oldProcessorSupplier = null;
         this.processorSupplier = processorSupplier;
         this.processorName = processorName;
     }
-    public ProcessorSupplier processorSupplier() {
+    public ProcessorSupplier processorSupplier() {
         return processorSupplier;
     }
+    public org.apache.kafka.streams.processor.ProcessorSupplier oldProcessorSupplier() {
+        return oldProcessorSupplier;
+    }
+
+    @SuppressWarnings("unchecked")
+    KTableSource kTableSourceSupplier() {
+        // This cast always works because KTableSource hasn't been converted yet.
+        return oldProcessorSupplier == null
+            ? null
+            : !(oldProcessorSupplier instanceof KTableSource)
+              ? null
+              : (KTableSource) oldProcessorSupplier;
+    }

Review comment:
Thanks; yes, let's revisit it after the dust settles from KIP-478. These methods are mostly temporary, since it's a real pain to do the cast when you have to deal with the current "dual interface" state, in which processors might be either old-style or new-style. I have a feeling I'll be able to eliminate these methods completely when I convert the relevant processors to the new API again.
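The dual-constructor pattern in `ProcessorParameters` (keep a reference to the old-API supplier, expose an adapted new-API supplier, and recover the concrete old type with an `instanceof` check) might be sketched as follows. All names here (`DualSupplierParametersSketch`, `Parameters`, `TableSource`, and both supplier interfaces) are hypothetical simplifications; `TableSource` merely plays the role of `KTableSource`, and "adapting" is reduced to prefixing a string.

```java
// Hypothetical, simplified stand-ins; NOT the real Kafka Streams classes.
final class DualSupplierParametersSketch {

    // Stand-in for the old-API ProcessorSupplier.
    interface OldSupplier {
        String get();
    }

    // Stand-in for the new-API ProcessorSupplier.
    interface NewSupplier {
        String get();
    }

    // Plays the role of KTableSource: an old-API supplier that has not been converted yet.
    static final class TableSource implements OldSupplier {
        @Override
        public String get() {
            return "tableSourceProcessor";
        }
    }

    static final class Parameters {
        // Captured only during the transition; null when built from the new API.
        private final OldSupplier oldProcessorSupplier;
        private final NewSupplier processorSupplier;

        Parameters(final OldSupplier supplier) {
            this.oldProcessorSupplier = supplier;
            // Each get() asks the old supplier for a processor and adapts it.
            this.processorSupplier = () -> "adapted:" + supplier.get();
        }

        Parameters(final NewSupplier supplier) {
            this.oldProcessorSupplier = null;
            this.processorSupplier = supplier;
        }

        NewSupplier processorSupplier() {
            return processorSupplier;
        }

        // Same result as kTableSourceSupplier() in the diff: null unless the old supplier is a TableSource.
        TableSource tableSourceSupplier() {
            return oldProcessorSupplier instanceof TableSource
                ? (TableSource) oldProcessorSupplier
                : null;
        }
    }

    public static void main(final String[] args) {
        final Parameters fromOldApi = new Parameters(new TableSource());
        System.out.println(fromOldApi.tableSourceSupplier() != null); // true
        System.out.println(fromOldApi.processorSupplier().get());     // adapted:tableSourceProcessor

        final NewSupplier modern = () -> "newProcessor";
        final Parameters fromNewApi = new Parameters(modern);
        System.out.println(fromNewApi.tableSourceSupplier());         // null
    }
}
```

Because `instanceof` is already `false` for `null`, one check covers both the "no old supplier" and the "wrong type" cases that the nested ternary in the diff spells out separately. Also note that a bare lambda argument would be ambiguous between the two constructors in this sketch, since both supplier interfaces have the same shape, so callers pass typed references.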
vvcephei commented on a change in pull request #9221:
URL: https://github.com/apache/kafka/pull/9221#discussion_r485732127

## File path: streams/src/main/java/org/apache/kafka/streams/Topology.java ##
@@ -772,6 +806,94 @@ public synchronized Topology addStateStore(final StoreBuilder storeBuilder,
         return this;
     }
+    /**
+     * Adds a global {@link StateStore} to the topology.
+     * The {@link StateStore} sources its data from all partitions of the provided input topic.
+     * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
+     *
+     * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
+     * of the input topic.
+     *
+     * The provided {@link ProcessorSupplier} will be used to create an {@link ProcessorNode} that will receive all
+     * records forwarded from the {@link SourceNode}.
+     * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
+     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+     *
+     * @param storeBuilder user defined state store builder
+     * @param sourceName name of the {@link SourceNode} that will be automatically added
+     * @param keyDeserializer the {@link Deserializer} to deserialize keys with
+     * @param valueDeserializer the {@link Deserializer} to deserialize values with
+     * @param topic the topic to source the data from
+     * @param processorName the name of the {@link ProcessorSupplier}
+     * @param stateUpdateSupplier the instance of {@link ProcessorSupplier}
+     * @return itself
+     * @throws TopologyException if the processor of state is already registered
+     */
+    public synchronized Topology addGlobalStore(final StoreBuilder storeBuilder,
+                                                final String sourceName,
+                                                final Deserializer keyDeserializer,
+                                                final Deserializer valueDeserializer,
+                                                final String topic,
+                                                final String processorName,
+                                                final ProcessorSupplier stateUpdateSupplier) {
+        internalTopologyBuilder.addGlobalStore(
+            storeBuilder,
+            sourceName,
+            null,
+            keyDeserializer,
+            valueDeserializer,
+            topic,
+            processorName,
+            stateUpdateSupplier
+        );
+        return this;
+    }
+
+    /**
+     * Adds a global {@link StateStore} to the topology.
+     * The {@link StateStore} sources its data from all partitions of the provided input topic.
+     * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
+     *
+     * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
+     * of the input topic.
+     *
+     * The provided {@link ProcessorSupplier} will be used to create an {@link ProcessorNode} that will receive all
+     * records forwarded from the {@link SourceNode}.
+     * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
+     *
+     * @param storeBuilder user defined key value store builder
+     * @param sourceName name of the {@link SourceNode} that will be automatically added
+     * @param timestampExtractor the stateless timestamp extractor used for this source,
+     *                           if not specified the default extractor defined in the configs will be used
+     * @param keyDeserializer the {@link Deserializer} to deserialize keys with
+     * @param valueDeserializer the {@link Deserializer} to deserialize values with
+     * @param topic the topic to source the data from
+     * @param processorName the name of the {@link ProcessorSupplier}
+     * @param stateUpdateSupplier the instance of {@link ProcessorSupplier}
+     * @return itself
+     * @throws TopologyException if the processor of state is already registered
+     */
+    public synchronized Topology addGlobalStore(final StoreBuilder storeBuilder,
+                                                final String sourceName,
+                                                final TimestampExtractor timestampExtractor,
+                                                final Deserializer keyDeserializer,
+                                                final Deserializer valueDeserializer,
+                                                final String topic,
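The two `addGlobalStore` overloads above differ only in the timestamp extractor: the shorter one forwards `null`, and per the Javadoc a missing extractor means the default from the configs is used. A minimal sketch of that overload-delegation pattern follows. `GlobalStoreOverloadSketch`, `TopologySketch` and the one-method `TimestampExtractor` are hypothetical stand-ins, and the sketch resolves the fallback immediately for brevity, which is not necessarily where Kafka Streams applies the default.

```java
// Hypothetical, simplified model of the two addGlobalStore overloads; NOT the Kafka API.
final class GlobalStoreOverloadSketch {

    // Stand-in for TimestampExtractor: maps a record to its timestamp.
    interface TimestampExtractor {
        long extract(String record);
    }

    static final class TopologySketch {
        private final TimestampExtractor configuredDefault;
        private TimestampExtractor globalSourceExtractor;

        TopologySketch(final TimestampExtractor configuredDefault) {
            this.configuredDefault = configuredDefault;
        }

        // Overload without an extractor: forwards null, like the first addGlobalStore in the diff.
        TopologySketch addGlobalStore(final String sourceName, final String topic) {
            return addGlobalStore(sourceName, null, topic);
        }

        // Overload with an extractor: null means "fall back to the configured default".
        // sourceName and topic are kept only to mirror the real signature's shape.
        TopologySketch addGlobalStore(final String sourceName,
                                      final TimestampExtractor timestampExtractor,
                                      final String topic) {
            globalSourceExtractor = timestampExtractor != null ? timestampExtractor : configuredDefault;
            return this; // "@return itself" enables call chaining
        }

        long timestampOf(final String record) {
            return globalSourceExtractor.extract(record);
        }
    }

    public static void main(final String[] args) {
        final TimestampExtractor fromConfig = record -> 0L;
        final TimestampExtractor custom = record -> record.length();

        System.out.println(new TopologySketch(fromConfig)
            .addGlobalStore("src", "topic").timestampOf("abc"));         // 0
        System.out.println(new TopologySketch(fromConfig)
            .addGlobalStore("src", custom, "topic").timestampOf("abc")); // 3
    }
}
```

Delegating the short overload to the long one keeps a single code path, so the "no extractor" case cannot drift from the explicit one.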
vvcephei commented on a change in pull request #9221:
URL: https://github.com/apache/kafka/pull/9221#discussion_r476953793

## File path: streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java ##
@@ -145,24 +145,24 @@ public void shouldPunctuateIfWallClockTimeAdvances() {
         assertThat(outputTopic.isEmpty(), is(true));
     }
-    public static class CustomMaxAggregatorSupplier implements ProcessorSupplier {
+    public static class CustomMaxAggregatorSupplier implements ProcessorSupplier {
         @Override
-        public Processor get() {
+        public Processor get() {
             return new CustomMaxAggregator();
         }
     }
-    public static class CustomMaxAggregator implements Processor {
-        ProcessorContext context;
+    public static class CustomMaxAggregator implements Processor {
+        ProcessorContext context;
         private KeyValueStore store;
         @SuppressWarnings("unchecked")
         @Override
-        public void init(final ProcessorContext context) {
+        public void init(final ProcessorContext context) {
             this.context = context;
             context.schedule(Duration.ofSeconds(60), PunctuationType.WALL_CLOCK_TIME, time -> flushStore());
             context.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, time -> flushStore());
-            store = (KeyValueStore) context.getStateStore("aggStore");
+            store = context.getStateStore("aggStore");

Review comment:
This is a small improvement I noticed; I'll mention it on the KIP discussion if you like it. I've changed the ProcessorContext#getStateStore method so that we don't have to cast the store type anymore: the method's generic type parameter now takes care of the cast.

## File path: streams/src/main/java/org/apache/kafka/streams/Topology.java ##
@@ -658,8 +658,42 @@ public synchronized Topology addSink(final String name,
      */
     @SuppressWarnings("rawtypes")
     public synchronized Topology addProcessor(final String name,
-                                              final ProcessorSupplier supplier,
+                                              final org.apache.kafka.streams.processor.ProcessorSupplier supplier,
                                               final String... parentNames) {
+        return addProcessor(
+            name,
+            new ProcessorSupplier() {
+                @Override
+                public Set<StoreBuilder<?>> stores() {
+                    return supplier.stores();
+                }
+
+                @Override
+                public org.apache.kafka.streams.processor.api.Processor get() {
+                    return ProcessorAdapter.adaptRaw(supplier.get());
+                }
+            },
+            parentNames
+        );
+    }

Review comment:
As in previous changes, this delegates the old API to the new one.

## File path: streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java ##
@@ -145,24 +145,24 @@ public void shouldPunctuateIfWallClockTimeAdvances() {
         assertThat(outputTopic.isEmpty(), is(true));
     }
-    public static class CustomMaxAggregatorSupplier implements ProcessorSupplier {
+    public static class CustomMaxAggregatorSupplier implements ProcessorSupplier {

Review comment:
Since the new public API change is small, I also converted almost all usages of the old API to the new one.

## File path: streams/src/main/java/org/apache/kafka/streams/Topology.java ##
@@ -772,6 +806,94 @@ public synchronized Topology addStateStore(final StoreBuilder storeBuilder,
         return this;
     }
+    /**
+     * Adds a global {@link StateStore} to the topology.
+     * The {@link StateStore} sources its data from all partitions of the provided input topic.
+     * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
+     *
+     * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
+     * of the input topic.
+     *
+     * The provided {@link ProcessorSupplier} will be used to create an {@link ProcessorNode} that will receive all
+     * records forwarded from the {@link SourceNode}.
+     * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
+     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+     *
+     * @param storeBuilder user defined state store builder
+     * @param sourceName name of the {@link SourceNode} that will be automatically added
+     * @param keyDeserializer the {@link Deserializer} to deserialize keys with
+     * @param valueDeserializer the {@link Deserializer} to deserialize values with
+     * @param topic the topic to source the dat
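The `getStateStore` change moves the cast from every call site into the method itself by making the return type a type parameter. A minimal sketch of that technique, assuming simplified hypothetical `Context`, `StateStore` and `InMemoryKeyValueStore` types rather than the real Kafka Streams interfaces:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-ins; NOT the real Kafka Streams interfaces.
final class GenericStoreLookupSketch {

    interface StateStore {
        String name();
    }

    // A concrete store type a processor wants back without an explicit cast.
    static final class InMemoryKeyValueStore implements StateStore {
        private final String name;
        private final Map<String, Long> data = new HashMap<>();

        InMemoryKeyValueStore(final String name) {
            this.name = name;
        }

        @Override
        public String name() {
            return name;
        }

        void put(final String key, final Long value) {
            data.put(key, value);
        }

        Long get(final String key) {
            return data.get(key);
        }
    }

    static final class Context {
        private final Map<String, StateStore> stores = new HashMap<>();

        void register(final StateStore store) {
            stores.put(store.name(), store);
        }

        // The type parameter moves the cast from every caller into this one method.
        @SuppressWarnings("unchecked")
        <S extends StateStore> S getStateStore(final String name) {
            return (S) stores.get(name);
        }
    }

    public static void main(final String[] args) {
        final Context context = new Context();
        context.register(new InMemoryKeyValueStore("aggStore"));

        // No explicit cast at the call site: S is inferred from the assignment target.
        final InMemoryKeyValueStore store = context.getStateStore("aggStore");
        store.put("k", 42L);
        System.out.println(store.get("k")); // 42
    }
}
```

The cast is still unchecked: if the caller names the wrong store type, the `ClassCastException` surfaces at the assignment in the caller rather than inside `getStateStore`.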
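The `addProcessor` overload in the diff wraps an old-API supplier in an anonymous new-API supplier that forwards both `stores()` and `get()`. A sketch of that delegation, assuming hypothetical one-method processor interfaces and a hypothetical `adaptProcessor` helper standing in for `ProcessorAdapter.adaptRaw`:

```java
import java.util.Collections;
import java.util.Set;

// Hypothetical stand-ins; NOT the real Kafka Streams types.
final class DelegatingSupplierSketch {

    interface OldProcessor {
        String process(String value);
    }

    interface NewProcessor {
        String process(String value);
    }

    interface OldSupplier {
        OldProcessor get();

        default Set<String> stores() {
            return null;
        }
    }

    interface NewSupplier {
        NewProcessor get();

        default Set<String> stores() {
            return null;
        }
    }

    // Hypothetical counterpart of ProcessorAdapter.adaptRaw: wraps one processor instance.
    static NewProcessor adaptProcessor(final OldProcessor old) {
        return old::process;
    }

    // Like the addProcessor overload in the diff: the wrapper forwards BOTH stores() and get().
    static NewSupplier toNewApi(final OldSupplier supplier) {
        return new NewSupplier() {
            @Override
            public Set<String> stores() {
                return supplier.stores();
            }

            @Override
            public NewProcessor get() {
                return adaptProcessor(supplier.get());
            }
        };
    }

    public static void main(final String[] args) {
        final OldSupplier old = new OldSupplier() {
            @Override
            public OldProcessor get() {
                return value -> value.toUpperCase();
            }

            @Override
            public Set<String> stores() {
                return Collections.singleton("aggStore");
            }
        };

        final NewSupplier adapted = toNewApi(old);
        System.out.println(adapted.stores());               // [aggStore]
        System.out.println(adapted.get().process("hello")); // HELLO
    }
}
```

Forwarding `stores()` is the important part: without it, the wrapper in this sketch would inherit the default `null` and report no stores for a supplier that declared some.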