Re: Exception when accessing partition, offset and timestamp in processor class

2016-10-25 Thread Matthias J. Sax

Thx! :)


Re: Exception when accessing partition, offset and timestamp in processor class

2016-10-25 Thread saiprasad mishra
Just created the JIRA

https://issues.apache.org/jira/browse/KAFKA-4344

Regards
Sai


Re: Exception when accessing partition, offset and timestamp in processor class

2016-10-25 Thread saiprasad mishra
My JIRA ID is saimishra.

Regards
Sai


Re: Exception when accessing partition, offset and timestamp in processor class

2016-10-25 Thread Matthias J. Sax

What is your JIRA ID? We can add you to the contributor list to give
you permission.

-Matthias



Re: Exception when accessing partition, offset and timestamp in processor class

2016-10-25 Thread saiprasad mishra
Hi Matthias,

Thanks for the reply.
I don't think I have permission for this.
If you can grant me permission, I can create one (my handle is saimishra).
Or you can go ahead and create one.

I may need permission to create JIRAs anyway, as I might report more issues
after discussing them with you here.

Regards
Sai


Re: Exception when accessing partition, offset and timestamp in processor class

2016-10-25 Thread Matthias J. Sax

Hi,

Sorry for the late reply. This seems like a bug to me; accessing the
context within Processor#process() should work. Can you open a JIRA
for it?

-Matthias
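
A possible stopgap while the issue is open, offered only as a sketch and not as something proposed in this thread: wrap the metadata accessor so that the IllegalStateException is turned into a fallback value such as -1. The class name MetadataGuard and the method orDefault are hypothetical, and the lambdas in main() are stand-ins for the real accessors so that the snippet runs without Kafka on the classpath.

```java
import java.util.function.LongSupplier;

// Hypothetical helper; it is not part of Kafka Streams.
class MetadataGuard {

    /**
     * Returns the accessor's value, or the fallback if the accessor
     * throws IllegalStateException (as offset() does in the stack trace).
     */
    static long orDefault(LongSupplier accessor, long fallback) {
        try {
            return accessor.getAsLong();
        } catch (IllegalStateException e) {
            return fallback;
        }
    }

    public static void main(String[] args) {
        // Stand-in for an accessor that succeeds.
        long available = orDefault(() -> 42L, -1L);

        // Stand-in for an accessor that fails because no record is in flight.
        long missing = orDefault(() -> {
            throw new IllegalStateException("no record is being processed");
        }, -1L);

        System.out.println("available=" + available + ", missing=" + missing);
    }
}
```

In a real processor this might be called as MetadataGuard.orDefault(context::offset, -1L), assuming context is the ProcessorContext saved in init(). Note that swallowing the exception only hides the symptom; it does not explain why the metadata is unavailable inside process().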



Re: Exception when accessing partition, offset and timestamp in processor class

2016-10-23 Thread saiprasad mishra
Sorry for the email again.

I was expecting it to always work when accessed from the process() method,
as this corresponds to the processing of each Kafka message/record.
I can understand the IllegalStateException when punctuate() is called, as by
then processing is already batched by time interval.

Regards
Sai



Exception when accessing partition, offset and timestamp in processor class

2016-10-23 Thread saiprasad mishra
Hi

This is with my streaming app on Kafka 0.10.1.0.

My flow looks something like this:

source topic stream -> filter for null values -> map to make it keyed by id
-> custom processor to mystore -> to another topic -> ktable

I am hitting the exception below in a custom processor class if I try to
access offset(), partition() or timestamp() from the ProcessorContext in the
process() method. I was hoping it would return the partition and offset for
the enclosing topic (in this case the source topic) it is consuming from, or
-1, based on the API docs.

It looks like it is accessible only in certain cases. Is it getting lost in
the transformation phases?

The same issue happens if I try to access them in the punctuate() method, but
somewhere I saw that it might not work in punctuate(). Any reason for this, or
any link describing this, would be helpful.




java.lang.IllegalStateException: This should not happen as offset() should only be called while a record is processed
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.offset(ProcessorContextImpl.java:181) ~[kafka-streams-0.10.1.0.jar!/:?]
    at com.sai.repo.MyStore.process(MyStore.java:72) ~[classes!/:?]
    at com.sai.repo.MyStore.process(MyStore.java:39) ~[classes!/:?]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) [kafka-streams-0.10.1.0.jar!/:?]


Regards
Sai
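
To illustrate the behavior the exception message describes, here is a minimal stand-in model in plain Java. It is a sketch under stated assumptions, not the actual ProcessorContextImpl code: the names RecordContextModel, RecordMeta, ModelContext, begin() and end() are all hypothetical, and the only idea taken from the report is that the metadata accessors succeed while a record is in flight and throw IllegalStateException otherwise.

```java
// Simplified stand-in model; none of these classes are Kafka Streams classes.
class RecordContextModel {

    /** Metadata of the record currently being processed (hypothetical type). */
    static final class RecordMeta {
        final String topic;
        final int partition;
        final long offset;
        final long timestamp;

        RecordMeta(String topic, int partition, long offset, long timestamp) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.timestamp = timestamp;
        }
    }

    /** Hypothetical context that exposes metadata only while a record is in flight. */
    static final class ModelContext {
        private RecordMeta current; // null means "no record is being processed"

        void begin(RecordMeta meta) { current = meta; }

        void end() { current = null; }

        long offset() { return require("offset()").offset; }

        int partition() { return require("partition()").partition; }

        long timestamp() { return require("timestamp()").timestamp; }

        private RecordMeta require(String method) {
            if (current == null) {
                throw new IllegalStateException(
                        method + " should only be called while a record is processed");
            }
            return current;
        }
    }

    public static void main(String[] args) {
        ModelContext context = new ModelContext();

        // Per-record path: metadata is set for the duration of the callback.
        context.begin(new RecordMeta("source-topic", 3, 42L, 1477282680000L));
        try {
            System.out.println("process(): partition=" + context.partition()
                    + ", offset=" + context.offset());
        } finally {
            context.end();
        }

        // Time-driven path: no record is in flight, so the accessor throws.
        try {
            context.offset();
        } catch (IllegalStateException e) {
            System.out.println("punctuate(): " + e.getMessage());
        }
    }
}
```

Under this model, a call made from process() would be expected to succeed, which is why the exception in the stack trace above looks like a bug rather than intended behavior.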