Hello Unmesh,

The timestamp extractor is always applied at the beginning of the topology
to each incoming record, and the extracted timestamp is then carried with
the record throughout the topology.
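
To illustrate the point, here is a minimal plain-Java sketch. It does not use the Kafka Streams API: the class TimestampCarryDemo and its methods source, map and windowStart are hypothetical names made up for illustration, and the 5000 ms window size is borrowed from your TimeWindows.of("aggregated", 5000) call. It only models the idea that a timestamp is assigned once on entry and is unchanged by later steps.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.ToLongFunction;

// Plain-Java model, NOT the Kafka Streams API: every name here is hypothetical.
final class TimestampCarryDemo {

    // A value paired with the timestamp assigned when it entered the pipeline.
    static final class Stamped<V> {
        final V value;
        final long timestamp;

        Stamped(V value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Source step: the extractor runs exactly once per incoming record.
    static <V> List<Stamped<V>> source(List<V> input, ToLongFunction<V> extractor) {
        List<Stamped<V>> out = new ArrayList<>();
        for (V v : input) {
            out.add(new Stamped<>(v, extractor.applyAsLong(v)));
        }
        return out;
    }

    // Downstream step: the value changes, the timestamp is carried unchanged.
    static <V, R> List<Stamped<R>> map(List<Stamped<V>> in, Function<V, R> fn) {
        List<Stamped<R>> out = new ArrayList<>();
        for (Stamped<V> s : in) {
            out.add(new Stamped<>(fn.apply(s.value), s.timestamp));
        }
        return out;
    }

    // Start of the fixed-size window that contains the given timestamp.
    static long windowStart(long timestamp, long windowSizeMs) {
        return timestamp - (timestamp % windowSizeMs);
    }

    public static void main(String[] args) {
        // Assumed input format for this sketch only: "name:timestampMs".
        List<Stamped<String>> stamped =
                source(Arrays.asList("a:1200", "b:7300"), s -> Long.parseLong(s.split(":")[1]));
        for (Stamped<String> s : map(stamped, v -> v.split(":")[0])) {
            System.out.println(s.value + " -> window starting at " + windowStart(s.timestamp, 5000));
        }
    }
}
```

In this model a windowed step never needs to read a timestamp out of the record key, because the timestamp already travels with each record.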

Could you share the stack trace from the failure along with your source
code? And which version of Kafka Streams are you using? Some older versions
of the library require the key (in your case, "String parsedKey =
parsedRecord.get("region").toString() +
parsedRecord.get("location").toString();") to be non-null, but this has
been resolved in recent fixes.
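
In the meantime, one way to rule out a null key on your side is to guard the key construction. The following is only a sketch under assumptions: RegionLocationKey is a hypothetical helper, and a plain Map stands in for your GenericRecord so that the example is self-contained. It returns an empty Optional when either field is missing instead of failing on toString().

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper; a Map<String, Object> stands in for the parsed record.
final class RegionLocationKey {

    // Builds the composite key, or returns empty if either field is missing.
    static Optional<String> of(Map<String, Object> parsedRecord) {
        Object region = parsedRecord.get("region");
        Object location = parsedRecord.get("location");
        if (region == null || location == null) {
            return Optional.empty();
        }
        return Optional.of(region.toString() + location.toString());
    }

    public static void main(String[] args) {
        Map<String, Object> record = new HashMap<>();
        record.put("region", "EU");
        record.put("location", "Berlin");
        System.out.println(of(record).orElse("<no key>"));
    }
}
```

Records for which the helper returns empty could then be dropped or logged before the aggregation step, which would also show whether missing fields are what triggers the failure.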

Guozhang

On Mon, Jun 20, 2016 at 3:41 AM, Unmesh Joshi <unmeshjo...@gmail.com> wrote:

> Hi,
>
> I was trying to experiment with Kafka Streams and had the following code:
>
> KTable<Windowed<String>, Integer> aggregated = locationViews
>         .map((key, value) -> {
>             GenericRecord parsedRecord = parse(value);
>             String parsedKey = parsedRecord.get("region").toString()
>                     + parsedRecord.get("location").toString();
>             return new KeyValue<>(parsedKey, 1);
>         })
>         .reduceByKey((v1, v2) -> v1 + v2,
>                 TimeWindows.of("aggregated", 5000),
>                 Serdes.String(), Serdes.Integer());
>
> This code fails in RocksDBWindowStore.putInternal, where it's trying
> to get the timestamp from the message key.
>
> I am not producing messages with keys, so I tried setting
> TIMESTAMP_EXTRACTOR_CLASS_CONFIG:
>
> streamsConfiguration.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
> WallclockTimestampExtractor.class.getName());
>
> But RocksDBWindowStore still expects a timestamp in the message key and
> does not use TIMESTAMP_EXTRACTOR_CLASS_CONFIG.
>
> Is it the case that a Kafka message must always have a key if we want
> to use windowing? Or is this an issue with RocksDBWindowStore?
>
>
> Thanks,
>
> Unmesh
>



