Hi,
I was experimenting with Kafka Streams and had the following code:
    KTable<Windowed<String>, Integer> aggregated = locationViews
        .map((key, value) -> {
            GenericRecord parsedRecord = parse(value);
            String parsedKey = parsedRecord.get("region").toString() +
                parsedRecord.get("location").toString();
            return new KeyValue<>(parsedKey, 1);
        })
        .reduceByKey((v1, v2) -> v1 + v2,
            TimeWindows.of("aggregated", 5000), Serdes.String(),
            Serdes.Integer());
This code fails in RocksDBWindowStore.putInternal, where it tries
to read the timestamp from the message key.
I am not producing messages with a key, so I tried setting
TIMESTAMP_EXTRACTOR_CLASS_CONFIG:
    streamsConfiguration.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
        WallclockTimestampExtractor.class.getName());
But RocksDBWindowStore still expects a timestamp in the message key and
does not use TIMESTAMP_EXTRACTOR_CLASS_CONFIG.
Is it the case that a Kafka message must always have a key if we want
to use windowing, or is this an issue with RocksDBWindowStore?
Thanks,
Unmesh