ableegoldman commented on code in PR #17903:
URL: https://github.com/apache/kafka/pull/17903#discussion_r1853796622
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java:
##########
@@ -116,16 +103,16 @@ public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
consumedInternal().valueDeserializer(),
topicName);
- topologyBuilder.addProcessor(processorParameters.processorName(), processorParameters.processorSupplier(), sourceName);
+ processorParameters.addProcessorTo(topologyBuilder, new String[] {sourceName});
- // only add state store if the source KTable should be materialized
+ // if the KTableSource should not be materialized, stores will be null or empty
final KTableSource<K, V> tableSource = (KTableSource<K, V>) processorParameters.processorSupplier();
- if (tableSource.materialized()) {
- topologyBuilder.addStateStore(storeFactory, nodeName());
-
+ if (tableSource.stores() != null) {
if (shouldReuseSourceTopicForChangelog) {
- storeFactory.withLoggingDisabled();
- topologyBuilder.connectSourceStoreAndTopic(storeFactory.name(), topicName);
+ tableSource.stores().forEach(store -> {
+ store.withLoggingDisabled();
+ topologyBuilder.connectSourceStoreAndTopic(store.name(), topicName);
Review Comment:
Btw, I noticed that #connectSourceStoreAndTopic is actually called inside
#addStateStore, at least as long as the parent processor names are passed in.
That should always be the case except for global tables/stores, and for stores
added to a Topology directly via #addStateStore and connected manually using
#connectProcessorAndStateStores (i.e., the alternative to implementing
ProcessorSupplier#stores).
In both of those cases #connectSourceStoreAndTopic is called directly, so
AFAICT there's no reason to be invoking
`topologyBuilder.connectSourceStoreAndTopic` all over the place, including right
here.
Granted, it's idempotent, so calling it again is fine, but it makes the
already-messy topology-building code even more confusing. It might be nice to
remove all these extraneous calls (this can be done in a separate PR so we can
make sure it doesn't break anything).
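To make the idempotency argument concrete, here is a minimal sketch assuming, as described above, that #connectSourceStoreAndTopic only records a store-to-topic mapping. `SourceStoreRegistry` and its map are hypothetical stand-ins, not the actual InternalTopologyBuilder code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for the source-store bookkeeping in
// InternalTopologyBuilder; NOT the real Kafka implementation.
class SourceStoreRegistry {
    // Maps a source store name to the input topic reused as its changelog.
    private final Map<String, String> storeToChangelogTopic = new HashMap<>();

    // Assumption: like the method discussed in the review, this only records
    // the store -> topic mapping, so repeating the call with the same
    // arguments leaves the state unchanged.
    void connectSourceStoreAndTopic(final String storeName, final String topic) {
        storeToChangelogTopic.put(storeName, topic);
    }

    Map<String, String> mappings() {
        return Map.copyOf(storeToChangelogTopic);
    }

    public static void main(final String[] args) {
        final SourceStoreRegistry registry = new SourceStoreRegistry();
        // First call: the one made from within addStateStore.
        registry.connectSourceStoreAndTopic("table-store", "input-topic");
        // Second, redundant call: the extra one made by the node itself.
        registry.connectSourceStoreAndTopic("table-store", "input-topic");
        System.out.println(registry.mappings()); // prints {table-store=input-topic}
    }
}
```

Because the second call rewrites the same entry, dropping it changes nothing under this assumption, which is why the cleanup can be deferred to a separate PR without risk to behavior.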
##########
streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java:
##########
@@ -384,7 +384,8 @@ public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic,
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal =
new MaterializedInternal<>(
Materialized.with(consumedInternal.keySerde(), consumedInternal.valueSerde()),
- internalStreamsBuilder, topic + "-");
+ internalStreamsBuilder, topic + "-",
Review Comment:
nit: can you fix this line to put params on new lines?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreFactory.java:
##########
@@ -75,4 +76,78 @@ default void configure(final StreamsConfig config) {
boolean isCompatibleWith(StoreFactory storeFactory);
+ class FactoryWrappingStoreBuilder<T extends StateStore> implements StoreBuilder<T> {
+
+ private final StoreFactory storeFactory;
+
+ public FactoryWrappingStoreBuilder(final StoreFactory storeFactory) {
+ this.storeFactory = storeFactory;
+ }
+
+ public StoreFactory storeFactory() {
+ return storeFactory;
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ final FactoryWrappingStoreBuilder<?> that = (FactoryWrappingStoreBuilder<?>) o;
+
+ return storeFactory.isCompatibleWith(that.storeFactory);
+ }
+
+ @Override
+ public int hashCode() {
+ return storeFactory.hashCode();
+ }
+
+ @Override
+ public StoreBuilder<T> withCachingEnabled() {
+ throw new IllegalStateException("Should not try to modify StoreBuilder wrapper");
+ }
+
+ @Override
+ public StoreBuilder<T> withCachingDisabled() {
+ throw new IllegalStateException("Should not try to modify StoreBuilder wrapper");
Review Comment:
I'm not 100% sure it's necessary, but I think we might as well delegate to
the StoreFactory here as well. If the StoreFactory has/needs this method for
whatever reason, then it seems theoretically possible for it to be called
during the topology-building process (whereas imo it's highly unlikely for
withLoggingEnabled or withCachingEnabled to be called on this, especially since
those methods don't even exist on StoreFactory).
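A minimal sketch of the suggested delegation, using hypothetical trimmed-down interfaces (`SimpleStoreFactory`, `SimpleStoreBuilder`) rather than the real StoreFactory/StoreBuilder, which have many more methods. The disabling call is forwarded to the wrapped factory while the enabling call keeps failing fast, as in the diff.

```java
// Hypothetical, trimmed-down interfaces; NOT the real Kafka Streams types.
interface SimpleStoreFactory {
    SimpleStoreFactory withCachingDisabled();
    boolean cachingEnabled();
}

interface SimpleStoreBuilder {
    SimpleStoreBuilder withCachingEnabled();
    SimpleStoreBuilder withCachingDisabled();
}

// Hypothetical factory that just tracks whether caching is enabled.
class CachingFlagFactory implements SimpleStoreFactory {
    private boolean caching = true;

    @Override
    public SimpleStoreFactory withCachingDisabled() {
        caching = false;
        return this;
    }

    @Override
    public boolean cachingEnabled() {
        return caching;
    }
}

class DelegatingWrapperBuilder implements SimpleStoreBuilder {
    private final SimpleStoreFactory storeFactory;

    DelegatingWrapperBuilder(final SimpleStoreFactory storeFactory) {
        this.storeFactory = storeFactory;
    }

    @Override
    public SimpleStoreBuilder withCachingEnabled() {
        // Not expected during topology building, so keep throwing.
        throw new IllegalStateException("Should not try to modify StoreBuilder wrapper");
    }

    @Override
    public SimpleStoreBuilder withCachingDisabled() {
        // Forward to the wrapped factory, then return the wrapper so calls can chain.
        storeFactory.withCachingDisabled();
        return this;
    }
}
```

With this shape, a caller holding only the builder view can still disable caching on the underlying factory, while unexpected "enable" calls surface immediately.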
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java:
##########
@@ -39,12 +39,19 @@ public MaterializedInternal(final Materialized<K, V, S> materialized) {
public MaterializedInternal(final Materialized<K, V, S> materialized,
final InternalNameProvider nameProvider,
final String generatedStorePrefix) {
+ this(materialized, nameProvider, generatedStorePrefix, false);
+ }
+
+ public MaterializedInternal(final Materialized<K, V, S> materialized,
+ final InternalNameProvider nameProvider,
+ final String generatedStorePrefix,
+ final boolean forceQueryable) {
Review Comment:
nice refactor, this is way cleaner/easier to follow than randomly overriding
the queryableStoreName for things like global stores
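The constructor chaining praised here can be sketched as follows, assuming `forceQueryable` simply makes a store queryable even when the user did not name it (as at the global-table call site). `MaterializedSketch`, its fields, and the `"STATE-STORE"` generated-name suffix are hypothetical, not MaterializedInternal's actual code.

```java
// Hypothetical, simplified model of the delegating constructors in the diff;
// NOT the real MaterializedInternal.
class MaterializedSketch {
    private final String storeName;
    private final boolean queryable;

    // The existing (shorter) form keeps its old behavior by delegating with false.
    MaterializedSketch(final String userStoreName, final String generatedStorePrefix) {
        this(userStoreName, generatedStorePrefix, false);
    }

    MaterializedSketch(final String userStoreName,
                       final String generatedStorePrefix,
                       final boolean forceQueryable) {
        // Queryable if the user named the store, or if the caller forces it.
        this.queryable = forceQueryable || userStoreName != null;
        // Fall back to a generated internal name when none was supplied (format assumed).
        this.storeName = userStoreName != null ? userStoreName : generatedStorePrefix + "STATE-STORE";
    }

    String storeName() {
        return storeName;
    }

    boolean isQueryable() {
        return queryable;
    }
}
```

Keeping the decision in one constructor flag means callers such as the global-table path state their intent up front instead of patching the queryable name afterwards.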
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java:
##########
@@ -40,15 +43,16 @@ public class KTableSource<KIn, VIn> implements ProcessorSupplier<KIn, VIn, KIn,
private static final Logger LOG = LoggerFactory.getLogger(KTableSource.class);
private final String storeName;
+ private final StoreFactory storeFactory;
private String queryableName;
private boolean sendOldValues;
- public KTableSource(final String storeName, final String queryableName) {
- Objects.requireNonNull(storeName, "storeName can't be null");
Review Comment:
we should probably keep this check
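A minimal sketch of keeping the fail-fast check in the new constructor. The parameter list, `KTableSourceSketch`, and `StoreFactoryStub` are hypothetical, since the new KTableSource signature is not shown in the hunk; only the `Objects.requireNonNull(storeName, "storeName can't be null")` call comes from the removed line.

```java
import java.util.Objects;

// Hypothetical stand-in for StoreFactory; NOT the real interface.
interface StoreFactoryStub {
    String name();
}

// Hypothetical, simplified constructor shape; NOT the real KTableSource.
class KTableSourceSketch {
    private final String storeName;
    private final StoreFactoryStub storeFactory; // assumed null when not materialized
    private final String queryableName;

    KTableSourceSketch(final String storeName,
                       final String queryableName,
                       final StoreFactoryStub storeFactory) {
        // Keep the check from the old constructor: fail fast on a missing store name.
        this.storeName = Objects.requireNonNull(storeName, "storeName can't be null");
        this.queryableName = queryableName;
        this.storeFactory = storeFactory;
    }

    String storeName() {
        return storeName;
    }
}
```

Failing in the constructor surfaces a missing name at topology-definition time rather than later when the store is looked up.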
--
This is an automated message from the Apache Git Service.