[jira] [Commented] (KAFKA-15297) Cache flush order might not be topological order
[ https://issues.apache.org/jira/browse/KAFKA-15297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17752150#comment-17752150 ] Matthias J. Sax commented on KAFKA-15297:
-----------------------------------------

Seems we are on the same page :)

> Cache flush order might not be topological order
> ------------------------------------------------
>
>                 Key: KAFKA-15297
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15297
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.4.0
>            Reporter: Bruno Cadonna
>            Priority: Major
>         Attachments: minimal_example.png
>
> The flush order of the state store caches in Kafka Streams might not correspond to the topological order of the state stores in the topology. The order depends on how the processors and state stores are added to the topology.
> In some cases downstream state stores might be flushed before upstream state stores. That means that during a commit, records in upstream caches might end up in downstream caches that have already been flushed during the same commit. If a crash happens at that point, those records in the downstream caches are lost, for two reasons:
> 1. Records in caches are only changelogged after they are flushed from the cache. However, the downstream caches have already been flushed and will not be flushed again during the same commit.
> 2. The offsets of the input records that caused the records now stuck in the downstream caches are committed during the same commit, so they will not be re-processed after the crash.
> An example of a topology where the flush order of the caches is wrong is the following:
> {code:java}
> final String inputTopic1 = "inputTopic1";
> final String inputTopic2 = "inputTopic2";
> final String outputTopic1 = "outputTopic1";
> final String processorName = "processor1";
> final String stateStoreA = "stateStoreA";
> final String stateStoreB = "stateStoreB";
> final String stateStoreC = "stateStoreC";
> streamsBuilder.stream(inputTopic2, Consumed.with(Serdes.String(), Serdes.String()))
>     .process(
>         () -> new Processor<String, String, String, String>() {
>             private ProcessorContext<String, String> context;
>
>             @Override
>             public void init(final ProcessorContext<String, String> context) {
>                 this.context = context;
>             }
>
>             @Override
>             public void process(final Record<String, String> record) {
>                 context.forward(record);
>             }
>
>             @Override
>             public void close() {}
>         },
>         Named.as("processor1")
>     )
>     .to(outputTopic1, Produced.with(Serdes.String(), Serdes.String()));
> streamsBuilder.stream(inputTopic1, Consumed.with(Serdes.String(), Serdes.String()))
>     .toTable(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(stateStoreA)
>         .withKeySerde(Serdes.String()).withValueSerde(Serdes.String()))
>     .mapValues(value -> value, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(stateStoreB)
>         .withKeySerde(Serdes.String()).withValueSerde(Serdes.String()))
>     .mapValues(value -> value, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(stateStoreC)
>         .withKeySerde(Serdes.String()).withValueSerde(Serdes.String()))
>     .toStream()
>     .to(outputTopic1, Produced.with(Serdes.String(), Serdes.String()));
> final Topology topology = streamsBuilder.build(streamsConfiguration);
> topology.connectProcessorAndStateStores(processorName, stateStoreC);
> {code}
> This code results in the attached topology.
> In the topology, {{processor1}} is connected to {{stateStoreC}}. If {{processor1}} is added to the topology before the other processors, i.e., if the right branch of the topology is added before the left branch as in the code above, the cache of {{stateStoreC}} is flushed before the caches of {{stateStoreA}} and {{stateStoreB}}.
> You can observe the flush order by feeding some records into the input topics of the topology, waiting for a commit, and looking for the following log message:
> https://github.com/apache/kafka/blob/2e1947d240607d53f071f61c875cfffc3fec47fe/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L513
> I changed the log message from trace to debug to avoid too much noise.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
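The failure mode described above can be sketched with a toy model (plain Java, not Kafka Streams code; all class and store names are illustrative): a cache only changelogs its records when it is flushed, and flushing an upstream cache forwards records into the downstream cache, so flushing the downstream cache first strands the forwarded records.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of the bug: a cache only changelogs records when flushed,
// and flushing an upstream cache forwards its records downstream.
class FlushOrderDemo {
    static class Cache {
        final Deque<String> records = new ArrayDeque<>();
        final List<String> changelog = new ArrayList<>(); // durable after a crash
        Cache downstream;                                  // null for the last store

        void put(String record) { records.add(record); }

        void flush() {
            while (!records.isEmpty()) {
                String r = records.poll();
                changelog.add(r);      // changelogged only on flush
                if (downstream != null) {
                    downstream.put(r); // forwarded only on flush
                }
            }
        }
    }

    // Records stranded (not changelogged) in the downstream cache when the
    // flush order is reversed: downstream before upstream.
    static List<String> lostOnCrash() {
        Cache upstream = new Cache();
        Cache downstream = new Cache();
        upstream.downstream = downstream;
        upstream.put("k1");

        downstream.flush(); // nothing to changelog yet
        upstream.flush();   // "k1" lands in the already-flushed downstream cache

        return new ArrayList<>(downstream.records);
    }

    public static void main(String[] args) {
        System.out.println(lostOnCrash()); // [k1]
    }
}
```

Flushing in topological order (upstream first) would leave `lostOnCrash()` empty, since the downstream flush would then see and changelog the forwarded record.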
[jira] [Commented] (KAFKA-15297) Cache flush order might not be topological order
[ https://issues.apache.org/jira/browse/KAFKA-15297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17752108#comment-17752108 ] Guozhang Wang commented on KAFKA-15297:
---------------------------------------

{quote}We are not sure if we should not instead decouple caching from forwarding.{quote}
I'd assume the double negation here means "We think we should just try to decouple caching from forwarding as the right solution" :)

And yes, I'd love to see that happen, as I've advocated for it for many years. I was thinking about just "suppressing" the records in the last sink processor of the sub-topology to achieve the same effect of sending less over the network. That may be similar to what you meant by "caching on the last state store", or it may differ in some corner cases. Either way, as you said, it will lose some of the benefit of processing fewer records at the later stages of a sub-topology, but given the typical size of a sub-topology this seems a good trade for simplicity in most cases. It also has many other benefits, to name a few: 1) much simpler timestamp tracking within a task (today it is as fine-grained as per-processor), since every record will always go through the whole sub-topology; 2) simpler version tracking within sub-topologies for IQ, since all state stores would then have the same version.
[jira] [Commented] (KAFKA-15297) Cache flush order might not be topological order
[ https://issues.apache.org/jira/browse/KAFKA-15297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751932#comment-17751932 ] Bruno Cadonna commented on KAFKA-15297:
---------------------------------------

[~guozhang] Yes, I agree the issue arises when state stores are connected to PAPI operators, because those can connect to state stores at basically any location in the topology graph.

I also thought about the solution you describe and discussed it with [~mjsax], [~wcarlson5], [~lihaosky], and [~alisa23]. We are not sure if we should not instead decouple caching from forwarding: we would only cache records before changelogging, but we would forward every record. Additionally, we could cache records for forwarding in the last state store(s) of the sub-topology to reduce the number of records output from the sub-topology. With this solution we would probably process more records within a sub-topology but send fewer over the network, and we would not need to care about the flush order of the state store caches.
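The decoupling proposed in this comment can be sketched as a toy model (plain Java, not Kafka Streams code; all names are hypothetical): every record is forwarded downstream immediately, while the cache only batches writes to the changelog.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of the proposal: forward every record immediately; the cache
// only batches writes to the changelog, so flush order no longer decides
// what gets changelogged.
class DecoupledCacheSketch {
    static class Store {
        final Map<String, String> changelogBuffer = new LinkedHashMap<>();
        final List<String> forwarded = new ArrayList<>();
        Store downstream;

        void put(String key, String value) {
            changelogBuffer.put(key, value); // cached for changelogging only
            forwarded.add(key);              // ...but forwarded right away
            if (downstream != null) {
                downstream.put(key, value);
            }
        }
    }

    // Keys that reached the downstream store before ANY flush happened.
    static List<String> forwardedBeforeFlush() {
        Store upstream = new Store();
        Store downstream = new Store();
        upstream.downstream = downstream;
        upstream.put("k1", "v1");
        // Both changelog buffers already hold "k1", so a crash between
        // out-of-order flushes can no longer strand the record.
        return downstream.forwarded;
    }

    public static void main(String[] args) {
        System.out.println(forwardedBeforeFlush()); // [k1]
    }
}
```

The trade-off named in the comment is visible here: downstream stores see every record (more processing), but each cache's changelog buffer is complete before any flush, so the caches can be flushed in any order.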
[jira] [Commented] (KAFKA-15297) Cache flush order might not be topological order
[ https://issues.apache.org/jira/browse/KAFKA-15297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751821#comment-17751821 ] Guozhang Wang commented on KAFKA-15297:
---------------------------------------

I think this is indeed a general issue: state stores are initialized in the order of the topology, which is essentially the "processor node order", as in {{InternalTopologyBuilder#build}}. This works when a state store is only associated with one processor, or when a store is associated with multiple processors that are built as part of a built-in operator (like a join in the DSL), in which case we carefully make sure that the state store order adheres to the processor order; but in a PAPI scenario like the one Bruno reported here, all bets are off.

I think a general fix would be, in the above {{build}} function, to build only the processors in the first loop pass without initializing the state stores, and then, based on the built processors, order the state stores to be initialized and do that in another pass.
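The two-pass idea could be sketched as follows (illustrative plain Java, not the actual {{InternalTopologyBuilder}} code; all processor and store names are hypothetical): pass one fixes the processor order, and pass two initializes each state store at the position of the first processor connected to it, so the store order follows the processor order.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;

// Sketch of the two-pass fix: derive the store initialization order from an
// already-built processor order, instead of from the node-addition order.
class TwoPassBuildSketch {
    static List<String> storeInitOrder(List<String> processorOrder,
                                       Map<String, List<String>> connectedStores) {
        // Second pass: each store is initialized where its first connected
        // processor appears; duplicates (PAPI re-connections) are ignored.
        LinkedHashSet<String> storeOrder = new LinkedHashSet<>();
        for (String processor : processorOrder) {
            storeOrder.addAll(connectedStores.getOrDefault(processor, List.of()));
        }
        return new ArrayList<>(storeOrder);
    }

    public static void main(String[] args) {
        // Store connections modeled on the ticket's example topology.
        Map<String, List<String>> connected = Map.of(
            "toTableA", List.of("stateStoreA"),
            "mapValuesB", List.of("stateStoreB"),
            "mapValuesC", List.of("stateStoreC"),
            "processor1", List.of("stateStoreC")); // PAPI connect
        // With the store order derived from the processor order, stateStoreC
        // is initialized last even though processor1 was added first.
        List<String> processors = List.of("toTableA", "mapValuesB", "mapValuesC", "processor1");
        System.out.println(storeInitOrder(processors, connected));
        // [stateStoreA, stateStoreB, stateStoreC]
    }
}
```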
[jira] [Commented] (KAFKA-15297) Cache flush order might not be topological order
[ https://issues.apache.org/jira/browse/KAFKA-15297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17750622#comment-17750622 ] Bruno Cadonna commented on KAFKA-15297:
---------------------------------------

[~ableegoldman] You can observe the flush order by feeding some records into the input topics and looking for the following log message:
https://github.com/apache/kafka/blob/bb48b157af6ca06972927666a5b6f84b9551fe3a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L513-L513

I changed the log message from trace to debug to avoid too much noise. I will also update the description with this information.
[jira] [Commented] (KAFKA-15297) Cache flush order might not be topological order
[ https://issues.apache.org/jira/browse/KAFKA-15297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17750504#comment-17750504 ] Matthias J. Sax commented on KAFKA-15297:
-----------------------------------------

The ticket description contains an example to reproduce it (and there is also a png attachment visualizing the topology).

{quote}which in turn *should* reflect the topological order of the attached processor nodes.{quote}
That's not always the case, unfortunately.
[jira] [Commented] (KAFKA-15297) Cache flush order might not be topological order
[ https://issues.apache.org/jira/browse/KAFKA-15297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17750495#comment-17750495 ] A. Sophie Blee-Goldman commented on KAFKA-15297:
------------------------------------------------

Were you able to (re)produce this issue? I'm a bit surprised, because I always thought the state stores were maintained in strict topological order, both when building them initially and when registering them. The stores are flushed in the order that they are registered, which corresponds to the order in which they are added to the topology, which in turn *should* reflect the topological order of the attached processor nodes. The original ordering comes from {{InternalTopologyBuilder#build}} and the topologically-sorted {{nodeFactories}} map. This builds up the {{stateStoreMap}} in topological order, no?
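A toy counterexample to the assumption in this comment (plain Java, not the actual builder code; processor names are hypothetical, store names are taken from the ticket's example): if stores are registered in the order their owning processors are added to the topology, a PAPI branch added first registers a topologically-downstream store first.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

// Toy counterexample: stores registered in the order their owning
// processors were ADDED to the topology, not in topological order.
class RegistrationOrderSketch {
    // Each entry is {processorName, connectedStore}, in topology ADD order.
    static List<String> registrationOrder(List<String[]> addOrder) {
        LinkedHashSet<String> registered = new LinkedHashSet<>();
        for (String[] entry : addOrder) {
            registered.add(entry[1]); // a store registers at first mention
        }
        return new ArrayList<>(registered);
    }

    public static void main(String[] args) {
        // The ticket's example: processor1's branch is added first and is
        // connected (via PAPI) to the topologically-downstream stateStoreC.
        List<String[]> addOrder = List.of(
            new String[]{"processor1", "stateStoreC"},
            new String[]{"toTableA", "stateStoreA"},
            new String[]{"mapValuesB", "stateStoreB"},
            new String[]{"mapValuesC", "stateStoreC"});
        // Flushing follows registration order: stateStoreC is flushed first.
        System.out.println(registrationOrder(addOrder));
        // [stateStoreC, stateStoreA, stateStoreB]
    }
}
```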