Hi,

I am using confluent-3.0.0-2.11, with Kafka and Kafka Streams versions org.apache.kafka:kafka_2.11:0.10.0.0-cp1 and org.apache.kafka:kafka-streams:0.10.0.0-cp1 respectively.

The problem seems to be with null keys: the original messages are produced without keys, and I only create a key-value pair in the map function before aggregating into a KTable. RocksDBWindowStore.putInternal expects the timestamp to be appended to the key, which was not the case. Somehow the failure went away once I started producing messages with a non-null key.
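To make the point about the store key concrete, here is a minimal plain-Java sketch of a windowed count in which the window timestamp is combined with a key derived in the map step. It is not the Kafka Streams API and not the actual RocksDBWindowStore code. The class name WindowedCountSketch, the helper deriveKey and the "@" separator are illustrative assumptions of mine; the timestamp is passed in separately, the way an extracted record timestamp would be, and is assumed to be non-negative.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: plain Java, not Kafka Streams and not RocksDBWindowStore.
final class WindowedCountSketch {

    private final long windowSizeMs;
    private final Map<String, Integer> counts = new HashMap<>();

    WindowedCountSketch(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Hypothetical helper: build the grouping key from two record fields and
    // fail fast rather than let a null key reach the aggregation.
    static String deriveKey(Map<String, String> record) {
        String region = record.get("region");
        String location = record.get("location");
        if (region == null || location == null) {
            throw new IllegalArgumentException("missing region or location: " + record);
        }
        return region + location;
    }

    // Start of the tumbling window containing the (non-negative) timestamp.
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    // Count one record. The store key is the derived key plus the window start,
    // so the timestamp never has to come from the original message key.
    void add(Map<String, String> record, long timestampMs) {
        String storeKey = deriveKey(record) + "@" + windowStart(timestampMs, windowSizeMs);
        counts.merge(storeKey, 1, Integer::sum);
    }

    // Count for the given key in the window containing timestampMs.
    int count(String key, long timestampMs) {
        return counts.getOrDefault(key + "@" + windowStart(timestampMs, windowSizeMs), 0);
    }
}
```

With a 5000 ms window, records for the same region and location at 1000 ms and 4999 ms land in one window, and a record at 5000 ms starts the next one.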
The code is here:
https://github.com/unmeshjoshi/kafka-geek-day/blob/master/src/test/java/com/geekday/LogAnalyzerTest.java

Thanks,
Unmesh

On Tue, Jun 21, 2016 at 10:12 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Unmesh,
>
> Timestamp extractor is always applied at the beginning of the topology for
> each incoming record, and the extracted timestamp is carried throughout the
> topology.
>
> Could you share your stack trace upon failure with your source code? And
> what version of Kafka Streams are you using? Some old versions of the
> library require the key (as for your case, "String parsedKey =
> parsedRecord.get("region").toString() + parsedRecord.get("location").
> toString();") to be not null, but it has been resolved in the recent fixes.
>
> Guozhang
>
> On Mon, Jun 20, 2016 at 3:41 AM, Unmesh Joshi <unmeshjo...@gmail.com>
> wrote:
>
> > Hi,
> >
> > I was trying to experiment with Kafka Streams, and had the following code:
> >
> > KTable<Windowed<String>, Integer> aggregated = locationViews
> >     .map((key, value) -> {
> >         GenericRecord parsedRecord = parse(value);
> >         String parsedKey = parsedRecord.get("region").toString() +
> >             parsedRecord.get("location").toString();
> >         return new KeyValue<>(parsedKey, 1);
> >     }).reduceByKey((v1, v2) -> v1 + v2,
> >         TimeWindows.of("aggregated", 5000), Serdes.String(),
> >         Serdes.Integer());
> >
> > This code fails in RocksDBWindowStore.putInternal, where it's trying
> > to get the timestamp from the message key.
> >
> > I am not producing messages with a key, so I tried setting
> > TIMESTAMP_EXTRACTOR_CLASS_CONFIG.
> >
> > streamsConfiguration.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
> >     WallclockTimestampExtractor.class.getName());
> >
> > But RocksDBWindowStore still expects the timestamp in the message key and
> > does not use TIMESTAMP_EXTRACTOR_CLASS_CONFIG.
> >
> > Is it the case that a Kafka message should always have a key if we have
> > to use windowing? Or is this an issue with RocksDBWindowStore?
> >
> > Thanks,
> >
> > Unmesh
>
> --
> -- Guozhang