Re: Exception when accessing partition, offset and timestamp in processor class
Thx! :)
Re: Exception when accessing partition, offset and timestamp in processor class
Just created the JIRA

https://issues.apache.org/jira/browse/KAFKA-4344

Regards
Sai
Re: Exception when accessing partition, offset and timestamp in processor class
My JIRA id is saimishra

Regards
Sai
Re: Exception when accessing partition, offset and timestamp in processor class
What is your JIRA ID? We can add you to the contributor list to give you permission.

-Matthias
Re: Exception when accessing partition, offset and timestamp in processor class
Hi Matthias,

Thanks for the reply. I think I don't have permission for this. If you can grant me permission, I can create one (my handle is saimishra). Or you can go ahead and create one.

I may need permission to create JIRAs, as I might report more issues after discussing them with you here.

Regards
Sai
Re: Exception when accessing partition, offset and timestamp in processor class
Hi,

Sorry for the late reply. This seems like a bug to me; accessing the context within Processor#process() should work. Can you open a JIRA for it?

-Matthias
Re: Exception when accessing partition, offset and timestamp in processor class
Sorry for the email again.

I was expecting it to always work when accessed from the process() method, as that corresponds to the processing of each Kafka message/record. I understand the IllegalStateException when punctuate() is called, as by then the records have already been batched by time interval.

Regards
Sai
Exception when accessing partition, offset and timestamp in processor class
Hi,

This is with my streaming app on Kafka 0.10.1.0.

My flow looks something like this:

source topic stream -> filter for null value -> map to make it keyed by id -> custom processor to mystore -> to another topic -> ktable

I am hitting the exception below in a custom processor class if I try to access offset(), partition() or timestamp() from the ProcessorContext in the process() method. I was hoping it would return the partition and offset for the enclosing topic (in this case the source topic) it is consuming from, or -1, based on the API docs.

It looks like these values are accessible only in certain cases. Are they getting lost in the transformation phases?

The same issue happens if I try to access them in the punctuate() method, but I saw somewhere that it might not work in punctuate(). Any reason for this, or any link describing it, would be helpful.

java.lang.IllegalStateException: This should not happen as offset() should only be called while a record is processed
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.offset(ProcessorContextImpl.java:181) ~[kafka-streams-0.10.1.0.jar!/:?]
    at com.sai.repo.MyStore.process(MyStore.java:72) ~[classes!/:?]
    at com.sai.repo.MyStore.process(MyStore.java:39) ~[classes!/:?]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) [kafka-streams-0.10.1.0.jar!/:?]

Regards
Sai
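
For anyone who hits the same IllegalStateException and just needs the processor to keep running while the root cause is investigated, one option is to read the metadata defensively and fall back to -1, the value the original post expected. The sketch below is only an illustration under stated assumptions: SafeContextAccess and orDefault are hypothetical names, not part of Kafka Streams, and the lambdas in main are stand-ins for calls on a real ProcessorContext. It hides the symptom and does not address whatever KAFKA-4344 turns out to be.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical helper (not part of Kafka Streams): reads a metadata value
 * defensively and falls back to a default when the getter reports that
 * no record is currently being processed.
 */
final class SafeContextAccess {

    private SafeContextAccess() {}

    /** Returns the getter's value, or fallback if it throws IllegalStateException. */
    static long orDefault(LongSupplier getter, long fallback) {
        try {
            return getter.getAsLong();
        } catch (IllegalStateException e) {
            // Same exception type as in the stack trace above.
            return fallback;
        }
    }

    public static void main(String[] args) {
        // Stand-in for a context call that succeeds while a record is in scope.
        long offset = orDefault(() -> 42L, -1L);

        // Stand-in for the failing call reported in this thread.
        long missing = orDefault(() -> {
            throw new IllegalStateException(
                "offset() should only be called while a record is processed");
        }, -1L);

        System.out.println(offset + " " + missing);
    }
}
```

Inside a custom processor this would presumably be used as SafeContextAccess.orDefault(context::offset, -1L), assuming context is the ProcessorContext handed to init(). Logging the caught exception rather than silently swallowing it may be preferable, so the underlying problem stays visible.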