I could reproduce it with the following steps. The stack trace is at the end.

1. Create a stream and consume it without Windowing.

KTable<String, Integer> aggregation = locationViews
        .map((key, value) -> {
            GenericRecord parsedRecord = parse(value);
            String parsedKey = parsedRecord.get("region").toString() +
                    parsedRecord.get("location").toString();
            return new KeyValue<>(parsedKey, 1);
        }).reduceByKey((v1, v2) -> v1 + v2, Serdes.String(),
                Serdes.Integer(), "aggregated");

aggregation.foreach((k, v) -> System.out.println(k + ", " + v));
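
(For context: this creates a state store named "aggregated", backed by a
changelog topic "<application.id>-aggregated-changelog" whose keys are just
the plain serialized strings.)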


2. Stop the consumer and then change the code to add windowing.

KTable<Windowed<String>, Integer> aggregation = locationViews
        .map((key, value) -> {
            GenericRecord parsedRecord = parse(value);
            String parsedKey = parsedRecord.get("region").toString() +
                    parsedRecord.get("location").toString();
            return new KeyValue<>(parsedKey, 1);
        }).reduceByKey((v1, v2) -> v1 + v2,
                TimeWindows.of("aggregated", 100000), Serdes.String(),
                Serdes.Integer());


aggregation.foreach((k, v) -> System.out.println(k + ", " + v));
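
As a side note, the crash seems to go away if the windowed store is given a
fresh name, because it then restores from a new, empty changelog rather than
the one written in step 1. A minimal sketch of that workaround (the name
"aggregated-windowed" is just an example):

        }).reduceByKey((v1, v2) -> v1 + v2,
                // A different store name means a different changelog topic,
                // so the windowed store does not replay the old
                // non-windowed keys during restore.
                TimeWindows.of("aggregated-windowed", 100000), Serdes.String(),
                Serdes.Integer());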


Because I already had a consumer running without windowing (step 1), the
following exception is thrown when the consumer is restarted as in step 2.


java.lang.IndexOutOfBoundsException
    at java.nio.Buffer.checkIndex(Buffer.java:546)
    at java.nio.HeapByteBuffer.getLong(HeapByteBuffer.java:416)
    at org.apache.kafka.streams.state.internals.WindowStoreUtils.timestampFromBinaryKey(WindowStoreUtils.java:63)
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore.putInternal(RocksDBWindowStore.java:295)
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore.access$100(RocksDBWindowStore.java:44)
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore$2.restore(RocksDBWindowStore.java:190)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:245)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:210)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:116)
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:184)
    at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:66)
    at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:115)
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
    at org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
    at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
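
For what it's worth, the failure makes sense given how the window store lays
out its changelog keys in 0.10.0.0: RocksDBWindowStore appends an 8-byte
timestamp and a 4-byte sequence number to every serialized key, and on
restore WindowStoreUtils.timestampFromBinaryKey reads the timestamp back
from that suffix. The changelog written in step 1 contains plain String keys
with no suffix, so for any key shorter than 12 bytes the read lands at a
negative index, which is exactly the checkIndex/getLong frames above. A
self-contained sketch of my understanding (the constants mirror
WindowStoreUtils; the key "us-east1" is just an example):

import java.nio.ByteBuffer;

public class WindowKeySketch {
    // Layout used by RocksDBWindowStore changelog keys in 0.10.0.0:
    // [serialized key][8-byte timestamp][4-byte sequence number]
    static final int TIMESTAMP_SIZE = 8;
    static final int SEQNUM_SIZE = 4;

    // Roughly what WindowStoreUtils.timestampFromBinaryKey does.
    static long timestampFromBinaryKey(byte[] binaryKey) {
        return ByteBuffer.wrap(binaryKey)
                .getLong(binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE);
    }

    public static void main(String[] args) {
        // A plain changelog key left behind by the non-windowed run in step 1.
        byte[] plainKey = "us-east1".getBytes(); // 8 bytes, no suffix
        // 8 - 12 = -4 is a negative index, so Buffer.checkIndex throws
        // java.lang.IndexOutOfBoundsException, matching the trace above.
        timestampFromBinaryKey(plainKey);
    }
}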

On Wed, Jun 22, 2016 at 1:12 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> Could you share your stack trace upon failure?
>
>
>
> On Tue, Jun 21, 2016 at 12:05 AM, Unmesh Joshi <unmeshjo...@gmail.com>
> wrote:
>
> > HI,
> >
> > I am using confluent-3.0.0-2.11, with Kafka and Streams versions
> > org.apache.kafka:kafka_2.11:0.10.0.0-cp1 and
> > org.apache.kafka:kafka-streams:0.10.0.0-cp1 respectively. The problem
> > seems to be with null keys: the original messages are not produced with
> > keys, and I am creating a key-value pair in the map function before
> > aggregating into the KTable. RocksDBWindowStore.putInternal expects the
> > timestamp to be appended to the key, which was not the case. It somehow
> > corrected itself once I started producing messages with a non-null key.
> >
> > The code is here:
> >
> > https://github.com/unmeshjoshi/kafka-geek-day/blob/master/src/test/java/com/geekday/LogAnalyzerTest.java
> >
> >
> > Thanks,
> > Unmesh
> >
> >
> >
> >
> >
> > On Tue, Jun 21, 2016 at 10:12 AM, Guozhang Wang <wangg...@gmail.com>
> > wrote:
> >
> > > Hello Unmesh,
> > >
> > > The timestamp extractor is always applied at the beginning of the
> > > topology for each incoming record, and the extracted timestamp is
> > > carried throughout the topology.
> > >
> > > Could you share your stack trace upon failure along with your source
> > > code? And what version of Kafka Streams are you using? Some older
> > > versions of the library required the key (in your case, "String
> > > parsedKey = parsedRecord.get("region").toString() +
> > > parsedRecord.get("location").toString();") to be non-null, but that
> > > has been resolved in recent fixes.
> > >
> > > Guozhang
> > >
> > > On Mon, Jun 20, 2016 at 3:41 AM, Unmesh Joshi <unmeshjo...@gmail.com>
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > I was trying to experiment with Kafka streams, and had following code
> > > >
> > > > KTable<Windowed<String>, Integer> aggregated = locationViews
> > > >         .map((key, value) -> {
> > > >             GenericRecord parsedRecord = parse(value);
> > > >             String parsedKey = parsedRecord.get("region").toString() +
> > > >                     parsedRecord.get("location").toString();
> > > >             return new KeyValue<>(parsedKey, 1);
> > > >         }).reduceByKey((v1, v2) -> v1 + v2,
> > > >                 TimeWindows.of("aggregated", 5000), Serdes.String(),
> > > >                 Serdes.Integer());
> > > >
> > > > This code fails in RocksDBWindowStore.putInternal, where it tries
> > > > to read a timestamp from the message key.
> > > >
> > > > I am not producing messages with keys, so I tried setting
> > > > TIMESTAMP_EXTRACTOR_CLASS_CONFIG:
> > > >
> > > >
> > > > streamsConfiguration.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
> > > >         WallclockTimestampExtractor.class.getName());
> > > >
> > > > But RocksDBWindowStore still expects a timestamp in the message key
> > > > and does not use TIMESTAMP_EXTRACTOR_CLASS_CONFIG.
> > > >
> > > > Is it the case that Kafka messages must always have a key if we
> > > > want to use windowing, or is this an issue with RocksDBWindowStore?
> > > >
> > > >
> > > > Thanks,
> > > >
> > > > Unmesh
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>
