Thanks, Unmesh, for the detailed explanation. Whenever you change your topology's stateful operators, the underlying state stores and their corresponding changelogs are no longer valid. This applies not only when you change a non-windowed aggregation to a windowed one, but also when you only change the aggregate / reduce logic. In this case, users should use different state store names in their topology builder.
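
To illustrate why a changelog written by a non-windowed store cannot be replayed into a windowed store of the same name, here is a minimal sketch in plain Java. It is not the Kafka Streams implementation: the class and method names are hypothetical, and the key layout (key bytes followed by an 8-byte timestamp) is a simplifying assumption based on the behavior described in this thread.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch, not Kafka Streams code. Assumed layout for a
// windowed changelog key: serialized key bytes + 8-byte timestamp.
final class ChangelogKeyLayoutSketch {
    static final int TIMESTAMP_SIZE = 8;

    // Key as a non-windowed store would log it: only the serialized key.
    static byte[] rawKey(String key) {
        return key.getBytes(StandardCharsets.UTF_8);
    }

    // Key as a windowed store would log it under the assumed layout.
    static byte[] windowedKey(String key, long timestamp) {
        byte[] keyBytes = rawKey(key);
        return ByteBuffer.allocate(keyBytes.length + TIMESTAMP_SIZE)
                .put(keyBytes)
                .putLong(timestamp)
                .array();
    }

    // Reads the trailing 8 bytes as the timestamp, as a windowed restore would.
    static long timestampFromWindowedKey(byte[] binaryKey) {
        return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE);
    }

    public static void main(String[] args) {
        // A key written by the windowed store round-trips its timestamp.
        System.out.println(timestampFromWindowedKey(windowedKey("k1", 1466500000000L)));

        // A short raw key has no room for a timestamp, so the read index is negative.
        try {
            timestampFromWindowedKey(rawKey("k1"));
        } catch (IndexOutOfBoundsException e) {
            System.out.println("restore failed: " + e);
        }
    }
}
```

Note that in this sketch a raw key of eight bytes or more would not throw; its trailing bytes would simply be misread as a timestamp. Either way the old changelog is unusable for the new store, which is why a different store name is needed.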

Currently the state store names are abstracted in the high-level DSL as KTable names and window names, but we are working on exposing the names in the DSL as well, as we did in the lower-level processor API. Some context on this issue and a proposed API fix can be found here:

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65143671

Guozhang

On Tue, Jun 21, 2016 at 11:58 PM, Unmesh Joshi <[email protected]> wrote:

> A bit more investigation shows that because logging is always enabled in
> both RocksDBKeyValueStoreSupplier and RocksDBWindowStoreSupplier, the
> aggregated key/values get written to a topic in Kafka. RocksDBWindowStore
> always stores keys with timestamp attached. RocksDBStore stores raw keys.
>
> If the aggregate name remains the same and code is changed to use Windowed
> aggregation, it always expects timestamp attached to the key at startup
> recovery operation.
>
> So if aggregate operation is changed from non window to window in the code,
> the aggregation logs need to be cleared from Kafka. I deleted the logs from
> Kafka nodes and all is working just fine.
>
> Thanks,
> Unmesh
>
> On Wed, Jun 22, 2016 at 9:29 AM, Unmesh Joshi <[email protected]>
> wrote:
>
> > I could reproduce it with following steps. Adding Stacktrace in the end.
> >
> > 1. Create a stream and consume it without Windowing.
> >
> > KTable<String, Integer> aggregation = locationViews
> >     .map((key, value) -> {
> >         GenericRecord parsedRecord = parse(value);
> >         String parsedKey = parsedRecord.get("region").toString() +
> >             parsedRecord.get("location").toString();
> >         return new KeyValue<>(parsedKey, 1);
> >     }).reduceByKey((v1, v2) -> v1 + v2, Serdes.String(),
> >         Serdes.Integer(), "aggregated");
> >
> > aggregation.foreach((k, v) -> System.out.println(k + ", " + v));
> >
> > 2. Stop the consumer and then change the code to add windowing.
> >
> > KTable<Windowed<String>, Integer> aggregation = locationViews
> >     .map((key, value) -> {
> >         GenericRecord parsedRecord = parse(value);
> >         String parsedKey = parsedRecord.get("region").toString() +
> >             parsedRecord.get("location").toString();
> >         return new KeyValue<>(parsedKey, 1);
> >     }).reduceByKey((v1, v2) -> v1 + v2, TimeWindows.of("aggregated",
> >         100000), Serdes.String(), Serdes.Integer());
> >
> > aggregation.foreach((k, v) -> System.out.println(k + ", " + v));
> >
> > If I already had a consumer without windowing, it throws following
> > exception when consumer is run as in step 2.
> >
> > java.lang.IndexOutOfBoundsException
> >     at java.nio.Buffer.checkIndex(Buffer.java:546)
> >     at java.nio.HeapByteBuffer.getLong(HeapByteBuffer.java:416)
> >     at org.apache.kafka.streams.state.internals.WindowStoreUtils.timestampFromBinaryKey(WindowStoreUtils.java:63)
> >     at org.apache.kafka.streams.state.internals.RocksDBWindowStore.putInternal(RocksDBWindowStore.java:295)
> >     at org.apache.kafka.streams.state.internals.RocksDBWindowStore.access$100(RocksDBWindowStore.java:44)
> >     at org.apache.kafka.streams.state.internals.RocksDBWindowStore$2.restore(RocksDBWindowStore.java:190)
> >     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:245)
> >     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:210)
> >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:116)
> >     at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:184)
> >     at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:66)
> >     at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
> >     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:115)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
> >     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
> >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
> >     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
> >     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
> >     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
> >     at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
> >     at org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
> >     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
> >     at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
> >
> > On Wed, Jun 22, 2016 at 1:12 AM, Guozhang Wang <[email protected]>
> > wrote:
> >
> >> Could you share your stack trace upon failure?
> >>
> >> On Tue, Jun 21, 2016 at 12:05 AM, Unmesh Joshi <[email protected]>
> >> wrote:
> >>
> >> > HI,
> >> >
> >> > I am using confluent-3.0.0-2.11, with kafka and streams versions
> >> > org.apache.kafka:kafka_2.11:0.10.0.0-cp1 and
> >> > org.apache.kafka:kafka-streams:0.10.0.0-cp1 respectively.
> >> > The problem seems
> >> > to be with null keys because the original messages are not produced with
> >> > keys, and I am creating a key value pair in the map function before
> >> > aggregating to KTable. The RocksDBWindowStore putInternal is expecting the
> >> > timestamp to be appended to the Key, which was not the case.
> >> > It somehow corrected itself, once I started producing messages with some
> >> > non null key.
> >> >
> >> > The code is here
> >> >
> >> > https://github.com/unmeshjoshi/kafka-geek-day/blob/master/src/test/java/com/geekday/LogAnalyzerTest.java
> >> >
> >> > Thanks,
> >> > Unmesh
> >> >
> >> > On Tue, Jun 21, 2016 at 10:12 AM, Guozhang Wang <[email protected]>
> >> > wrote:
> >> >
> >> > > Hello Unmesh,
> >> > >
> >> > > Timestamp extractor is always applied at the beginning of the topology for
> >> > > each incoming record, and the extracted timestamp is carried throughout the
> >> > > topology.
> >> > >
> >> > > Could you share your stack trace upon failure with your source code? And
> >> > > what version of Kafka Streams are you using? Some old version of the
> >> > > library requires the key (as for your case, "String parsedKey =
> >> > > parsedRecord.get("region").toString() + parsedRecord.get("location").toString();")
> >> > > to be not null, but it has been resolved in the recent fixes.
> >> > >
> >> > > Guozhang
> >> > >
> >> > > On Mon, Jun 20, 2016 at 3:41 AM, Unmesh Joshi <[email protected]>
> >> > > wrote:
> >> > >
> >> > > > Hi,
> >> > > >
> >> > > > I was trying to experiment with Kafka streams, and had following code
> >> > > >
> >> > > > KTable<Windowed<String>, Integer> aggregated = locationViews
> >> > > >     .map((key, value) -> {
> >> > > >         GenericRecord parsedRecord = parse(value);
> >> > > >         String parsedKey = parsedRecord.get("region").toString() +
> >> > > >             parsedRecord.get("location").toString();
> >> > > >         return new KeyValue<>(parsedKey, 1);
> >> > > >     }).reduceByKey((v1, v2) -> v1 + v2,
> >> > > >         TimeWindows.of("aggregated", 5000), Serdes.String(),
> >> > > >         Serdes.Integer());
> >> > > >
> >> > > > This code fails in RocksDBWindowStore.putInternal, where its trying
> >> > > > to get timestamp from message key.
> >> > > >
> >> > > > I am not producing message with key, so I tried putting
> >> > > > TIMESTAMP_EXTRACTOR_CLASS_CONFIG.
> >> > > >
> >> > > > streamsConfiguration.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
> >> > > >     WallclockTimestampExtractor.class.getName());
> >> > > >
> >> > > > But RocksDBWindowStore, still expects timestamp in the message key and
> >> > > > does not use TIMESTAMP_EXTRACTOR_CLASS_CONFIG.
> >> > > >
> >> > > > Is it the case that Kafka message should always have a key if we have
> >> > > > to use Windowing? or this is an issue with RocksDBWindowStore?
> >> > > >
> >> > > > Thanks,
> >> > > >
> >> > > > Unmesh
> >> > > >
> >> > >
> >> > > --
> >> > > -- Guozhang
> >> > >
> >> >
> >>
> >> --
> >> -- Guozhang
> >>
> >

--
-- Guozhang
