vvcephei commented on a change in pull request #9221: URL: https://github.com/apache/kafka/pull/9221#discussion_r485742765
##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java
##########
@@ -99,5 +99,14 @@ public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
             }
         }
 
+        // temporary hack until KIP-478 is fully implemented
+        final org.apache.kafka.streams.processor.ProcessorSupplier<K, V> oldProcessorSupplier =
+            processorParameters().oldProcessorSupplier();
+        if (oldProcessorSupplier != null && oldProcessorSupplier.stores() != null) {
+            for (final StoreBuilder<?> storeBuilder : oldProcessorSupplier.stores()) {
+                topologyBuilder.addStateStore(storeBuilder, processorName);
+            }
+        }
+

Review comment:
It definitely looks that way, but I've just double-checked, and I think it's safe.

The thing is that only a direct implementation of ProcessorSupplier (either the new or the old one) can override the ConnectedStoreProvider#stores method. Unlike Processor and ProcessorContext, we haven't added adapters for ProcessorSupplier that could delegate the `stores` method from the new API to the old one, so only a proper direct instantiation of the new-API ProcessorSupplier could return a non-null result from `processorSupplier.stores()` on L96.

Likewise, `oldProcessorSupplier` is itself non-null only when the provided supplier is _only_ an old-API ProcessorSupplier. So it seems like either L96-99 will add stores or L102-109 will (or neither), but never both.

Does that reasoning seem legit to you?
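To make the "never both" argument concrete, here is a minimal, self-contained sketch of the two code paths. It does not use the real Kafka Streams classes: `StoreRegistrationSketch`, the nested interfaces, the `fromNew`/`fromOld` factories, and the use of plain store names instead of `StoreBuilder` are all hypothetical simplifications, and the wrapping of the old supplier in a lambda is an assumption based on the description in the comment above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified stand-ins for the types discussed in the review comment.
class StoreRegistrationSketch {

    interface ConnectedStoreProvider {
        // Default: no connected stores, signalled by null.
        default Set<String> stores() { return null; }
    }

    interface OldProcessorSupplier extends ConnectedStoreProvider { Object get(); }

    interface NewProcessorSupplier extends ConnectedStoreProvider { Object get(); }

    static final class ProcessorParameters {
        final NewProcessorSupplier processorSupplier;
        final OldProcessorSupplier oldProcessorSupplier;

        private ProcessorParameters(final NewProcessorSupplier n, final OldProcessorSupplier o) {
            this.processorSupplier = n;
            this.oldProcessorSupplier = o;
        }

        // New-API supplier given directly: there is no old supplier to remember.
        static ProcessorParameters fromNew(final NewProcessorSupplier supplier) {
            return new ProcessorParameters(supplier, null);
        }

        // Old-API supplier: assumed to be wrapped in a lambda that adapts get() only,
        // so the wrapper's stores() falls back to the default and returns null.
        static ProcessorParameters fromOld(final OldProcessorSupplier supplier) {
            return new ProcessorParameters(() -> supplier.get(), supplier);
        }
    }

    // Mirrors the two blocks in writeToTopology; returns every store name that was added.
    static List<String> writeToTopology(final ProcessorParameters params) {
        final List<String> added = new ArrayList<>();
        if (params.processorSupplier.stores() != null) {          // first block (new API)
            added.addAll(params.processorSupplier.stores());
        }
        final OldProcessorSupplier old = params.oldProcessorSupplier;
        if (old != null && old.stores() != null) {                // second block (old API)
            added.addAll(old.stores());
        }
        return added;
    }

    static OldProcessorSupplier oldSupplierWithStore(final String name) {
        return new OldProcessorSupplier() {
            @Override public Object get() { return new Object(); }
            @Override public Set<String> stores() { return Collections.singleton(name); }
        };
    }

    static NewProcessorSupplier newSupplierWithStore(final String name) {
        return new NewProcessorSupplier() {
            @Override public Object get() { return new Object(); }
            @Override public Set<String> stores() { return Collections.singleton(name); }
        };
    }

    public static void main(final String[] args) {
        // In both cases the store is registered exactly once.
        System.out.println(writeToTopology(ProcessorParameters.fromOld(oldSupplierWithStore("store-a"))));
        System.out.println(writeToTopology(ProcessorParameters.fromNew(newSupplierWithStore("store-a"))));
    }
}
```

Under these assumptions, each construction path leaves exactly one of the two checks able to see a non-null `stores()` result, so a store cannot be added twice. If an adapter that delegated `stores()` from the new API to the old one were ever introduced, the old-supplier path would hit both blocks and this guarantee would no longer hold.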