rodesai commented on code in PR #17929:
URL: https://github.com/apache/kafka/pull/17929#discussion_r1857289699
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreFactory.java:
##########
@@ -75,4 +76,79 @@ default void configure(final StreamsConfig config) {
boolean isCompatibleWith(StoreFactory storeFactory);
+ class FactoryWrappingStoreBuilder<T extends StateStore> implements
StoreBuilder<T> {
Review Comment:
This gave me the spins - previously the factories would wrap builders and
suppliers so we could have a unified configuration interface. But now we're
wrapping a factory in a builder, which is confusing and makes the code harder
to reason about because when I have a builder instance it could actually be a
factory underneath (not sure if that causes any problems, but it was nice that
there was a non-circular hierarchy of wrapping before). Would it be better to
instead revert the dsl operators to create processor instances that return
unwrapped builders, and then wrap them in a factory from `addStateStore`? So,
for example `KeyValueStoreMaterializer` would return a StoreBuilder like it
used to, and then that gets wrapped by `addStateStore`?
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java:
##########
@@ -184,7 +184,7 @@ private KTable<K, Long> doCount(final Named named, final
Materialized<K, Long, K
final String name = new
NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
return doAggregate(
- new KStreamAggregate<>(materializedInternal.storeName(),
aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
+ new KStreamAggregate<>(materializedInternal,
aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
Review Comment:
don't we need to make a change to either pass `null` for the store factory
to `doAggregate`? Otherwise we'll wind up adding the state store to the
topology again from `StatefulProcessorNode`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]