Is this similar to the issue reported in https://issues.apache.org/jira/browse/KAFKA-4344 ?
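
For reference, the exception message in the quoted trace says that timestamp() is only valid while a record is being processed, and the trace shows it being reached from a state store flush during commit. Below is a minimal standalone sketch of that kind of guard. SketchContext and its methods are hypothetical names made up for illustration; this is not the Kafka Streams implementation and it does not use any Kafka classes.

```java
public class TimestampGuardSketch {

    /** Hypothetical stand-in for a processor context (an assumption, not a Kafka class). */
    static final class SketchContext {
        // null means no record is currently being processed
        private Long currentRecordTimestamp;

        // Called when processing of a record starts.
        void beginRecord(long timestamp) {
            currentRecordTimestamp = timestamp;
        }

        // Called when processing of the record has finished.
        void endRecord() {
            currentRecordTimestamp = null;
        }

        // Only valid between beginRecord() and endRecord().
        long timestamp() {
            if (currentRecordTimestamp == null) {
                throw new IllegalStateException(
                        "timestamp() should only be called while a record is processed");
            }
            return currentRecordTimestamp;
        }
    }

    public static void main(String[] args) {
        SketchContext ctx = new SketchContext();

        // Inside record processing the timestamp is available.
        ctx.beginRecord(42L);
        System.out.println("during processing: " + ctx.timestamp());
        ctx.endRecord();

        // A later call with no record in flight (for example from a flush
        // triggered by a periodic commit) hits the guard.
        try {
            ctx.timestamp();
        } catch (IllegalStateException e) {
            System.out.println("outside processing: " + e.getMessage());
        }
    }
}
```

The sketch only models the condition named in the exception message; whether the reported failure has the same cause as the JIRA above is still an open question.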

On Tue, Oct 25, 2016 at 2:43 PM, saiprasad mishra <saiprasadmis...@gmail.com> wrote:

> Hi
>
> This is with version 10.1.0 kafka streams (server running in remote and
> streams app running local in my laptop).
>
> I have a kafka stream pipeline like this
>
> source topic(with 10 partitions) stream -> filter for null value -> map to
> make it keyed by id -> custom processor to mystore(persistent)
>
> I am getting the below exception. This happens when the flush happens.
> If I restart the app the data i sent is actually present in rocksdb store.
> I see the message of the keyed stream went to partition 0 on which flush
> happened correctly i guess as I see below partition 9 task failed to flush
> not sure about the complain about timestamp() here.
>
> Can somebody explain what does this mean.
>
> Not sure if it has something to do with below timestamp extractor property
> i am setting or any other time like producer create time ???
>
> props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
>     ConsumerRecordTimestampExtractor.class);
>
> Regards
> Sai
>
> 2016-10-25 14:31:29.822000 org.apache.kafka.streams.processor.internals.StreamThread StreamThread-1 ERROR stream-thread [StreamThread-1] Failed to commit StreamTask 0_9 state:
>
> org.apache.kafka.streams.errors.ProcessorStateException: task [0_9] Failed to flush state store Products
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:331) ~[kafka-streams-0.10.1.0.jar!/:?]
>     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:275) ~[kafka-streams-0.10.1.0.jar!/:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:576) [kafka-streams-0.10.1.0.jar!/:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:562) [kafka-streams-0.10.1.0.jar!/:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:538) [kafka-streams-0.10.1.0.jar!/:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:456) [kafka-streams-0.10.1.0.jar!/:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) [kafka-streams-0.10.1.0.jar!/:?]
> Caused by: java.lang.IllegalStateException: This should not happen as timestamp() should only be called while a record is processed
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.timestamp(ProcessorContextImpl.java:192) ~[kafka-streams-0.10.1.0.jar!/:?]
>     at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:112) ~[kafka-streams-0.10.1.0.jar!/:?]
>     at org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:375) ~[kafka-streams-0.10.1.0.jar!/:?]
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:175) ~[kafka-streams-0.10.1.0.jar!/:?]
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:329) ~[kafka-streams-0.10.1.0.jar!/:?]
>     ... 6 more

--
-- Guozhang