I'm running WordCountProcessorDemo with Processor API. and change something below 1. config 1 stream-thread and 1 replicas 2. change inMemory() to persistent() MyKakfa version is 0.10.0.0. After running streaming application, I check msg output by console-consumer ➜ kafka_2.10-0.10.0.0 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-input2 msg1 # by console-producer, we can only produce message's value. so message produce to input topic will use roundrobbin partition msg2 msg3 msg4 msg5 msg6 ➜ kafka_2.10-0.10.0.0 bin/kafka-console-consumer.sh --new-consumer --bootstrap-server localhost:9092 --topic streams-input2 --property print.key=true --property key.separator=":" --from-beginning null:msg2 # key is null, value is what we produce above null:msg4 null:msg6 null:msg1 null:msg3 null:msg5 ➜ kafka_2.10-0.10.0.0 bin/kafka-console-consumer.sh --new-consumer --bootstrap-server localhost:9092 --topic streams-output2 --property print.key=true --property key.separator=":" --from-beginning msg2:1 msg1:1 msg1:1 msg3:1 msg2:1 msg4:1 msg1:1 msg3:1 msg5:1 msg2:1 msg4:1 msg6:1 # due to log compaction, same key will be overwrite. this is ok... ➜ kafka_2.10-0.10.0.0 bin/kafka-console-consumer.sh --new-consumer --bootstrap-server localhost:9092 --topic streams-wordcount-Counts-changelog --property print.key=true --property key.separator=":" --from-beginning *msg2:* *msg4:* *msg6:* *msg1:* *msg3:* *msg5:* Everything is ok, except changelog-topic trouble me. Why it's value is empty?
I have dig into source code, and summary the workflow : 1. when WordCountProcessorDemo's ProcessorNode put kv to KeyValueStore, no matter Memory or RocksDB, they both put into local storage, then append msg to StoreChangeLogger 2. the key append to StoreChangeLogger first, and invoke maybeLogChange by passing ValueGetter, this getter will get value from local storage when logChange() operation happen 3. when logChange() on StoreChangeLogger happen, send KeyValue message to changelog topic, here is streams-wordcount-Counts-changelog 4. StoreChangeLogger use dirty and remove Set to swap between logChange() and add(). for (K k : this.dirty) { // logChange() method flush dirty to changelog topic V v = getter.get(k); // value getter will get value from local storage collector.send(new ProducerRecord<>(this.topic, this.partition, k, v), keySerializer, valueSerializer); } The Only reason I can figure out why value is empty is when invoke KeyValueStore's delete method. But Actualy the application did't do it Anyone help me, Tks .