ConsoleConsumer uses the String deserializer by default, but the value in the
changelog is of type long. For the output topic, the type is converted from
long to string though -- thus you can read the output topic without
problems.
For reading the changelog topic, you need to specify the option
--property value
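The rest of that option got cut off here, so purely as an illustration (the topic name is a placeholder and the exact properties are a sketch modeled on the Kafka Streams quickstart, not a quote from the thread), the invocation could look roughly like:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 \
    --topic <application.id>-Counts-changelog \
    --from-beginning \
    --property print.key=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

Changelog topics are named <application.id>-<store name>-changelog, hence the placeholder ending in -Counts-changelog.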
The right way to see the changelog persisted by RocksDB is to use a ByteArrayDeserializer, and
then decode the bytes into a hex string:
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.ByteArrayDeserializer");
for (ConsumerRecord<String, byte[]> record : consumerRecords) {  // assuming String keys
    // print the raw value bytes as hex; bytesToHexString is sketched below
    System.out.println(bytesToHexString(record.value()));
}
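bytesToHexString is just a small helper; a minimal sketch of one possible implementation (hypothetical, not necessarily the exact code used here) could be:

// Hypothetical helper: format a byte array as space-separated hex pairs.
static String bytesToHexString(byte[] bytes) {
    StringBuilder sb = new StringBuilder();
    for (byte b : bytes) {
        sb.append(String.format("%02x ", b));
    }
    return sb.toString().trim();
}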
I added some logging to StoreChangeLogger:
for (K k : this.dirty) {
    V v = getter.get(k);
    log.info("logChange key:{},value:{}", k, v);
    collector.send(new ProducerRecord<>(this.topic, this.partition, k, v),
            keySerializer, valueSerializer);
}
and found that the printed result is normal, just some bytes:
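Those bytes are just the serialized count; for illustration (a sketch, not part of the original logging), they can be decoded back into a number, assuming they were written by Kafka's big-endian IntegerSerializer or LongSerializer:

import java.nio.ByteBuffer;

// Decode a changelog value back into the count it represents.
// IntegerSerializer writes 4 big-endian bytes and LongSerializer writes 8,
// so the array length tells us which type to read.
static Number decodeCount(byte[] value) {
    ByteBuffer buf = ByteBuffer.wrap(value);  // ByteBuffer defaults to big-endian
    return value.length == 8 ? buf.getLong() : buf.getInt();
}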
I'm running WordCountProcessorDemo with the Processor API, and changed the following:
1. configured 1 stream thread and 1 replica
2. changed inMemory() to persistent() (see the sketch after this list)
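That change is roughly of the following shape, using the 0.10.0.0 Stores builder API (store and processor names follow WordCountProcessorDemo; a sketch, not the exact code):

import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.Stores;

// Define the "Counts" store as a persistent (RocksDB-backed) store instead of in-memory.
StateStoreSupplier countStore = Stores.create("Counts")
        .withStringKeys()
        .withIntegerValues()
        .persistent()   // was .inMemory() in the original demo
        .build();
// then connect it to the processor, e.g. builder.addStateStore(countStore, "Process");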
My Kafka version is 0.10.0.0. After running the streams application, I checked the
messages output by console-consumer:
➜ kafka_2.10-0.10.0.0 bin/kafka-c