Hello Unmesh,

The timestamp extractor is always applied at the beginning of the topology for each incoming record, and the extracted timestamp is carried throughout the topology.

Could you share the stack trace from the failure, along with your source code? Also, which version of Kafka Streams are you using? Some older versions of the library require the key (in your case, "String parsedKey = parsedRecord.get("region").toString() + parsedRecord.get("location").toString();") to be non-null, but this has been resolved in recent fixes.

Guozhang

On Mon, Jun 20, 2016 at 3:41 AM, Unmesh Joshi <unmeshjo...@gmail.com> wrote:

> Hi,
>
> I was trying to experiment with Kafka Streams, and had the following code:
>
> KTable<Windowed<String>, Integer> aggregated = locationViews
>     .map((key, value) -> {
>         GenericRecord parsedRecord = parse(value);
>         String parsedKey = parsedRecord.get("region").toString() +
>             parsedRecord.get("location").toString();
>         return new KeyValue<>(parsedKey, 1);
>     }).reduceByKey((v1, v2) -> v1 + v2,
>         TimeWindows.of("aggregated", 5000), Serdes.String(),
>         Serdes.Integer());
>
> This code fails in RocksDBWindowStore.putInternal, where it is trying
> to get the timestamp from the message key.
>
> I am not producing messages with a key, so I tried setting
> TIMESTAMP_EXTRACTOR_CLASS_CONFIG.
>
> streamsConfiguration.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
>     WallclockTimestampExtractor.class.getName());
>
> But RocksDBWindowStore still expects a timestamp in the message key and
> does not use TIMESTAMP_EXTRACTOR_CLASS_CONFIG.
>
> Is it the case that a Kafka message should always have a key if we have
> to use windowing? Or is this an issue with RocksDBWindowStore?
>
>
> Thanks,
>
> Unmesh
>

--
-- Guozhang
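
As a footnote to the non-null key point above: if any field used to build the key can be missing, one option is to construct the key defensively before returning it from the map step. The following is only a minimal sketch under that assumption. The class name WindowKeys, the method buildKey, the "unknown" placeholder, and the "|" separator are all hypothetical and are not part of Kafka Streams. It also does not address any version-specific behaviour of RocksDBWindowStore.

```java
import java.util.Objects;

// Hypothetical helper, not part of Kafka Streams: builds a grouping key
// that is never null, even when one of the record fields is missing.
final class WindowKeys {
    private WindowKeys() {}

    // Assumed placeholder for a missing field; pick whatever suits your data.
    static final String MISSING = "unknown";

    static String buildKey(Object region, Object location) {
        // Objects.toString(o, default) returns the default when o is null,
        // so neither part can throw a NullPointerException.
        String r = Objects.toString(region, MISSING);
        String l = Objects.toString(location, MISSING);
        // The separator keeps ("ab", "c") distinct from ("a", "bc").
        return r + "|" + l;
    }

    public static void main(String[] args) {
        System.out.println(buildKey("emea", "london"));
        System.out.println(buildKey(null, "london"));
    }
}
```

Inside the map step, this would stand in for the manual concatenation, for example buildKey(parsedRecord.get("region"), parsedRecord.get("location")). Note that the separator changes the key format compared with the original code, so drop it if existing keys must stay compatible.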