[
https://issues.apache.org/jira/browse/KAFKA-4344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15609837#comment-15609837
]
saiprasad mishra commented on KAFKA-4344:
-----------------------------------------
Yes it is fixed now. Will write to the mailing list
Thanks for creating the great library for stateful stream processing
Regards
Sai
> Exception when accessing partition, offset and timestamp in processor class
> ---------------------------------------------------------------------------
>
> Key: KAFKA-4344
> URL: https://issues.apache.org/jira/browse/KAFKA-4344
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.10.1.0
> Reporter: saiprasad mishra
> Assignee: Guozhang Wang
> Priority: Minor
>
> I have a kafka stream pipeline like below
> source topic stream -> filter for null value ->map to make it keyed by id
> ->custom processor to mystore ->to another topic -> ktable
> I am hitting the below type of exception in a custom processor class if I try
> to access offset() or partition() or timestamp() from the ProcessorContext in
> the process() method. I was hoping it would return the partition and offset
> for the enclosing topic(in this case source topic) where its consuming from
> or -1 based on the api docs.
> java.lang.IllegalStateException: This should not happen as offset() should
> only be called while a record is processed
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.offset(ProcessorContextImpl.java:181)
> ~[kafka-streams-0.10.1.0.jar!/:?]
> at com.sai.repo.MyStore.process(MyStore.java:72) ~[classes!/:?]
> at com.sai.repo.MyStore.process(MyStore.java:39) ~[classes!/:?]
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> ~[kafka-streams-0.10.1.0.jar!/:?]
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> ~[kafka-streams-0.10.1.0.jar!/:?]
> at
> org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43)
> ~[kafka-streams-0.10.1.0.jar!/:?]
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> ~[kafka-streams-0.10.1.0.jar!/:?]
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> ~[kafka-streams-0.10.1.0.jar!/:?]
> at
> org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
> ~[kafka-streams-0.10.1.0.jar!/:?]
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> ~[kafka-streams-0.10.1.0.jar!/:?]
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> ~[kafka-streams-0.10.1.0.jar!/:?]
> at
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
> ~[kafka-streams-0.10.1.0.jar!/:?]
> at
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181)
> ~[kafka-streams-0.10.1.0.jar!/:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
> ~[kafka-streams-0.10.1.0.jar!/:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
> [kafka-streams-0.10.1.0.jar!/:?]
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)