I could reproduce it with the following steps. The stack trace is at the end.

1. Create a stream and consume it without windowing.
    KTable<String, Integer> aggregation = locationViews
            .map((key, value) -> {
                GenericRecord parsedRecord = parse(value);
                String parsedKey = parsedRecord.get("region").toString() +
                        parsedRecord.get("location").toString();
                return new KeyValue<>(parsedKey, 1);
            }).reduceByKey((v1, v2) -> v1 + v2, Serdes.String(),
                    Serdes.Integer(), "aggregated");
    aggregation.foreach((k, v) -> System.out.println(k + ", " + v));

2. Stop the consumer and then change the code to add windowing.

    KTable<Windowed<String>, Integer> aggregation = locationViews
            .map((key, value) -> {
                GenericRecord parsedRecord = parse(value);
                String parsedKey = parsedRecord.get("region").toString() +
                        parsedRecord.get("location").toString();
                return new KeyValue<>(parsedKey, 1);
            }).reduceByKey((v1, v2) -> v1 + v2,
                    TimeWindows.of("aggregated", 100000), Serdes.String(),
                    Serdes.Integer());
    aggregation.foreach((k, v) -> System.out.println(k + ", " + v));

If a consumer had already run without windowing, the following exception is thrown when the consumer is run as in step 2:

java.lang.IndexOutOfBoundsException
    at java.nio.Buffer.checkIndex(Buffer.java:546)
    at java.nio.HeapByteBuffer.getLong(HeapByteBuffer.java:416)
    at org.apache.kafka.streams.state.internals.WindowStoreUtils.timestampFromBinaryKey(WindowStoreUtils.java:63)
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore.putInternal(RocksDBWindowStore.java:295)
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore.access$100(RocksDBWindowStore.java:44)
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore$2.restore(RocksDBWindowStore.java:190)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:245)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:210)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:116)
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:184)
    at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:66)
    at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:115)
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
    at org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
    at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
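A guess from the restore frames in the trace: the old, non-windowed state/changelog of the store named "aggregated" is being replayed into the new windowed store. A minimal sketch of a possible workaround, assuming that guess is right, is to give the windowed reduce a fresh store name ("aggregated-windowed" below is a made-up name) so the incompatible existing state is not reused. It reuses the same locationViews stream and parse() helper as the snippets above:

    // Sketch only: same topology as step 2, but with a new store name so the
    // restore path does not read back the old non-windowed changelog/local state.
    KTable<Windowed<String>, Integer> windowedAggregation = locationViews
            .map((key, value) -> {
                GenericRecord parsedRecord = parse(value);
                String parsedKey = parsedRecord.get("region").toString() +
                        parsedRecord.get("location").toString();
                return new KeyValue<>(parsedKey, 1);
            }).reduceByKey((v1, v2) -> v1 + v2,
                    TimeWindows.of("aggregated-windowed", 100000),
                    Serdes.String(), Serdes.Integer());
    windowedAggregation.foreach((k, v) -> System.out.println(k + ", " + v));

Wiping the local state directory (StreamsConfig.STATE_DIR_CONFIG) and the old changelog topic before switching topologies should have a similar effect.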
On Wed, Jun 22, 2016 at 1:12 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> Could you share your stack trace upon failure?
>
> On Tue, Jun 21, 2016 at 12:05 AM, Unmesh Joshi <unmeshjo...@gmail.com>
> wrote:
>
> > Hi,
> >
> > I am using confluent-3.0.0-2.11, with Kafka and Streams versions
> > org.apache.kafka:kafka_2.11:0.10.0.0-cp1 and
> > org.apache.kafka:kafka-streams:0.10.0.0-cp1 respectively. The problem
> > seems to be with null keys: the original messages are not produced with
> > keys, and I am creating a key-value pair in the map function before
> > aggregating to a KTable. RocksDBWindowStore.putInternal expects the
> > timestamp to be appended to the key, which was not the case.
> > It somehow corrected itself once I started producing messages with some
> > non-null key.
> >
> > The code is here:
> > https://github.com/unmeshjoshi/kafka-geek-day/blob/master/src/test/java/com/geekday/LogAnalyzerTest.java
> >
> > Thanks,
> > Unmesh
> >
> > On Tue, Jun 21, 2016 at 10:12 AM, Guozhang Wang <wangg...@gmail.com>
> > wrote:
> >
> > > Hello Unmesh,
> > >
> > > The timestamp extractor is always applied at the beginning of the
> > > topology for each incoming record, and the extracted timestamp is
> > > carried throughout the topology.
> > >
> > > Could you share your stack trace upon failure with your source code?
> > > And what version of Kafka Streams are you using? Some old versions of
> > > the library require the key (in your case, "String parsedKey =
> > > parsedRecord.get("region").toString() + parsedRecord.get("location").toString();")
> > > to be non-null, but that has been resolved in recent fixes.
> > >
> > > Guozhang
> > >
> > > On Mon, Jun 20, 2016 at 3:41 AM, Unmesh Joshi <unmeshjo...@gmail.com>
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > I was trying to experiment with Kafka Streams and had the following
> > > > code:
> > > >
> > > > KTable<Windowed<String>, Integer> aggregated = locationViews
> > > >         .map((key, value) -> {
> > > >             GenericRecord parsedRecord = parse(value);
> > > >             String parsedKey = parsedRecord.get("region").toString() +
> > > >                     parsedRecord.get("location").toString();
> > > >             return new KeyValue<>(parsedKey, 1);
> > > >         }).reduceByKey((v1, v2) -> v1 + v2,
> > > >                 TimeWindows.of("aggregated", 5000), Serdes.String(),
> > > >                 Serdes.Integer());
> > > >
> > > > This code fails in RocksDBWindowStore.putInternal, where it is trying
> > > > to get the timestamp from the message key.
> > > >
> > > > I am not producing messages with a key, so I tried setting
> > > > TIMESTAMP_EXTRACTOR_CLASS_CONFIG:
> > > >
> > > > streamsConfiguration.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
> > > >         WallclockTimestampExtractor.class.getName());
> > > >
> > > > But RocksDBWindowStore still expects the timestamp in the message key
> > > > and does not use TIMESTAMP_EXTRACTOR_CLASS_CONFIG.
> > > >
> > > > Is it the case that a Kafka message should always have a key if we
> > > > want to use windowing, or is this an issue with RocksDBWindowStore?
> > > >
> > > > Thanks,
> > > > Unmesh
> > >
> > > --
> > > -- Guozhang
>
> --
> -- Guozhang
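For completeness, since the thread notes that the error "somehow corrected itself" once messages were produced with a non-null key: a minimal sketch of producing keyed records, assuming a local broker and a topic name "location-views" (both are assumptions, not taken from the thread), would look like this:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.ByteArraySerializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class KeyedLocationViewProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", ByteArraySerializer.class.getName());

            try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props)) {
                byte[] avroPayload = serializeLocationView(); // placeholder for the existing Avro serialization
                // The relevant part of the workaround: the record key is non-null.
                producer.send(new ProducerRecord<>("location-views", "region-location", avroPayload));
                producer.flush();
            }
        }

        private static byte[] serializeLocationView() {
            // Stand-in for however the test data is actually serialized in the linked test.
            return new byte[0];
        }
    }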