Yes, this is similar. It turned out that KafkaStreams was not started
correctly in my Spring app, and it is NOT a bug in Kafka Streams.
I have described what I was doing wrong in the comments on the JIRA.

These types of exceptions largely indicate that Kafka Streams was not
started correctly.
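
For anyone who hits this later, here is a minimal sketch of the startup
sequence Kafka Streams expects (the class name, application id, bootstrap
servers, and topology builder below are placeholders, not my actual config):

    import java.util.Properties;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    public class StartupSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            KStreamBuilder builder = new KStreamBuilder();
            // ... define the topology here ...

            KafkaStreams streams = new KafkaStreams(builder, props);
            streams.start(); // must be called before records can flow
            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        }
    }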

Thanks for your valuable time on this.
Regards
Sai

On Wed, Oct 26, 2016 at 2:34 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Is it a similar report as https://issues.apache.org/jira/browse/KAFKA-4344?
>
> On Tue, Oct 25, 2016 at 2:43 PM, saiprasad mishra <saiprasadmis...@gmail.com> wrote:
>
> > Hi
> > This is with Kafka Streams 0.10.1.0 (the broker is running remotely and
> > the streams app is running locally on my laptop).
> >
> >
> >
> > I have a Kafka Streams pipeline like this:
> >
> > source topic (10 partitions) stream -> filter out null values -> map to
> > re-key by id -> custom processor writing to my store (persistent)
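> >
> > In code, the pipeline is roughly this fragment (the topic name, the
> > extractId helper, and MyStoreProcessor are made-up placeholders, not my
> > actual classes):
> >
> >     KStreamBuilder builder = new KStreamBuilder();
> >     builder.stream("source-topic")                                    // 10 partitions
> >            .filter((key, value) -> value != null)                     // drop null values
> >            .map((key, value) -> new KeyValue<>(extractId(value), value)) // re-key by id
> >            .process(MyStoreProcessor::new, "Products");               // write to the persistent store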
> >
> > I am getting the exception below; it happens when the flush occurs.
> > If I restart the app, the data I sent is actually present in the RocksDB
> > store. The message on the keyed stream went to partition 0, where the
> > flush apparently succeeded; as shown below, it is the task for partition 9
> > that failed to flush. I am not sure what the complaint about timestamp()
> > means here.
> >
> > Can somebody explain what this means?
> >
> >
> > Not sure if it has something to do with the timestamp extractor property
> > I am setting below, or with some other timestamp such as the producer
> > create time:
> >
> >     props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
> >               ConsumerRecordTimestampExtractor.class);
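> >
> > As I understand it, ConsumerRecordTimestampExtractor simply returns the
> > timestamp already embedded in each consumer record, i.e. roughly
> > equivalent to this sketch (the class name here is made up):
> >
> >     import org.apache.kafka.clients.consumer.ConsumerRecord;
> >     import org.apache.kafka.streams.processor.TimestampExtractor;
> >
> >     public class RecordTimestampExtractor implements TimestampExtractor {
> >         @Override
> >         public long extract(ConsumerRecord<Object, Object> record) {
> >             // broker- or producer-assigned timestamp from the record itself
> >             return record.timestamp();
> >         }
> >     }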
> >
> >
> > Regards
> > Sai
> >
> >
> > 2016-10-25 14:31:29.822000 org.apache.kafka.streams.processor.internals.StreamThread StreamThread-1 ERROR stream-thread [StreamThread-1] Failed to commit StreamTask 0_9 state:
> >
> > org.apache.kafka.streams.errors.ProcessorStateException: task [0_9] Failed to flush state store Products
> >     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:331) ~[kafka-streams-0.10.1.0.jar!/:?]
> >     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:275) ~[kafka-streams-0.10.1.0.jar!/:?]
> >     at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:576) [kafka-streams-0.10.1.0.jar!/:?]
> >     at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:562) [kafka-streams-0.10.1.0.jar!/:?]
> >     at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:538) [kafka-streams-0.10.1.0.jar!/:?]
> >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:456) [kafka-streams-0.10.1.0.jar!/:?]
> >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) [kafka-streams-0.10.1.0.jar!/:?]
> > Caused by: java.lang.IllegalStateException: This should not happen as timestamp() should only be called while a record is processed
> >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.timestamp(ProcessorContextImpl.java:192) ~[kafka-streams-0.10.1.0.jar!/:?]
> >     at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:112) ~[kafka-streams-0.10.1.0.jar!/:?]
> >     at org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:375) ~[kafka-streams-0.10.1.0.jar!/:?]
> >     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:175) ~[kafka-streams-0.10.1.0.jar!/:?]
> >     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:329) ~[kafka-streams-0.10.1.0.jar!/:?]
> >     ... 6 more
> >
>
>
>
> --
> -- Guozhang
>
