Re: kafka streams changelog topic value

2017-06-07 Thread Matthias J. Sax
ConsoleConsumer uses the String deserializer by default, but the value in the changelog is of type long. For the output topic, the type is converted from long to string, so you can read the output topic without problems. For reading the changelog topic, you need to specify option --property value
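To illustrate the mismatch described above, here is a minimal sketch in plain Java with no Kafka dependency. It assumes the changelog value is a long serialized as 8 big-endian bytes, which is how Kafka's LongSerializer writes it. The class and method names (ChangelogValueDecoding, decodeLong, decodeAsString) are hypothetical and not part of Kafka.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ChangelogValueDecoding {

    // Interpret an 8-byte big-endian value as a long (assumed changelog value format).
    static long decodeLong(byte[] value) {
        if (value == null || value.length != Long.BYTES) {
            throw new IllegalArgumentException("expected exactly 8 bytes");
        }
        // ByteBuffer reads in big-endian order by default.
        return ByteBuffer.wrap(value).getLong();
    }

    // Roughly what a String deserializer does: treat the raw bytes as UTF-8 text.
    static String decodeAsString(byte[] value) {
        return new String(value, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Hypothetical changelog value: the count 3 serialized as a long.
        byte[] raw = {0, 0, 0, 0, 0, 0, 0, 3};

        System.out.println("as long: " + decodeLong(raw));
        // As a string, the same bytes are 8 unprintable control characters.
        System.out.println("as string length: " + decodeAsString(raw).length());
    }
}
```

Read as a string, the value consists only of control characters, which is why the console output looks empty or garbled even though the stored count is intact.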

Re: kafka streams changelog topic value

2017-06-07 Thread john cheng
The right way to see the changelog persisted by RocksDB is to use ByteArrayDeserializer and then decode the hex to a string: props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); for (ConsumerRecord record : consumerRecords) { print bytesToHexString(re
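The body of bytesToHexString is cut off in the archived message, so the following is only a sketch of what such a helper might look like, not the poster's actual code. It is plain Java and takes the raw value bytes that a consumer configured with ByteArrayDeserializer would return. The class name HexDump is hypothetical.

```java
public class HexDump {

    // Render each byte as two lowercase hex digits, e.g. {0x00, 0x03} -> "0003".
    static String bytesToHexString(byte[] bytes) {
        StringBuilder sb = new StringBuilder(bytes.length * 2);
        for (byte b : bytes) {
            // Mask to 0..255 so negative bytes do not sign-extend.
            sb.append(String.format("%02x", b & 0xff));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Hypothetical record value: the count 3 serialized as an 8-byte long.
        byte[] value = {0, 0, 0, 0, 0, 0, 0, 3};
        System.out.println(bytesToHexString(value)); // prints 0000000000000003
    }
}
```

Printing the hex form makes the raw bytes visible regardless of their type, which helps confirm what the changelog actually contains before choosing a deserializer.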

Re: kafka streams changelog topic value

2017-06-07 Thread john cheng
I added some logging to StoreChangeLog: for (K k : this.dirty) { V v = getter.get(k); log.info("logChange key:{},value:{}", k, v); collector.send(new ProducerRecord<>(this.topic, this.partition, k, v), keySerializer, valueSerializer); } and found that the printed result is normal, just some bytes:

kafka streams changelog topic value

2017-06-07 Thread john cheng
I'm running WordCountProcessorDemo with the Processor API, with the following changes: 1. configure 1 stream thread and 1 replica 2. change inMemory() to persistent(). My Kafka version is 0.10.0.0. After running the streaming application, I checked the message output with console-consumer: ➜ kafka_2.10-0.10.0.0 bin/kafka-c