A bit more investigation shows that, because logging is always enabled in
both RocksDBKeyValueStoreSupplier and RocksDBWindowStoreSupplier, the
aggregated key/values are written to a changelog topic in Kafka.
RocksDBWindowStore always stores keys with the window timestamp attached,
whereas RocksDBStore stores raw keys.
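
To make the difference concrete, here is a minimal sketch of the two key
formats, assuming the layout I saw in WindowStoreUtils for 0.10.0.0
(serialized key bytes followed by an 8-byte window timestamp and a 4-byte
sequence number); the helper class and the example key are mine, not Kafka's:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class KeyFormatSketch {

    // Non-windowed RocksDBStore: the changelog key is just the serialized key.
    static byte[] rawKey(String key) {
        return key.getBytes(StandardCharsets.UTF_8);
    }

    // Windowed RocksDBWindowStore: serialized key + 8-byte timestamp + 4-byte
    // sequence number (roughly what WindowStoreUtils.toBinaryKey produces in 0.10.0.0).
    static byte[] windowedKey(String key, long windowStartMs, int seqNum) {
        byte[] raw = key.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(raw.length + 8 + 4);
        buf.put(raw);
        buf.putLong(windowStartMs);
        buf.putInt(seqNum);
        return buf.array();
    }

    public static void main(String[] args) {
        System.out.println(rawKey("emea-pune").length);                         // 9 bytes
        System.out.println(windowedKey("emea-pune", 1466500000000L, 0).length); // 21 bytes
    }
}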

If the aggregate name remains the same and the code is changed to use
windowed aggregation, the window store expects a timestamp attached to every
key when it restores its state from that topic at startup.
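
That mismatch is exactly what produces the IndexOutOfBoundsException in the
stack trace further down: during restore the window store reads the timestamp
from a fixed position near the end of each changelog key, and a short raw key
simply does not have those bytes. A rough sketch of the decode step, assuming
the same layout as above (not the literal WindowStoreUtils code):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class RestoreFailureSketch {

    // Approximation of WindowStoreUtils.timestampFromBinaryKey in 0.10.0.0:
    // the timestamp long sits 12 bytes (8 timestamp + 4 seqnum) before the end of the key.
    static long timestampFromBinaryKey(byte[] binaryKey) {
        return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - 8 - 4);
    }

    public static void main(String[] args) {
        // A key written by the non-windowed store, e.g. "emea-pune" (9 bytes).
        byte[] rawChangelogKey = "emea-pune".getBytes(StandardCharsets.UTF_8);

        // 9 - 12 = -3, so getLong is asked to read before the start of the buffer
        // and java.nio.Buffer.checkIndex throws IndexOutOfBoundsException during restore.
        System.out.println(timestampFromBinaryKey(rawChangelogKey));
    }
}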

So if an aggregation is changed from non-windowed to windowed in the code,
the corresponding changelog topic needs to be cleared from Kafka. I deleted
the logs from the Kafka nodes and everything is working just fine.
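
For reference, the internal topic that has to be cleared follows the
<application.id>-<storeName>-changelog naming convention, so for the
"aggregated" store in the code below it can be located as in this small
sketch (the application.id "location-views-app" is just a placeholder for
whatever your StreamsConfig uses):

public class ChangelogTopicSketch {

    // Kafka Streams names changelog topics as <application.id>-<storeName>-changelog.
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        // "location-views-app" is a placeholder; substitute the application.id
        // from your own StreamsConfig.
        System.out.println(changelogTopic("location-views-app", "aggregated"));
        // -> location-views-app-aggregated-changelog
        // This is the topic I deleted (e.g. with kafka-topics.sh --delete) before
        // restarting the windowed version of the job.
    }
}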

Thanks,
Unmesh

On Wed, Jun 22, 2016 at 9:29 AM, Unmesh Joshi <unmeshjo...@gmail.com> wrote:

> I could reproduce it with the following steps. The stack trace is at the end.
>
> 1. Create a stream and consume it without Windowing.
>
> KTable<String, Integer> aggregation = locationViews
>         .map((key, value) -> {
>             GenericRecord parsedRecord = parse(value);
>             String parsedKey = parsedRecord.get("region").toString() +
>                     parsedRecord.get("location").toString();
>             return new KeyValue<>(parsedKey, 1);
>         }).reduceByKey((v1, v2) -> v1 + v2, Serdes.String(),
>                 Serdes.Integer(), "aggregated");
>
> aggregation.foreach((k, v) -> System.out.println(k + ", " + v));
>
>
> 2. Stop the consumer and then change the code to add windowing.
>
> KTable<Windowed<String>, Integer> aggregation = locationViews
>         .map((key, value) -> {
>             GenericRecord parsedRecord = parse(value);
>             String parsedKey = parsedRecord.get("region").toString() +
>                     parsedRecord.get("location").toString();
>             return new KeyValue<>(parsedKey, 1);
>         }).reduceByKey((v1, v2) -> v1 + v2,
>                 TimeWindows.of("aggregated", 100000),
>                 Serdes.String(), Serdes.Integer());
>
>
> aggregation.foreach((k, v) -> System.out.println(k + ", " + v));
>
>
> If I had already run the consumer without windowing, the following exception
> is thrown when the consumer is run as in step 2.
>
>
> java.lang.IndexOutOfBoundsException
>     at java.nio.Buffer.checkIndex(Buffer.java:546)
>     at java.nio.HeapByteBuffer.getLong(HeapByteBuffer.java:416)
>     at org.apache.kafka.streams.state.internals.WindowStoreUtils.timestampFromBinaryKey(WindowStoreUtils.java:63)
>     at org.apache.kafka.streams.state.internals.RocksDBWindowStore.putInternal(RocksDBWindowStore.java:295)
>     at org.apache.kafka.streams.state.internals.RocksDBWindowStore.access$100(RocksDBWindowStore.java:44)
>     at org.apache.kafka.streams.state.internals.RocksDBWindowStore$2.restore(RocksDBWindowStore.java:190)
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:245)
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:210)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:116)
>     at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:184)
>     at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:66)
>     at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:115)
>     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
>     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
>     at org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
>     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>
> On Wed, Jun 22, 2016 at 1:12 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>
>> Could you share your stack trace upon failure?
>>
>>
>>
>> On Tue, Jun 21, 2016 at 12:05 AM, Unmesh Joshi <unmeshjo...@gmail.com>
>> wrote:
>>
>> > Hi,
>> >
>> > I am using confluent-3.0.0-2.11, with Kafka and Streams versions
>> > org.apache.kafka:kafka_2.11:0.10.0.0-cp1 and
>> > org.apache.kafka:kafka-streams:0.10.0.0-cp1 respectively. The problem
>> > seems to be with null keys: the original messages are not produced with
>> > keys, and I am creating a key-value pair in the map function before
>> > aggregating into the KTable. RocksDBWindowStore.putInternal expects the
>> > timestamp to be appended to the key, which was not the case.
>> > It somehow corrected itself once I started producing messages with a
>> > non-null key.
>> >
>> > The code is here
>> >
>> >
>> https://github.com/unmeshjoshi/kafka-geek-day/blob/master/src/test/java/com/geekday/LogAnalyzerTest.java
>> >
>> >
>> > Thanks,
>> > Unmesh
>> >
>> >
>> >
>> >
>> >
>> > On Tue, Jun 21, 2016 at 10:12 AM, Guozhang Wang <wangg...@gmail.com>
>> > wrote:
>> >
>> > > Hello Unmesh,
>> > >
>> > > The timestamp extractor is always applied at the beginning of the
>> > > topology for each incoming record, and the extracted timestamp is
>> > > carried throughout the topology.
>> > >
>> > > Could you share your stack trace upon failure along with your source
>> > > code? And what version of Kafka Streams are you using? Some older
>> > > versions of the library require the key (in your case, "String parsedKey =
>> > > parsedRecord.get("region").toString() + parsedRecord.get("location").
>> > > toString();") to be non-null, but this has been resolved in recent fixes.
>> > >
>> > > Guozhang
>> > >
>> > > On Mon, Jun 20, 2016 at 3:41 AM, Unmesh Joshi <unmeshjo...@gmail.com>
>> > > wrote:
>> > >
>> > > > Hi,
>> > > >
>> > > > I was trying to experiment with Kafka streams, and had following
>> code
>> > > >
>> > > > KTable<Windowed<String>, Integer> aggregated = locationViews
>> > > >         .map((key, value) -> {
>> > > >             GenericRecord parsedRecord = parse(value);
>> > > >             String parsedKey = parsedRecord.get("region").toString() +
>> > > >                     parsedRecord.get("location").toString();
>> > > >             return new KeyValue<>(parsedKey, 1);
>> > > >         }).reduceByKey((v1, v2) -> v1 + v2,
>> > > >                 TimeWindows.of("aggregated", 5000),
>> > > >                 Serdes.String(), Serdes.Integer());
>> > > >
>> > > > This code fails in RocksDBWindowStore.putInternal, where it tries
>> > > > to extract the timestamp from the message key.
>> > > >
>> > > > I am not producing messages with keys, so I tried setting
>> > > > TIMESTAMP_EXTRACTOR_CLASS_CONFIG:
>> > > >
>> > > > streamsConfiguration.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
>> > > >         WallclockTimestampExtractor.class.getName());
>> > > >
>> > > > But RocksDBWindowStore still expects the timestamp in the message key
>> > > > and does not use TIMESTAMP_EXTRACTOR_CLASS_CONFIG.
>> > > >
>> > > > Is it the case that a Kafka message should always have a key if we
>> > > > want to use windowing? Or is this an issue with RocksDBWindowStore?
>> > > >
>> > > >
>> > > > Thanks,
>> > > >
>> > > > Unmesh
>> > > >
>> > >
>> > >
>> > >
>> > > --
>> > > -- Guozhang
>> > >
>> >
>>
>>
>>
>> --
>> -- Guozhang
>>
>
>
