[ https://issues.apache.org/jira/browse/KAFKA-4344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16064379#comment-16064379 ]
Nishkam Ravi commented on KAFKA-4344:
-------------------------------------

[~guozhang] We are encountering the same error (for topic()). The code is written in Scala and is launched using sbt (Spring isn't involved). Here's the code sketch:

class GenericProcessor[T <: ThriftStruct](serDe: ThriftStructSerDe[T], decrypt: Boolean, config: Config)
    extends AbstractProcessor[Array[Byte], Array[Byte]] with LazyLogging {

  private var hsmClient: HSMClient = _

  override def init(processorContext: ProcessorContext): Unit = {
    super.init(processorContext)
    hsmClient = HSMClient(config).getOrElse(null)
  }

  override def process(key: Array[Byte], value: Array[Byte]): Unit = {
    val topic: String = this.context.topic() // exception thrown here
    val partition: Int = this.context.partition()
    val offset: Long = this.context.offset()
    val timestamp: Long = this.context.timestamp()
    // business logic
  }
}

The exception is thrown only in the multi-consumer case (when the number of partitions for a topic is > 1 and parallelism is > 1). This should be easy to reproduce.

> Exception when accessing partition, offset and timestamp in processor class
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-4344
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4344
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.1.0
>            Reporter: saiprasad mishra
>            Assignee: Guozhang Wang
>            Priority: Minor
>
> I have a kafka stream pipeline like below
> source topic stream -> filter for null value -> map to make it keyed by id -> custom processor to mystore -> to another topic -> ktable
> I am hitting the below type of exception in a custom processor class if I try to access offset() or partition() or timestamp() from the ProcessorContext in the process() method. I was hoping it would return the partition and offset for the enclosing topic (in this case the source topic) where it's consuming from, or -1 based on the api docs.
> java.lang.IllegalStateException: This should not happen as offset() should only be called while a record is processed
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.offset(ProcessorContextImpl.java:181) ~[kafka-streams-0.10.1.0.jar!/:?]
>     at com.sai.repo.MyStore.process(MyStore.java:72) ~[classes!/:?]
>     at com.sai.repo.MyStore.process(MyStore.java:39) ~[classes!/:?]
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.0.jar!/:?]
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) ~[kafka-streams-0.10.1.0.jar!/:?]
>     at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43) ~[kafka-streams-0.10.1.0.jar!/:?]
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.0.jar!/:?]
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) ~[kafka-streams-0.10.1.0.jar!/:?]
>     at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44) ~[kafka-streams-0.10.1.0.jar!/:?]
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.0.jar!/:?]
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) ~[kafka-streams-0.10.1.0.jar!/:?]
>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66) ~[kafka-streams-0.10.1.0.jar!/:?]
>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181) ~[kafka-streams-0.10.1.0.jar!/:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436) ~[kafka-streams-0.10.1.0.jar!/:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) [kafka-streams-0.10.1.0.jar!/:?]

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)