I'm running WordCountProcessorDemo with Processor API. and change something
below
1. config 1 stream-thread and 1 replicas
2. change inMemory() to persistent()
MyKakfa version is 0.10.0.0. After running streaming application, I check
msg output by console-consumer
➜  kafka_2.10-0.10.0.0 bin/kafka-console-producer.sh --broker-list
localhost:9092 --topic streams-input2
msg1  # by console-producer, we can only produce message's value. so
message produce to input topic will use roundrobbin partition
msg2
msg3
msg4
msg5
msg6
➜  kafka_2.10-0.10.0.0 bin/kafka-console-consumer.sh --new-consumer
--bootstrap-server localhost:9092 --topic streams-input2 --property
print.key=true --property key.separator=":" --from-beginning
null:msg2  # key is null, value is what we produce above
null:msg4
null:msg6
null:msg1
null:msg3
null:msg5
➜  kafka_2.10-0.10.0.0 bin/kafka-console-consumer.sh --new-consumer
--bootstrap-server localhost:9092 --topic streams-output2 --property
print.key=true --property key.separator=":" --from-beginning
msg2:1
msg1:1
msg1:1
msg3:1
msg2:1
msg4:1
msg1:1
msg3:1
msg5:1
msg2:1
msg4:1
msg6:1  # due to log compaction, same key will be overwrite. this is ok...
➜  kafka_2.10-0.10.0.0 bin/kafka-console-consumer.sh --new-consumer
--bootstrap-server localhost:9092 --topic
streams-wordcount-Counts-changelog --property print.key=true --property
key.separator=":" --from-beginning
*msg2:*
*msg4:*
*msg6:*
*msg1:*
*msg3:*
*msg5:*
Everything is ok, except changelog-topic trouble me. Why it's value is
empty?

I have dig into source code, and summary the workflow :
1. when WordCountProcessorDemo's ProcessorNode put kv to KeyValueStore, no
matter Memory or RocksDB, they both put into local storage, then append msg
to StoreChangeLogger
2. the key append to StoreChangeLogger first, and invoke maybeLogChange by
passing ValueGetter, this getter will get value from local storage when
logChange() operation happen
3. when logChange() on StoreChangeLogger happen, send KeyValue message to
changelog topic, here is  streams-wordcount-Counts-changelog
4. StoreChangeLogger use dirty and remove Set to swap between logChange()
and add().

for (K k : this.dirty) { // logChange() method flush dirty to changelog topic
    V v = getter.get(k);  // value getter will get value from local storage
    collector.send(new ProducerRecord<>(this.topic, this.partition, k,
v), keySerializer, valueSerializer);
}


The Only reason I can figure out why value is empty is when invoke
KeyValueStore's delete method. But Actualy the application did't do it

Anyone help me, Tks .

Reply via email to