mjsax commented on code in PR #12742:
URL: https://github.com/apache/kafka/pull/12742#discussion_r1059135143


##########
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##########
@@ -737,6 +737,91 @@ public synchronized Topology addStateStore(final 
StoreBuilder<?> storeBuilder,
         return this;
     }
 
+    /**
+     * Adds a Read Only {@link StateStore} to the topology.
+     *
+     * A Read Only StateStore can use any compacted topic as a changelog.
+     * <p>
+     * A {@link SourceNode} will be added to consume the data arriving from 
the partitions of the input topic.

Review Comment:
   Seems to be too detailed. Also `SourceNode` is an internal class and I think 
we should not refer to it in the JavaDocs.



##########
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##########
@@ -737,6 +737,91 @@ public synchronized Topology addStateStore(final 
StoreBuilder<?> storeBuilder,
         return this;
     }
 
+    /**
+     * Adds a Read Only {@link StateStore} to the topology.

Review Comment:
   ```suggestion
        * Adds a read-only {@link StateStore} to the topology.
   ```



##########
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##########
@@ -737,6 +737,91 @@ public synchronized Topology addStateStore(final 
StoreBuilder<?> storeBuilder,
         return this;
     }
 
+    /**
+     * Adds a Read Only {@link StateStore} to the topology.
+     *
+     * A Read Only StateStore can use any compacted topic as a changelog.

Review Comment:
   ```suggestion
        * A read-only StateStore can use any compacted topic as a changelog.
   ```



##########
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##########
@@ -737,6 +737,91 @@ public synchronized Topology addStateStore(final 
StoreBuilder<?> storeBuilder,
         return this;
     }
 
+    /**
+     * Adds a Read Only {@link StateStore} to the topology.
+     *

Review Comment:
   ```suggestion
        * <p>
   ```



##########
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##########
@@ -737,6 +737,91 @@ public synchronized Topology addStateStore(final 
StoreBuilder<?> storeBuilder,
         return this;
     }
 
+    /**
+     * Adds a Read Only {@link StateStore} to the topology.
+     *
+     * A Read Only StateStore can use any compacted topic as a changelog.
+     * <p>
+     * A {@link SourceNode} will be added to consume the data arriving from 
the partitions of the input topic.
+     * <p>
+     * The provided {@link ProcessorSupplier} will be used to create an {@link 
ProcessorNode} that will receive all
+     * records forwarded from the {@link SourceNode}.
+     * This {@link ProcessorNode} should be used to keep the {@link 
StateStore} up-to-date.
+     *
+     * @param storeBuilder          user defined key value store builder

Review Comment:
   Why "key value" ?



##########
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##########
@@ -737,6 +737,91 @@ public synchronized Topology addStateStore(final 
StoreBuilder<?> storeBuilder,
         return this;
     }
 
+    /**
+     * Adds a Read Only {@link StateStore} to the topology.
+     *
+     * A Read Only StateStore can use any compacted topic as a changelog.
+     * <p>
+     * A {@link SourceNode} will be added to consume the data arriving from 
the partitions of the input topic.
+     * <p>
+     * The provided {@link ProcessorSupplier} will be used to create an {@link 
ProcessorNode} that will receive all
+     * records forwarded from the {@link SourceNode}.
+     * This {@link ProcessorNode} should be used to keep the {@link 
StateStore} up-to-date.
+     *
+     * @param storeBuilder          user defined key value store builder
+     * @param sourceName            name of the {@link SourceNode} that will 
be automatically added

Review Comment:
   as above



##########
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##########
@@ -737,6 +737,91 @@ public synchronized Topology addStateStore(final 
StoreBuilder<?> storeBuilder,
         return this;
     }
 
+    /**
+     * Adds a Read Only {@link StateStore} to the topology.
+     *
+     * A Read Only StateStore can use any compacted topic as a changelog.
+     * <p>
+     * A {@link SourceNode} will be added to consume the data arriving from 
the partitions of the input topic.
+     * <p>
+     * The provided {@link ProcessorSupplier} will be used to create an {@link 
ProcessorNode} that will receive all
+     * records forwarded from the {@link SourceNode}.
+     * This {@link ProcessorNode} should be used to keep the {@link 
StateStore} up-to-date.
+     *
+     * @param storeBuilder          user defined key value store builder
+     * @param sourceName            name of the {@link SourceNode} that will 
be automatically added
+     * @param timestampExtractor    the stateless timestamp extractor used for 
this source,
+     *                              if not specified the default extractor 
defined in the configs will be used
+     * @param keyDeserializer       the {@link Deserializer} to deserialize 
keys with
+     * @param valueDeserializer     the {@link Deserializer} to deserialize 
values with
+     * @param topic                 the topic to source the data from
+     * @param processorName         the name of the {@link ProcessorSupplier}
+     * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
+     * @return itself
+     * @throws TopologyException if the processor of state is already 
registered
+     */
+    public synchronized <KIn, VIn> Topology addReadOnlyStateStore(final 
StoreBuilder<?> storeBuilder,
+                                                                  final String 
sourceName,
+                                                                  final 
TimestampExtractor timestampExtractor,
+                                                                  final 
Deserializer<KIn> keyDeserializer,
+                                                                  final 
Deserializer<VIn> valueDeserializer,
+                                                                  final String 
topic,
+                                                                  final String 
processorName,
+                                                                  final 
ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) {
+        if (storeBuilder.loggingEnabled()) {
+            // -- disabling logging. We might want to print some logging.

Review Comment:
   For globlaKTable we also just disable logging without logging any warning -- 
guess we could mention it in the JavaDocs.
   
   Also, we don't really need the `if` and can call `withLoggingDisabled()` 
blindly.



##########
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##########
@@ -737,6 +737,91 @@ public synchronized Topology addStateStore(final 
StoreBuilder<?> storeBuilder,
         return this;
     }
 
+    /**
+     * Adds a Read Only {@link StateStore} to the topology.
+     *
+     * A Read Only StateStore can use any compacted topic as a changelog.
+     * <p>
+     * A {@link SourceNode} will be added to consume the data arriving from 
the partitions of the input topic.
+     * <p>
+     * The provided {@link ProcessorSupplier} will be used to create an {@link 
ProcessorNode} that will receive all

Review Comment:
   Similar for `ProcessorNode`



##########
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##########
@@ -737,6 +737,91 @@ public synchronized Topology addStateStore(final 
StoreBuilder<?> storeBuilder,
         return this;
     }
 
+    /**
+     * Adds a Read Only {@link StateStore} to the topology.
+     *
+     * A Read Only StateStore can use any compacted topic as a changelog.
+     * <p>
+     * A {@link SourceNode} will be added to consume the data arriving from 
the partitions of the input topic.
+     * <p>
+     * The provided {@link ProcessorSupplier} will be used to create an {@link 
ProcessorNode} that will receive all
+     * records forwarded from the {@link SourceNode}.
+     * This {@link ProcessorNode} should be used to keep the {@link 
StateStore} up-to-date.
+     *
+     * @param storeBuilder          user defined key value store builder
+     * @param sourceName            name of the {@link SourceNode} that will 
be automatically added
+     * @param timestampExtractor    the stateless timestamp extractor used for 
this source,
+     *                              if not specified the default extractor 
defined in the configs will be used
+     * @param keyDeserializer       the {@link Deserializer} to deserialize 
keys with
+     * @param valueDeserializer     the {@link Deserializer} to deserialize 
values with
+     * @param topic                 the topic to source the data from
+     * @param processorName         the name of the {@link ProcessorSupplier}
+     * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
+     * @return itself
+     * @throws TopologyException if the processor of state is already 
registered
+     */

Review Comment:
   Should we mention anything about `auto.offset.reset` in the JavaDocs? Seems 
we don't allow to pass in a config, and we might want to hard-code it to 
"earliest"? (Cannot remember if/what we discussion on the KIP about it?)



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java:
##########
@@ -59,7 +59,7 @@ public void testKStreamBranch() {
 
         assertEquals(3, branches.length);
 
-        final MockProcessorSupplier<Integer, String> supplier = new 
MockProcessorSupplier<>();
+        final MockProcessorSupplier<Integer, String, Void, Void> supplier = 
new MockProcessorSupplier<>();

Review Comment:
   Thanks for the cleanup!



##########
streams/src/test/java/org/apache/kafka/streams/TopologyTest.java:
##########
@@ -2303,6 +2304,63 @@ private void 
addGlobalStoreToTopologyAndExpectedDescription(final String globalS
         expectedDescription.addGlobalStore(expectedGlobalStore);
     }
 
+    @Test
+    public void readonlyStateStoresShouldHaveTheirOwnSubTopology() {

Review Comment:
   ```suggestion
       public void readOnlyStateStoresShouldHaveTheirOwnSubTopology() {
   ```



##########
docs/streams/developer-guide/processor-api.html:
##########
@@ -396,6 +397,21 @@ <h2>
                 </ul>
                 </p>
             </div>
+            <div class="section" id="readonly-state-stores">
+                <span 
id="streams-developer-guide-state-store-readonly"></span><h3><a 
class="toc-backref" href="#id12">ReadOnly State Stores</a><a class="headerlink" 
href="#readonly-state-stores" title="Permalink to this headline"></a></h3>
+                <p>The changelog topic for a fault-tolerant state store can 
also be used to perform lookups against. It is
+                    crucial to understand that there can be only one writer to 
this state store, but there can be multiple
+                    readers.</p>
+
+                <p>This allows you to use a readonly state store for looking 
up reference data managed by another processor
+                    or even another process all together. The messages within 
the compacted topic may be projected (subset of data extracted)
+                    before going into the readonly state store. Therefor we 
can optimize what the data within our readonly state
+                    store should look like, depending on the use-case it needs 
to serve.</p>
+
+                <p><b>note:</b> beware of the partitioning requirements when 
using readonly statestores for lookups during

Review Comment:
   ```suggestion
                   <p><b>note:</b> beware of the partitioning requirements when 
using read-only state stores for lookups during
   ```



##########
docs/streams/developer-guide/processor-api.html:
##########
@@ -396,6 +397,21 @@ <h2>
                 </ul>
                 </p>
             </div>
+            <div class="section" id="readonly-state-stores">
+                <span 
id="streams-developer-guide-state-store-readonly"></span><h3><a 
class="toc-backref" href="#id12">ReadOnly State Stores</a><a class="headerlink" 
href="#readonly-state-stores" title="Permalink to this headline"></a></h3>
+                <p>The changelog topic for a fault-tolerant state store can 
also be used to perform lookups against. It is

Review Comment:
   This sentence is somewhat confusing. Maybe we should start with something 
like this:
   
   > A read-only state store materialized the data from its input topic. It 
also uses the input topic for fault-tolerance, and thus does not have an 
additional changelog topic (the input topic is re-used as changelog). Thus, the 
input topic should be configured with log compaction (add link to log 
compaction). Note that no other processor should modify the content of the 
state store, and the only writer should be the associated "state update 
processor"; other processor may read the content of the read-only store.



##########
streams/src/test/java/org/apache/kafka/streams/TopologyTest.java:
##########
@@ -2303,6 +2304,63 @@ private void 
addGlobalStoreToTopologyAndExpectedDescription(final String globalS
         expectedDescription.addGlobalStore(expectedGlobalStore);
     }
 
+    @Test
+    public void readonlyStateStoresShouldHaveTheirOwnSubTopology() {
+        final String sourceName = "source";
+        final String storeName = "store";
+        final String topicName = "topic";
+        final String processorName = "processor";
+
+        final KeyValueStoreBuilder<?, ?> storeBuilder = 
mock(KeyValueStoreBuilder.class);
+        when(storeBuilder.name()).thenReturn(storeName);
+        topology.addReadOnlyStateStore(
+                storeBuilder,
+                sourceName,
+                null,
+                null,
+                null,
+                topicName,
+                processorName,
+                new MockProcessorSupplier<>());
+
+        final TopologyDescription.Source expectedSource = new 
InternalTopologyBuilder.Source(sourceName, Sets.newSet(topicName), null);
+        final TopologyDescription.Processor expectedProcessor = new 
InternalTopologyBuilder.Processor(processorName, Sets.newSet(storeName));
+
+        ((InternalTopologyBuilder.AbstractNode) 
expectedSource).addSuccessor(expectedProcessor);
+        ((InternalTopologyBuilder.AbstractNode) 
expectedProcessor).addPredecessor(expectedSource);
+
+        final Set<TopologyDescription.Node> allNodes = new HashSet<>();
+        allNodes.add(expectedSource);
+        allNodes.add(expectedProcessor);
+        expectedDescription.addSubtopology(new SubtopologyDescription(0, 
allNodes));
+
+        assertThat(topology.describe(), equalTo(expectedDescription));
+        assertThat(topology.describe().hashCode(), 
equalTo(expectedDescription.hashCode()));
+    }
+
+    @Test
+    public void readonlyStateStoresShouldNotLog() {

Review Comment:
   ```suggestion
       public void readOnlyStateStoresShouldNotLog() {
   ```



##########
docs/streams/developer-guide/processor-api.html:
##########
@@ -396,6 +397,21 @@ <h2>
                 </ul>
                 </p>
             </div>
+            <div class="section" id="readonly-state-stores">
+                <span 
id="streams-developer-guide-state-store-readonly"></span><h3><a 
class="toc-backref" href="#id12">ReadOnly State Stores</a><a class="headerlink" 
href="#readonly-state-stores" title="Permalink to this headline"></a></h3>
+                <p>The changelog topic for a fault-tolerant state store can 
also be used to perform lookups against. It is
+                    crucial to understand that there can be only one writer to 
this state store, but there can be multiple
+                    readers.</p>
+
+                <p>This allows you to use a readonly state store for looking 
up reference data managed by another processor

Review Comment:
   ```suggestion
                   <p>This allows you to use a read-only state store for 
looking up reference data managed by another processor
   ```



##########
streams/src/test/java/org/apache/kafka/test/MockProcessor.java:
##########
@@ -28,9 +29,11 @@
 import java.util.List;
 import java.util.Map;
 
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
-public class MockProcessor<K, V> extends 
org.apache.kafka.streams.processor.AbstractProcessor<K, V> {
-    private final MockApiProcessor<K, V, Object, Object> delegate;
+public class MockProcessor<KIn, VIn, KOut, VOut> implements Processor<KIn, 
VIn, KOut, VOut> {
+    private final MockApiProcessor<KIn, VIn, KOut, VOut> delegate;
+
+    private ProcessorContext<KOut, VOut> context;

Review Comment:
   Why do we add this? We inherit `context` from the super class anyway.



##########
docs/streams/developer-guide/processor-api.html:
##########
@@ -396,6 +397,21 @@ <h2>
                 </ul>
                 </p>
             </div>
+            <div class="section" id="readonly-state-stores">
+                <span 
id="streams-developer-guide-state-store-readonly"></span><h3><a 
class="toc-backref" href="#id12">ReadOnly State Stores</a><a class="headerlink" 
href="#readonly-state-stores" title="Permalink to this headline"></a></h3>
+                <p>The changelog topic for a fault-tolerant state store can 
also be used to perform lookups against. It is
+                    crucial to understand that there can be only one writer to 
this state store, but there can be multiple
+                    readers.</p>
+
+                <p>This allows you to use a readonly state store for looking 
up reference data managed by another processor
+                    or even another process all together. The messages within 
the compacted topic may be projected (subset of data extracted)
+                    before going into the readonly state store. Therefor we 
can optimize what the data within our readonly state
+                    store should look like, depending on the use-case it needs 
to serve.</p>
+
+                <p><b>note:</b> beware of the partitioning requirements when 
using readonly statestores for lookups during
+                    processing. You might want to make sure the original 
changelog topic is co-partitioned with the processors
+                    reading the readonly statestore.</p>

Review Comment:
   This reminds me: should we allow to pass in a custom partitioner? 🤔 (Could 
also be follow up work though...)



##########
docs/streams/developer-guide/processor-api.html:
##########
@@ -396,6 +397,21 @@ <h2>
                 </ul>
                 </p>
             </div>
+            <div class="section" id="readonly-state-stores">
+                <span 
id="streams-developer-guide-state-store-readonly"></span><h3><a 
class="toc-backref" href="#id12">ReadOnly State Stores</a><a class="headerlink" 
href="#readonly-state-stores" title="Permalink to this headline"></a></h3>
+                <p>The changelog topic for a fault-tolerant state store can 
also be used to perform lookups against. It is
+                    crucial to understand that there can be only one writer to 
this state store, but there can be multiple
+                    readers.</p>
+
+                <p>This allows you to use a readonly state store for looking 
up reference data managed by another processor
+                    or even another process all together. The messages within 
the compacted topic may be projected (subset of data extracted)

Review Comment:
   > may be projected 
   
   I don't think that works? When we restore the state, we would by-pass the 
processor and put the bytes from the input topic into the store as-is...



##########
docs/streams/developer-guide/processor-api.html:
##########
@@ -396,6 +397,21 @@ <h2>
                 </ul>
                 </p>
             </div>
+            <div class="section" id="readonly-state-stores">
+                <span 
id="streams-developer-guide-state-store-readonly"></span><h3><a 
class="toc-backref" href="#id12">ReadOnly State Stores</a><a class="headerlink" 
href="#readonly-state-stores" title="Permalink to this headline"></a></h3>
+                <p>The changelog topic for a fault-tolerant state store can 
also be used to perform lookups against. It is
+                    crucial to understand that there can be only one writer to 
this state store, but there can be multiple
+                    readers.</p>
+
+                <p>This allows you to use a readonly state store for looking 
up reference data managed by another processor
+                    or even another process all together. The messages within 
the compacted topic may be projected (subset of data extracted)
+                    before going into the readonly state store. Therefor we 
can optimize what the data within our readonly state
+                    store should look like, depending on the use-case it needs 
to serve.</p>
+
+                <p><b>note:</b> beware of the partitioning requirements when 
using readonly statestores for lookups during
+                    processing. You might want to make sure the original 
changelog topic is co-partitioned with the processors
+                    reading the readonly statestore.</p>

Review Comment:
   ```suggestion
                       reading the read-only state store.</p>
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to