Sorry for the email again

I was expecting these calls to always work when made from the process()
method, since process() is invoked once per Kafka message/record.
I can understand the IllegalStateException when punctuate() is called,
since by then processing is batched by time interval rather than tied to
a single record.
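
To make the "return -1 instead of throwing" expectation concrete, here is a
minimal sketch of a defensive wrapper. It is not Kafka code: SafeMetadata and
orUnknown are hypothetical names, and the assumption is that with the real
ProcessorContext one would pass context::offset or context::timestamp as the
accessor.

```java
import java.util.function.LongSupplier;

// Hypothetical helper, not part of Kafka Streams.
final class SafeMetadata {
    static final long UNKNOWN = -1L;

    // Calls the accessor and falls back to -1 if it throws
    // IllegalStateException, i.e. no record is currently being processed.
    static long orUnknown(LongSupplier accessor) {
        try {
            return accessor.getAsLong();
        } catch (IllegalStateException e) {
            return UNKNOWN;
        }
    }
}

final class SafeMetadataDemo {
    public static void main(String[] args) {
        // Stand-in for an accessor called outside record processing.
        LongSupplier noRecord = () -> {
            throw new IllegalStateException("no record is being processed");
        };
        // Stand-in for an accessor called while a record is available.
        LongSupplier withRecord = () -> 42L;

        System.out.println(SafeMetadata.orUnknown(noRecord));
        System.out.println(SafeMetadata.orUnknown(withRecord));
    }
}
```

This only hides the exception; it does not explain why the metadata is
unavailable inside process() in the first place.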

Regards
Sai

On Sun, Oct 23, 2016 at 9:18 PM, saiprasad mishra <saiprasadmis...@gmail.com
> wrote:

> Hi
>
> This is with my streaming app on Kafka 0.10.1.0.
>
> My flow looks something like below
>
> source topic stream -> filter for null value -> map to make it keyed by id
> -> custom processor to mystore -> to another topic -> ktable
>
> I am hitting the exception below in a custom processor class if I try to
> access offset(), partition() or timestamp() from the ProcessorContext in
> the process() method. I was hoping it would return the partition and
> offset for the enclosing topic (in this case the source topic) that it is
> consuming from, or -1, based on the API docs.
>
> It looks like it is accessible only in certain cases. Is it getting lost
> in the transformation phases?
>
> The same issue happens if I try to access them in the punctuate() method,
> but I saw somewhere that it might not work in punctuate(). Any reason for
> this, or any link describing it, would be helpful.
>
>
> ====================================================================
>
> java.lang.IllegalStateException: This should not happen as offset() should only be called while a record is processed
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.offset(ProcessorContextImpl.java:181) ~[kafka-streams-0.10.1.0.jar!/:?]
>     at com.sai.repo.MyStore.process(MyStore.java:72) ~[classes!/:?]
>     at com.sai.repo.MyStore.process(MyStore.java:39) ~[classes!/:?]
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.0.jar!/:?]
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) ~[kafka-streams-0.10.1.0.jar!/:?]
>     at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43) ~[kafka-streams-0.10.1.0.jar!/:?]
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.0.jar!/:?]
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) ~[kafka-streams-0.10.1.0.jar!/:?]
>     at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44) ~[kafka-streams-0.10.1.0.jar!/:?]
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.0.jar!/:?]
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) ~[kafka-streams-0.10.1.0.jar!/:?]
>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66) ~[kafka-streams-0.10.1.0.jar!/:?]
>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181) ~[kafka-streams-0.10.1.0.jar!/:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436) ~[kafka-streams-0.10.1.0.jar!/:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) [kafka-streams-0.10.1.0.jar!/:?]
> =====================================================================
>
>
> Regards
> Sai
>
