Is it a similar report as https://issues.apache.org/jira/browse/KAFKA-4344?

On Tue, Oct 25, 2016 at 2:43 PM, saiprasad mishra <saiprasadmis...@gmail.com
> wrote:

> Hi
> This is with version 10.1.0 kafka streams (server running in remote and
> streams app running local in my laptop).
>
>
>
> I have a kafka stream pipeline like this
>
> source topic(with 10 partitions) stream -> filter for null value ->map to
> make it keyed by id ->custom processor to mystore(persistent)
>
> I am getting the below exception. This happens when the flush happens.
> If I restart the app the data i sent is actually present in rocksdb store.
> I see the message of the keyed stream went to partition 0 on which flush
> happened correctly i guess as I see below partition 9 task failed to flush
> not sure about the complain about timestamp() here.
>
> Can somebody explain what does this mean.
>
>
> Not sure if it has something to do with below timestamp extractor property
> i am setting or any other time like producer create time ???
>
>         props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
> ConsumerRecordTimestampExtractor.class);
>
>
> Regards
> Sai
>
>
> 2016-10-25 14:31:29.822000
> org.apache.kafka.streams.processor.internals.StreamThread StreamThread-1
> ERROR stream-thread [StreamThread-1] Failed to commit StreamTask 0_9 state:
>
>
> org.apache.kafka.streams.errors.ProcessorStateException: task [0_9] Failed
> to flush state store Products
>
> at
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(
> ProcessorStateManager.java:331)
> ~[kafka-streams-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.
> java:275)
> ~[kafka-streams-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.streams.processor.internals.StreamThread.commitOne(
> StreamThread.java:576)
> [kafka-streams-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(
> StreamThread.java:562)
> [kafka-streams-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(
> StreamThread.java:538)
> [kafka-streams-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:456)
> [kafka-streams-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:242)
> [kafka-streams-0.10.1.0.jar!/:?]
>
> Caused by: java.lang.IllegalStateException: This should not happen as
> timestamp() should only be called while a record is processed
>
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.
> timestamp(ProcessorContextImpl.java:192)
> ~[kafka-streams-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(
> StoreChangeLogger.java:112)
> ~[kafka-streams-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.streams.state.internals.RocksDBStore.
> flush(RocksDBStore.java:375)
> ~[kafka-streams-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(
> MeteredKeyValueStore.java:175)
> ~[kafka-streams-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(
> ProcessorStateManager.java:329)
> ~[kafka-streams-0.10.1.0.jar!/:?]
>
> ... 6 more
>



-- 
-- Guozhang

Reply via email to