the right way to see changelog persistent by rocksdb is use ByteDeser, and
then decode hex to string
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
for(ConsumerRecord<String, byte[]> record: consumerRecords) {
print bytesToHexString(record.value())
}
public static String bytesToHexString(byte[] src){
StringBuilder stringBuilder = new StringBuilder("");
if (src == null || src.length <= 0) {
return null;
}
for (int i = 0; i < src.length; i++) {
int v = src[i] & 0xFF;
String hv = Integer.toHexString(v);
if (hv.length() < 2) {
stringBuilder.append(0);
}
stringBuilder.append(hv);
}
return stringBuilder.toString();
}
output now collect
p:0,o:0,k:msg1,v:00000001
p:0,o:1,k:msg3,v:00000001
p:0,o:2,k:msg5,v:00000001
p:0,o:3,k:msg1,v:00000002
p:0,o:4,k:msg3,v:00000002
p:1,o:0,k:msg2,v:00000001
p:1,o:1,k:msg4,v:00000001
p:1,o:2,k:msg2,v:00000002
p:1,o:3,k:msg2,v:00000003
2017-06-07 18:42 GMT+08:00 john cheng <[email protected]>:
> I add some log on StoreChangeLog
>
> for (K k : this.dirty) {
> V v = getter.get(k);
> log.info("logChange key:{},value:{}", k, v);
> collector.send(new ProducerRecord<>(this.topic, this.partition, k, v),
> keySerializer, valueSerializer);
> }
>
> and found the print result is normal, just some byte:
>
> [2017-06-07 18:39:43,131] INFO logChange removed:[], dirty:[kafka]
> (org.apache.kafka.streams.state.internals.StoreChangeLogger)
> [2017-06-07 18:39:43,132] INFO logChange key:kafka,value:[0, 0, 0, 2]
> (org.apache.kafka.streams.state.internals.StoreChangeLogger)
>
> but console-consumer still output kafka:
>
> so this may be problem of console-consume.
>
>
> 2017-06-07 18:24 GMT+08:00 john cheng <[email protected]>:
>
>> I'm running WordCountProcessorDemo with Processor API. and change
>> something below
>> 1. config 1 stream-thread and 1 replicas
>> 2. change inMemory() to persistent()
>> MyKakfa version is 0.10.0.0. After running streaming application, I check
>> msg output by console-consumer
>> ➜ kafka_2.10-0.10.0.0 bin/kafka-console-producer.sh --broker-list
>> localhost:9092 --topic streams-input2
>> msg1 # by console-producer, we can only produce message's value. so
>> message produce to input topic will use roundrobbin partition
>> msg2
>> msg3
>> msg4
>> msg5
>> msg6
>> ➜ kafka_2.10-0.10.0.0 bin/kafka-console-consumer.sh --new-consumer
>> --bootstrap-server localhost:9092 --topic streams-input2 --property
>> print.key=true --property key.separator=":" --from-beginning
>> null:msg2 # key is null, value is what we produce above
>> null:msg4
>> null:msg6
>> null:msg1
>> null:msg3
>> null:msg5
>> ➜ kafka_2.10-0.10.0.0 bin/kafka-console-consumer.sh --new-consumer
>> --bootstrap-server localhost:9092 --topic streams-output2 --property
>> print.key=true --property key.separator=":" --from-beginning
>> msg2:1
>> msg1:1
>> msg1:1
>> msg3:1
>> msg2:1
>> msg4:1
>> msg1:1
>> msg3:1
>> msg5:1
>> msg2:1
>> msg4:1
>> msg6:1 # due to log compaction, same key will be overwrite. this is ok...
>> ➜ kafka_2.10-0.10.0.0 bin/kafka-console-consumer.sh --new-consumer
>> --bootstrap-server localhost:9092 --topic streams-wordcount-Counts-changelog
>> --property print.key=true --property key.separator=":" --from-beginning
>> *msg2:*
>> *msg4:*
>> *msg6:*
>> *msg1:*
>> *msg3:*
>> *msg5:*
>> Everything is ok, except changelog-topic trouble me. Why it's value is
>> empty?
>>
>> I have dig into source code, and summary the workflow :
>> 1. when WordCountProcessorDemo's ProcessorNode put kv to KeyValueStore,
>> no matter Memory or RocksDB, they both put into local storage, then append
>> msg to StoreChangeLogger
>> 2. the key append to StoreChangeLogger first, and invoke maybeLogChange
>> by passing ValueGetter, this getter will get value from local storage when
>> logChange() operation happen
>> 3. when logChange() on StoreChangeLogger happen, send KeyValue message to
>> changelog topic, here is streams-wordcount-Counts-changelog
>> 4. StoreChangeLogger use dirty and remove Set to swap between logChange()
>> and add().
>>
>> for (K k : this.dirty) { // logChange() method flush dirty to changelog topic
>> V v = getter.get(k); // value getter will get value from local storage
>> collector.send(new ProducerRecord<>(this.topic, this.partition, k, v),
>> keySerializer, valueSerializer);
>> }
>>
>>
>> The Only reason I can figure out why value is empty is when invoke
>> KeyValueStore's delete method. But Actualy the application did't do it
>>
>> Anyone help me, Tks .
>>
>>
>