calmera commented on code in PR #12742:
URL: https://github.com/apache/kafka/pull/12742#discussion_r1062606439


##########
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##########
@@ -737,6 +737,91 @@ public synchronized Topology addStateStore(final 
StoreBuilder<?> storeBuilder,
         return this;
     }
 
+    /**
+     * Adds a Read Only {@link StateStore} to the topology.
+     *
+     * A Read Only StateStore can use any compacted topic as a changelog.
+     * <p>
+     * A {@link SourceNode} will be added to consume the data arriving from 
the partitions of the input topic.
+     * <p>
+     * The provided {@link ProcessorSupplier} will be used to create an {@link 
ProcessorNode} that will receive all
+     * records forwarded from the {@link SourceNode}.
+     * This {@link ProcessorNode} should be used to keep the {@link 
StateStore} up-to-date.
+     *
+     * @param storeBuilder          user defined key value store builder
+     * @param sourceName            name of the {@link SourceNode} that will 
be automatically added
+     * @param timestampExtractor    the stateless timestamp extractor used for 
this source,
+     *                              if not specified the default extractor 
defined in the configs will be used
+     * @param keyDeserializer       the {@link Deserializer} to deserialize 
keys with
+     * @param valueDeserializer     the {@link Deserializer} to deserialize 
values with
+     * @param topic                 the topic to source the data from
+     * @param processorName         the name of the {@link ProcessorSupplier}
+     * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
+     * @return itself
+     * @throws TopologyException if the processor of state is already 
registered
+     */
+    public synchronized <KIn, VIn> Topology addReadOnlyStateStore(final 
StoreBuilder<?> storeBuilder,
+                                                                  final String 
sourceName,
+                                                                  final 
TimestampExtractor timestampExtractor,
+                                                                  final 
Deserializer<KIn> keyDeserializer,
+                                                                  final 
Deserializer<VIn> valueDeserializer,
+                                                                  final String 
topic,
+                                                                  final String 
processorName,
+                                                                  final 
ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) {
+        if (storeBuilder.loggingEnabled()) {
+            // -- disabling logging. We might want to print some logging.

Review Comment:
   agreed, will change accordingly



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to