[ https://issues.apache.org/jira/browse/KAFKA-15297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17752150#comment-17752150 ]

Matthias J. Sax commented on KAFKA-15297:
-----------------------------------------

Seems we are on the same page :) 

> Cache flush order might not be topological order 
> -------------------------------------------------
>
>                 Key: KAFKA-15297
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15297
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.4.0
>            Reporter: Bruno Cadonna
>            Priority: Major
>         Attachments: minimal_example.png
>
>
> The flush order of the state store caches in Kafka Streams might not 
> correspond to the topological order of the state stores in the topology. The 
> order depends on how the processors and state stores are added to the 
> topology. 
> In some cases downstream state stores might be flushed before upstream state 
> stores. That means that during a commit, records in upstream caches might end 
> up in downstream caches that have already been flushed during the same 
> commit. If a crash happens at that point, those records in the downstream 
> caches are lost, for two reasons:
> 1. Records in caches are only written to the changelog after they are flushed 
> from the cache. However, the downstream caches have already been flushed and 
> will not be flushed again during the same commit.
> 2. The offsets of the input records that produced the records now sitting in 
> the downstream caches are committed during the same commit, so those input 
> records will not be re-processed after the crash.
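> To make the intended ordering concrete: if caches were flushed in topological 
> order of the store dependency graph (upstream before downstream), an upstream 
> flush could never push records into an already-flushed downstream cache. The 
> sketch below is not Kafka Streams code; it is a hypothetical illustration 
> using Kahn's algorithm and the store names from the example further down, 
> with an assumed dependency map:
> {code:java}
> import java.util.ArrayDeque;
> import java.util.ArrayList;
> import java.util.Deque;
> import java.util.HashMap;
> import java.util.LinkedHashMap;
> import java.util.List;
> import java.util.Map;
>
> public class FlushOrderSketch {
>
>     // Kahn's algorithm over a store dependency graph (upstream -> downstream).
>     // Every store must appear as a key, even if it has no downstream stores.
>     static List<String> topologicalFlushOrder(Map<String, List<String>> downstream) {
>         Map<String, Integer> inDegree = new HashMap<>();
>         for (String store : downstream.keySet()) {
>             inDegree.putIfAbsent(store, 0);
>         }
>         for (List<String> targets : downstream.values()) {
>             for (String target : targets) {
>                 inDegree.merge(target, 1, Integer::sum);
>             }
>         }
>         Deque<String> ready = new ArrayDeque<>();
>         inDegree.forEach((store, degree) -> { if (degree == 0) ready.add(store); });
>
>         List<String> order = new ArrayList<>();
>         while (!ready.isEmpty()) {
>             String store = ready.poll();
>             order.add(store);
>             for (String target : downstream.get(store)) {
>                 if (inDegree.merge(target, -1, Integer::sum) == 0) {
>                     ready.add(target);
>                 }
>             }
>         }
>         return order;
>     }
>
>     public static void main(String[] args) {
>         // Hypothetical dependency graph for the example topology:
>         // stateStoreA feeds stateStoreB, which feeds stateStoreC.
>         Map<String, List<String>> downstream = new LinkedHashMap<>();
>         downstream.put("stateStoreA", List.of("stateStoreB"));
>         downstream.put("stateStoreB", List.of("stateStoreC"));
>         downstream.put("stateStoreC", List.of());
>         System.out.println(topologicalFlushOrder(downstream));
>         // prints [stateStoreA, stateStoreB, stateStoreC]
>     }
> }
> {code}
> Flushing in that order would guarantee the downstream caches are flushed 
> last, after any records forwarded into them during the same commit.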
> An example for a topology where the flush order of the caches is wrong is the 
> following:
> {code:java}
> final String inputTopic1 = "inputTopic1";
> final String inputTopic2 = "inputTopic2";
> final String outputTopic1 = "outputTopic1";
> final String processorName = "processor1";
> final String stateStoreA = "stateStoreA";
> final String stateStoreB = "stateStoreB";
> final String stateStoreC = "stateStoreC";
> streamsBuilder.stream(inputTopic2, Consumed.with(Serdes.String(), Serdes.String()))
>     .process(
>         () -> new Processor<String, String, String, String>() {
>             private ProcessorContext<String, String> context;
>
>             @Override
>             public void init(ProcessorContext<String, String> context) {
>                 this.context = context;
>             }
>
>             @Override
>             public void process(Record<String, String> record) {
>                 context.forward(record);
>             }
>
>             @Override
>             public void close() {}
>         },
>         Named.as(processorName)
>     )
>     .to(outputTopic1, Produced.with(Serdes.String(), Serdes.String()));
> streamsBuilder.stream(inputTopic1, Consumed.with(Serdes.String(), Serdes.String()))
>     .toTable(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(stateStoreA)
>         .withKeySerde(Serdes.String())
>         .withValueSerde(Serdes.String()))
>     .mapValues(value -> value, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(stateStoreB)
>         .withKeySerde(Serdes.String())
>         .withValueSerde(Serdes.String()))
>     .mapValues(value -> value, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(stateStoreC)
>         .withKeySerde(Serdes.String())
>         .withValueSerde(Serdes.String()))
>     .toStream()
>     .to(outputTopic1, Produced.with(Serdes.String(), Serdes.String()));
> final Topology topology = streamsBuilder.build(streamsConfiguration);
> topology.connectProcessorAndStateStores(processorName, stateStoreC);
> {code}
> This code results in the attached topology.
> In the topology {{processor1}} is connected to {{stateStoreC}}. If 
> {{processor1}} is added to the topology before the other processors, i.e., if 
> the right branch of the topology is added before the left branch as in the 
> code above, the cache of {{stateStoreC}} is flushed before the caches of 
> {{stateStoreA}} and {{stateStoreB}}.
> You can observe the flush order by feeding some records into the input topics 
> of the topology, waiting for a commit, and looking for the following log 
> message:
> https://github.com/apache/kafka/blob/2e1947d240607d53f071f61c875cfffc3fec47fe/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L513
>  
> I changed the log message from trace to debug to avoid too much noise. 
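> To actually see that message, the level of the corresponding logger has to 
> be raised as well. A minimal sketch, assuming a log4j.properties-based setup 
> (the logger name is the {{ProcessorStateManager}} class emitting the message):
> {code}
> # Show the per-store flush log message at DEBUG (assumes log4j.properties)
> log4j.logger.org.apache.kafka.streams.processor.internals.ProcessorStateManager=DEBUG
> {code}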



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
