Re: weird SerializationException when consumer is fetching and parsing record in streams application

2017-05-17 Thread Guozhang Wang
Sachin,

We are discussing how to work around KAFKA-4740 for poison pill records:

https://issues.apache.org/jira/browse/KAFKA-5157

Please share your scenario and your opinions on the proposed solution there.


Guozhang

On Tue, May 16, 2017 at 9:50 PM, Sachin Mittal  wrote:

> Folks is there any updated on
> https://issues.apache.org/jira/browse/KAFKA-4740.
> This is now so far only pressing issue for our streams application.
>
> It happens from time to time and so far our only solution is to increase
> the offset of the group for that partition beyond the offset that caused
> this issue.
> Other thing I have noticed is that such (poison pill) messages then to
> bunch together. So usually we need to increase the offset by say 1 to
> get past the issue.
> So basically we loose processing of these many messages. If we increase the
> offset by just one then we again see another such offset few offsets higher
> and so on.
>
> So in order to prevent multiple manual restart of the streams application
> we simply increase the offset by 1. Then streams application works
> uninterrupted for few months and one fine day we again see such messages.
>
> As the bug is critical shouldn't we get some fix or workaround in next
> release.
>
> Thanks
> Sachin
>
>
>
>
>
> On Thu, Mar 30, 2017 at 6:59 PM, Michael Noll 
> wrote:
>
> > Sachin,
> >
> > there's a JIRA that seems related to what you're seeing:
> > https://issues.apache.org/jira/browse/KAFKA-4740
> >
> > Perhaps you could check the above and report back?
> >
> > -Michael
> >
> >
> >
> >
> > On Thu, Mar 30, 2017 at 3:23 PM, Michael Noll 
> > wrote:
> >
> > > Hmm, I re-read the stacktrace again. It does look like the value-side
> > > being the culprit (as Sachin suggested earlier).
> > >
> > > -Michael
> > >
> > >
> > > On Thu, Mar 30, 2017 at 3:18 PM, Michael Noll 
> > > wrote:
> > >
> > >> Sachin,
> > >>
> > >> you have this line:
> > >>
> > >> > builder.stream(Serdes.String(), serde, "advice-stream")
> > >>
> > >> Could the problem be that not the record values are causing the
> problem
> > >> -- because your value deserializer does try-catch any such errors --
> but
> > >> that the record *keys* are malformed?  The built-in `Serdes.String()`
> > does
> > >> not try-catch deserialization errors, and from a quick look at the
> > source
> > >> it seems that the `Fetcher` class (clients/src/main/java/org/apa
> > >> che/kafka/clients/consumer/internals/Fetcher.java) is throwing your
> > >> error above ("Error deserializing key/value for partition..."), and
> the
> > >> Fetcher is swallowing the more specific SerializationException of
> > >> `String.Serdes()` (but it will include the original
> exception/Throwable
> > in
> > >> its own SerializationException).
> > >>
> > >> -Michael
> > >>
> > >>
> > >>
> > >> On Thu, Mar 30, 2017 at 2:52 PM, Sachin Mittal 
> > >> wrote:
> > >>
> > >>> My streams application does run in debug mode only.
> > >>> Also I have checked the code around these lines
> > >>>
> > >>>   at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.
> java:791)
> > >>> ~[kafka-clients-0.10.2.0.jar:na]
> > >>>   at org.apache.kafka.common.record.Record.value(Record.java:268)
> > >>> ~[kafka-clients-0.10.2.0.jar:na]
> > >>>   at org.apache.kafka.clients.consumer.internals.Fetcher.parseRec
> > >>> ord(Fetcher.java:867)
> > >>> ~[kafka-clients-0.10.2.0.jar:na]
> > >>>
> > >>> I don't see any log statement which will give me more information.
> > >>>
> > >>> https://github.com/apache/kafka/blob/0.10.2/clients/src/main
> > >>> /java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L867
> > >>>
> > >>> The issue is happening at this line and perhaps handling the
> exception
> > >>> and
> > >>> setting the value to be null may be better options.
> > >>> Yes at client side nothing can be done because exception is happening
> > >>> before this.valueDeserializer.deserialize can be called.
> > >>>
> > >>> Thanks
> > >>> Sachin
> > >>>
> > >>>
> > >>> On Thu, Mar 30, 2017 at 4:28 PM, Damian Guy 
> > >>> wrote:
> > >>>
> > >>> > The suggestions in that FAQ won't help as it is too late, i.e., the
> > >>> message
> > >>> > has already been received into Streams.
> > >>> > You could create a simple app that uses the Consumer, seeks to the
> > >>> offset,
> > >>> > and tries to read the message. If you did this in debug mode you
> > might
> > >>> find
> > >>> > out some more information.
> > >>> >
> > >>> >
> > >>> > On Thu, 30 Mar 2017 at 11:50 Sachin Mittal 
> > wrote:
> > >>> >
> > >>> > > Well I try to read that offset via kafka-console-consumer.sh too
> > and
> > >>> it
> > >>> > > fails with same error.
> > >>> > >
> > >>> > > So was wondering if I can apply any of the suggestion as per
> > >>> > >
> > >>> > > http://docs.confluent.io/3.2.0/streams/faq.html#handling-
> > >>> > corrupted-records-and-deserialization-errors-poison-pill-messages
> > >>> > >
> > >>> > > If there is any other was just to get the contents of that
> message
> > it
> > >>> > 

Re: weird SerializationException when consumer is fetching and parsing record in streams application

2017-05-16 Thread Sachin Mittal
Folks, is there any update on
https://issues.apache.org/jira/browse/KAFKA-4740?
This is so far the only pressing issue for our streams application.

It happens from time to time and so far our only solution is to increase
the offset of the group for that partition beyond the offset that caused
this issue.
Another thing I have noticed is that such (poison pill) messages tend to
bunch together. So usually we need to increase the offset by say 1 to
get past the issue.
So basically we lose processing of that many messages. If we increase the
offset by just one then we again see another such offset a few offsets
higher, and so on.

So in order to prevent multiple manual restarts of the streams application
we simply increase the offset by 1. Then the streams application works
uninterrupted for a few months, and one fine day we again see such messages.
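
For reference, a minimal sketch of this manual workaround (broker address is a
placeholder; the group id must match the streams application.id, the streams
application must be stopped while committing, and the topic/partition/offset
below are just the ones from the stack trace later in this thread):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class SkipPoisonPill {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "my-streams-app");      // must equal the streams application.id
            props.put("enable.auto.commit", "false");
            props.put("key.deserializer", ByteArrayDeserializer.class.getName());
            props.put("value.deserializer", ByteArrayDeserializer.class.getName());

            // Partition and offset of the poison pill, taken from the reported error.
            TopicPartition tp = new TopicPartition("advice-stream", 6);
            long newOffset = 45153795L + 1;                // one past the bad record

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.assign(Collections.singletonList(tp));
                // Commit the new position for the group so the streams app resumes past it.
                consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(newOffset)));
            }
        }
    }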

As the bug is critical, shouldn't we get some fix or workaround in the next
release?

Thanks
Sachin





On Thu, Mar 30, 2017 at 6:59 PM, Michael Noll  wrote:

> Sachin,
>
> there's a JIRA that seems related to what you're seeing:
> https://issues.apache.org/jira/browse/KAFKA-4740
>
> Perhaps you could check the above and report back?
>
> -Michael
>
>
>
>
> On Thu, Mar 30, 2017 at 3:23 PM, Michael Noll 
> wrote:
>
> > Hmm, I re-read the stacktrace again. It does look like the value-side
> > being the culprit (as Sachin suggested earlier).
> >
> > -Michael
> >
> >
> > On Thu, Mar 30, 2017 at 3:18 PM, Michael Noll 
> > wrote:
> >
> >> Sachin,
> >>
> >> you have this line:
> >>
> >> > builder.stream(Serdes.String(), serde, "advice-stream")
> >>
> >> Could the problem be that not the record values are causing the problem
> >> -- because your value deserializer does try-catch any such errors -- but
> >> that the record *keys* are malformed?  The built-in `Serdes.String()`
> does
> >> not try-catch deserialization errors, and from a quick look at the
> source
> >> it seems that the `Fetcher` class (clients/src/main/java/org/apa
> >> che/kafka/clients/consumer/internals/Fetcher.java) is throwing your
> >> error above ("Error deserializing key/value for partition..."), and the
> >> Fetcher is swallowing the more specific SerializationException of
> >> `String.Serdes()` (but it will include the original exception/Throwable
> in
> >> its own SerializationException).
> >>
> >> -Michael
> >>
> >>
> >>
> >> On Thu, Mar 30, 2017 at 2:52 PM, Sachin Mittal 
> >> wrote:
> >>
> >>> My streams application does run in debug mode only.
> >>> Also I have checked the code around these lines
> >>>
> >>>   at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.java:791)
> >>> ~[kafka-clients-0.10.2.0.jar:na]
> >>>   at org.apache.kafka.common.record.Record.value(Record.java:268)
> >>> ~[kafka-clients-0.10.2.0.jar:na]
> >>>   at org.apache.kafka.clients.consumer.internals.Fetcher.parseRec
> >>> ord(Fetcher.java:867)
> >>> ~[kafka-clients-0.10.2.0.jar:na]
> >>>
> >>> I don't see any log statement which will give me more information.
> >>>
> >>> https://github.com/apache/kafka/blob/0.10.2/clients/src/main
> >>> /java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L867
> >>>
> >>> The issue is happening at this line and perhaps handling the exception
> >>> and
> >>> setting the value to be null may be better options.
> >>> Yes at client side nothing can be done because exception is happening
> >>> before this.valueDeserializer.deserialize can be called.
> >>>
> >>> Thanks
> >>> Sachin
> >>>
> >>>
> >>> On Thu, Mar 30, 2017 at 4:28 PM, Damian Guy 
> >>> wrote:
> >>>
> >>> > The suggestions in that FAQ won't help as it is too late, i.e., the
> >>> message
> >>> > has already been received into Streams.
> >>> > You could create a simple app that uses the Consumer, seeks to the
> >>> offset,
> >>> > and tries to read the message. If you did this in debug mode you
> might
> >>> find
> >>> > out some more information.
> >>> >
> >>> >
> >>> > On Thu, 30 Mar 2017 at 11:50 Sachin Mittal 
> wrote:
> >>> >
> >>> > > Well I try to read that offset via kafka-console-consumer.sh too
> and
> >>> it
> >>> > > fails with same error.
> >>> > >
> >>> > > So was wondering if I can apply any of the suggestion as per
> >>> > >
> >>> > > http://docs.confluent.io/3.2.0/streams/faq.html#handling-
> >>> > corrupted-records-and-deserialization-errors-poison-pill-messages
> >>> > >
> >>> > > If there is any other was just to get the contents of that message
> it
> >>> > would
> >>> > > be helpful.
> >>> > >
> >>> > > Thanks
> >>> > > Sachin
> >>> > >
> >>> > >
> >>> > > On Thu, Mar 30, 2017 at 4:11 PM, Damian Guy 
> >>> > wrote:
> >>> > >
> >>> > > > Hi Sachin,
> >>> > > >
> >>> > > > Have you tried firing up a consumer (non-streams), seeking to
> that
> >>> > offset
> >>> > > > on the topic and seeing what the message is? Might be easier to
> >>> debug?
> >>> > > Like
> >>> > > > you say, it is failing in the consumer.
> >>> > > > Thanks,
> >>> > > > Damian
> >>> > > >
> >>> > > > On Thu, 30 Mar 2017 at 10:35 Sac

Re: weird SerializationException when consumer is fetching and parsing record in streams application

2017-03-30 Thread Damian Guy
Not that I'm aware of.

On Thu, 30 Mar 2017 at 16:00 Sachin Mittal  wrote:

> Damian,
> Is there any way where I can just dump out the contents at a given offset
> from a given log segment file.
>
> I am not sure how DumpLogSegment
> <
> https://cwiki.apache.org/confluence/display/KAFKA/System+Tools#SystemTools-DumpLogSegment
> >
> helps. I already know the log segment file where that message is. Thing is
> size is 1 GB and there is no easy way to inspect that file and actually see
> what the payload is.
>
> Thanks
> Sachin
>
>
> On Thu, Mar 30, 2017 at 7:18 PM, Damian Guy  wrote:
>
> > Sachin,
> >
> > Not sure if this will help, but you might want to try running
> > https://cwiki.apache.org/confluence/display/KAFKA/
> > System+Tools#SystemTools-DumpLogSegment
> > on the partition that is causing you problems.
> >
> > Thanks
> > Damian
> >
> > On Thu, 30 Mar 2017 at 14:29 Michael Noll  wrote:
> >
> > > Sachin,
> > >
> > > there's a JIRA that seems related to what you're seeing:
> > > https://issues.apache.org/jira/browse/KAFKA-4740
> > >
> > > Perhaps you could check the above and report back?
> > >
> > > -Michael
> > >
> > >
> > >
> > >
> > > On Thu, Mar 30, 2017 at 3:23 PM, Michael Noll 
> > > wrote:
> > >
> > > > Hmm, I re-read the stacktrace again. It does look like the value-side
> > > > being the culprit (as Sachin suggested earlier).
> > > >
> > > > -Michael
> > > >
> > > >
> > > > On Thu, Mar 30, 2017 at 3:18 PM, Michael Noll 
> > > > wrote:
> > > >
> > > >> Sachin,
> > > >>
> > > >> you have this line:
> > > >>
> > > >> > builder.stream(Serdes.String(), serde, "advice-stream")
> > > >>
> > > >> Could the problem be that not the record values are causing the
> > problem
> > > >> -- because your value deserializer does try-catch any such errors --
> > but
> > > >> that the record *keys* are malformed?  The built-in
> `Serdes.String()`
> > > does
> > > >> not try-catch deserialization errors, and from a quick look at the
> > > source
> > > >> it seems that the `Fetcher` class (clients/src/main/java/org/apa
> > > >> che/kafka/clients/consumer/internals/Fetcher.java) is throwing your
> > > >> error above ("Error deserializing key/value for partition..."), and
> > the
> > > >> Fetcher is swallowing the more specific SerializationException of
> > > >> `String.Serdes()` (but it will include the original
> > exception/Throwable
> > > in
> > > >> its own SerializationException).
> > > >>
> > > >> -Michael
> > > >>
> > > >>
> > > >>
> > > >> On Thu, Mar 30, 2017 at 2:52 PM, Sachin Mittal 
> > > >> wrote:
> > > >>
> > > >>> My streams application does run in debug mode only.
> > > >>> Also I have checked the code around these lines
> > > >>>
> > > >>>   at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.
> > java:791)
> > > >>> ~[kafka-clients-0.10.2.0.jar:na]
> > > >>>   at org.apache.kafka.common.record.Record.value(Record.java:268)
> > > >>> ~[kafka-clients-0.10.2.0.jar:na]
> > > >>>   at org.apache.kafka.clients.consumer.internals.Fetcher.parseRec
> > > >>> ord(Fetcher.java:867)
> > > >>> ~[kafka-clients-0.10.2.0.jar:na]
> > > >>>
> > > >>> I don't see any log statement which will give me more information.
> > > >>>
> > > >>> https://github.com/apache/kafka/blob/0.10.2/clients/src/main
> > > >>> /java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L867
> > > >>>
> > > >>> The issue is happening at this line and perhaps handling the
> > exception
> > > >>> and
> > > >>> setting the value to be null may be better options.
> > > >>> Yes at client side nothing can be done because exception is
> happening
> > > >>> before this.valueDeserializer.deserialize can be called.
> > > >>>
> > > >>> Thanks
> > > >>> Sachin
> > > >>>
> > > >>>
> > > >>> On Thu, Mar 30, 2017 at 4:28 PM, Damian Guy 
> > > >>> wrote:
> > > >>>
> > > >>> > The suggestions in that FAQ won't help as it is too late, i.e.,
> the
> > > >>> message
> > > >>> > has already been received into Streams.
> > > >>> > You could create a simple app that uses the Consumer, seeks to
> the
> > > >>> offset,
> > > >>> > and tries to read the message. If you did this in debug mode you
> > > might
> > > >>> find
> > > >>> > out some more information.
> > > >>> >
> > > >>> >
> > > >>> > On Thu, 30 Mar 2017 at 11:50 Sachin Mittal 
> > > wrote:
> > > >>> >
> > > >>> > > Well I try to read that offset via kafka-console-consumer.sh
> too
> > > and
> > > >>> it
> > > >>> > > fails with same error.
> > > >>> > >
> > > >>> > > So was wondering if I can apply any of the suggestion as per
> > > >>> > >
> > > >>> > > http://docs.confluent.io/3.2.0/streams/faq.html#handling-
> > > >>> > corrupted-records-and-deserialization-errors-poison-pill-messages
> > > >>> > >
> > > >>> > > If there is any other was just to get the contents of that
> > message
> > > it
> > > >>> > would
> > > >>> > > be helpful.
> > > >>> > >
> > > >>> > > Thanks
> > > >>> > > Sachin
> > > >>> > >
> > > >>> > >
> > > >>> > > On Thu, Mar 30, 2017 at 4:11 PM, Damian Guy <
> > damian...

Re: weird SerializationException when consumer is fetching and parsing record in streams application

2017-03-30 Thread Sachin Mittal
Damian,
Is there any way I can just dump out the contents at a given offset
from a given log segment file?

I am not sure how DumpLogSegment
<https://cwiki.apache.org/confluence/display/KAFKA/System+Tools#SystemTools-DumpLogSegment>
helps. I already know the log segment file where that message is. The thing
is, the file is 1 GB in size and there is no easy way to inspect it and
actually see what the payload is.

Thanks
Sachin


On Thu, Mar 30, 2017 at 7:18 PM, Damian Guy  wrote:

> Sachin,
>
> Not sure if this will help, but you might want to try running
> https://cwiki.apache.org/confluence/display/KAFKA/
> System+Tools#SystemTools-DumpLogSegment
> on the partition that is causing you problems.
>
> Thanks
> Damian
>
> On Thu, 30 Mar 2017 at 14:29 Michael Noll  wrote:
>
> > Sachin,
> >
> > there's a JIRA that seems related to what you're seeing:
> > https://issues.apache.org/jira/browse/KAFKA-4740
> >
> > Perhaps you could check the above and report back?
> >
> > -Michael
> >
> >
> >
> >
> > On Thu, Mar 30, 2017 at 3:23 PM, Michael Noll 
> > wrote:
> >
> > > Hmm, I re-read the stacktrace again. It does look like the value-side
> > > being the culprit (as Sachin suggested earlier).
> > >
> > > -Michael
> > >
> > >
> > > On Thu, Mar 30, 2017 at 3:18 PM, Michael Noll 
> > > wrote:
> > >
> > >> Sachin,
> > >>
> > >> you have this line:
> > >>
> > >> > builder.stream(Serdes.String(), serde, "advice-stream")
> > >>
> > >> Could the problem be that not the record values are causing the
> problem
> > >> -- because your value deserializer does try-catch any such errors --
> but
> > >> that the record *keys* are malformed?  The built-in `Serdes.String()`
> > does
> > >> not try-catch deserialization errors, and from a quick look at the
> > source
> > >> it seems that the `Fetcher` class (clients/src/main/java/org/apa
> > >> che/kafka/clients/consumer/internals/Fetcher.java) is throwing your
> > >> error above ("Error deserializing key/value for partition..."), and
> the
> > >> Fetcher is swallowing the more specific SerializationException of
> > >> `String.Serdes()` (but it will include the original
> exception/Throwable
> > in
> > >> its own SerializationException).
> > >>
> > >> -Michael
> > >>
> > >>
> > >>
> > >> On Thu, Mar 30, 2017 at 2:52 PM, Sachin Mittal 
> > >> wrote:
> > >>
> > >>> My streams application does run in debug mode only.
> > >>> Also I have checked the code around these lines
> > >>>
> > >>>   at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.
> java:791)
> > >>> ~[kafka-clients-0.10.2.0.jar:na]
> > >>>   at org.apache.kafka.common.record.Record.value(Record.java:268)
> > >>> ~[kafka-clients-0.10.2.0.jar:na]
> > >>>   at org.apache.kafka.clients.consumer.internals.Fetcher.parseRec
> > >>> ord(Fetcher.java:867)
> > >>> ~[kafka-clients-0.10.2.0.jar:na]
> > >>>
> > >>> I don't see any log statement which will give me more information.
> > >>>
> > >>> https://github.com/apache/kafka/blob/0.10.2/clients/src/main
> > >>> /java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L867
> > >>>
> > >>> The issue is happening at this line and perhaps handling the
> exception
> > >>> and
> > >>> setting the value to be null may be better options.
> > >>> Yes at client side nothing can be done because exception is happening
> > >>> before this.valueDeserializer.deserialize can be called.
> > >>>
> > >>> Thanks
> > >>> Sachin
> > >>>
> > >>>
> > >>> On Thu, Mar 30, 2017 at 4:28 PM, Damian Guy 
> > >>> wrote:
> > >>>
> > >>> > The suggestions in that FAQ won't help as it is too late, i.e., the
> > >>> message
> > >>> > has already been received into Streams.
> > >>> > You could create a simple app that uses the Consumer, seeks to the
> > >>> offset,
> > >>> > and tries to read the message. If you did this in debug mode you
> > might
> > >>> find
> > >>> > out some more information.
> > >>> >
> > >>> >
> > >>> > On Thu, 30 Mar 2017 at 11:50 Sachin Mittal 
> > wrote:
> > >>> >
> > >>> > > Well I try to read that offset via kafka-console-consumer.sh too
> > and
> > >>> it
> > >>> > > fails with same error.
> > >>> > >
> > >>> > > So was wondering if I can apply any of the suggestion as per
> > >>> > >
> > >>> > > http://docs.confluent.io/3.2.0/streams/faq.html#handling-
> > >>> > corrupted-records-and-deserialization-errors-poison-pill-messages
> > >>> > >
> > >>> > > If there is any other was just to get the contents of that
> message
> > it
> > >>> > would
> > >>> > > be helpful.
> > >>> > >
> > >>> > > Thanks
> > >>> > > Sachin
> > >>> > >
> > >>> > >
> > >>> > > On Thu, Mar 30, 2017 at 4:11 PM, Damian Guy <
> damian@gmail.com>
> > >>> > wrote:
> > >>> > >
> > >>> > > > Hi Sachin,
> > >>> > > >
> > >>> > > > Have you tried firing up a consumer (non-streams), seeking to
> > that
> > >>> > offset
> > >>> > > > on the topic and seeing what the message is? Might be easier to
> > >>> debug?
> > >>> > > Like
> > >>> > > > you say, it is failing in the consumer.
> > >>> > > > Thanks,
> > >>> > > > Dam

Re: weird SerializationException when consumer is fetching and parsing record in streams application

2017-03-30 Thread Sachin Mittal
I am not sure if https://issues.apache.org/jira/browse/KAFKA-4740 is the same
issue as mine.
What I suspect may be happening is that:
  at java.nio.Buffer.limit(Buffer.java:275) ~[na:1.8.0_122-ea]
  at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.java:791)
~[kafka-clients-0.10.2.0.jar:na]
  at org.apache.kafka.common.record.Record.value(Record.java:268)
~[kafka-clients-0.10.2.0.jar:na]

The size of the value does not match the actual buffer size.

It looks like the producer stored the message value as

4 byte payload length, containing length V
V byte payload

but the length V does not match the actual payload size.

Also, we saw this issue for the first time, and we have been running the
streams application for over six months now. It only happens for a few messages.

We may need to catch such exceptions when getting the byte buffer for
keys/values and perhaps return null or something, and let the high-level
client handle such cases.
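
To illustrate (a minimal, self-contained sketch -- this is not the actual
Utils/Fetcher code, and the class and method names are made up), the value
layout and the kind of null-returning guard being suggested look roughly like
this:

    import java.nio.ByteBuffer;

    public class SizeDelimitedSketch {

        // Reads a [4-byte length V][V bytes payload] value starting at 'start'.
        // Instead of letting Buffer.limit() throw IllegalArgumentException when V
        // does not fit the buffer, it returns null so the caller can skip the record.
        static ByteBuffer readValueOrNull(ByteBuffer buf, int start) {
            int size = buf.getInt(start);              // the stored length V
            int available = buf.limit() - start - 4;   // bytes actually present
            if (size < 0 || size > available) {
                return null;                           // corrupted length prefix
            }
            ByteBuffer slice = buf.duplicate();
            slice.position(start + 4);
            slice.limit(start + 4 + size);             // safe here, since V fits
            return slice.slice();
        }

        public static void main(String[] args) {
            // The length prefix claims 100 bytes, but only 3 payload bytes follow.
            ByteBuffer corrupted = ByteBuffer.allocate(7);
            corrupted.putInt(100).put(new byte[] {1, 2, 3});
            corrupted.flip();
            System.out.println(readValueOrNull(corrupted, 0)); // prints null
        }
    }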

Thanks
Sachin



On Thu, Mar 30, 2017 at 6:59 PM, Michael Noll  wrote:

> Sachin,
>
> there's a JIRA that seems related to what you're seeing:
> https://issues.apache.org/jira/browse/KAFKA-4740
>
> Perhaps you could check the above and report back?
>
> -Michael
>
>
>
>
> On Thu, Mar 30, 2017 at 3:23 PM, Michael Noll 
> wrote:
>
> > Hmm, I re-read the stacktrace again. It does look like the value-side
> > being the culprit (as Sachin suggested earlier).
> >
> > -Michael
> >
> >
> > On Thu, Mar 30, 2017 at 3:18 PM, Michael Noll 
> > wrote:
> >
> >> Sachin,
> >>
> >> you have this line:
> >>
> >> > builder.stream(Serdes.String(), serde, "advice-stream")
> >>
> >> Could the problem be that not the record values are causing the problem
> >> -- because your value deserializer does try-catch any such errors -- but
> >> that the record *keys* are malformed?  The built-in `Serdes.String()`
> does
> >> not try-catch deserialization errors, and from a quick look at the
> source
> >> it seems that the `Fetcher` class (clients/src/main/java/org/apa
> >> che/kafka/clients/consumer/internals/Fetcher.java) is throwing your
> >> error above ("Error deserializing key/value for partition..."), and the
> >> Fetcher is swallowing the more specific SerializationException of
> >> `String.Serdes()` (but it will include the original exception/Throwable
> in
> >> its own SerializationException).
> >>
> >> -Michael
> >>
> >>
> >>
> >> On Thu, Mar 30, 2017 at 2:52 PM, Sachin Mittal 
> >> wrote:
> >>
> >>> My streams application does run in debug mode only.
> >>> Also I have checked the code around these lines
> >>>
> >>>   at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.java:791)
> >>> ~[kafka-clients-0.10.2.0.jar:na]
> >>>   at org.apache.kafka.common.record.Record.value(Record.java:268)
> >>> ~[kafka-clients-0.10.2.0.jar:na]
> >>>   at org.apache.kafka.clients.consumer.internals.Fetcher.parseRec
> >>> ord(Fetcher.java:867)
> >>> ~[kafka-clients-0.10.2.0.jar:na]
> >>>
> >>> I don't see any log statement which will give me more information.
> >>>
> >>> https://github.com/apache/kafka/blob/0.10.2/clients/src/main
> >>> /java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L867
> >>>
> >>> The issue is happening at this line and perhaps handling the exception
> >>> and
> >>> setting the value to be null may be better options.
> >>> Yes at client side nothing can be done because exception is happening
> >>> before this.valueDeserializer.deserialize can be called.
> >>>
> >>> Thanks
> >>> Sachin
> >>>
> >>>
> >>> On Thu, Mar 30, 2017 at 4:28 PM, Damian Guy 
> >>> wrote:
> >>>
> >>> > The suggestions in that FAQ won't help as it is too late, i.e., the
> >>> message
> >>> > has already been received into Streams.
> >>> > You could create a simple app that uses the Consumer, seeks to the
> >>> offset,
> >>> > and tries to read the message. If you did this in debug mode you
> might
> >>> find
> >>> > out some more information.
> >>> >
> >>> >
> >>> > On Thu, 30 Mar 2017 at 11:50 Sachin Mittal 
> wrote:
> >>> >
> >>> > > Well I try to read that offset via kafka-console-consumer.sh too
> and
> >>> it
> >>> > > fails with same error.
> >>> > >
> >>> > > So was wondering if I can apply any of the suggestion as per
> >>> > >
> >>> > > http://docs.confluent.io/3.2.0/streams/faq.html#handling-
> >>> > corrupted-records-and-deserialization-errors-poison-pill-messages
> >>> > >
> >>> > > If there is any other was just to get the contents of that message
> it
> >>> > would
> >>> > > be helpful.
> >>> > >
> >>> > > Thanks
> >>> > > Sachin
> >>> > >
> >>> > >
> >>> > > On Thu, Mar 30, 2017 at 4:11 PM, Damian Guy 
> >>> > wrote:
> >>> > >
> >>> > > > Hi Sachin,
> >>> > > >
> >>> > > > Have you tried firing up a consumer (non-streams), seeking to
> that
> >>> > offset
> >>> > > > on the topic and seeing what the message is? Might be easier to
> >>> debug?
> >>> > > Like
> >>> > > > you say, it is failing in the consumer.
> >>> > > > Thanks,
> >>> > > > Damian
> >>> > > >
> >>> > > > On Thu, 30 Mar 2017 at 10:35 Sachin M

Re: weird SerializationException when consumer is fetching and parsing record in streams application

2017-03-30 Thread Damian Guy
Sachin,

Not sure if this will help, but you might want to try running
https://cwiki.apache.org/confluence/display/KAFKA/System+Tools#SystemTools-DumpLogSegment
on the partition that is causing you problems.
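(For reference, the tool is run via the class runner, along the lines of
bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /path/to/<segment>.log
--print-data-log -- exact flag names may differ by version, so check the usage
output of your Kafka distribution.)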

Thanks
Damian

On Thu, 30 Mar 2017 at 14:29 Michael Noll  wrote:

> Sachin,
>
> there's a JIRA that seems related to what you're seeing:
> https://issues.apache.org/jira/browse/KAFKA-4740
>
> Perhaps you could check the above and report back?
>
> -Michael
>
>
>
>
> On Thu, Mar 30, 2017 at 3:23 PM, Michael Noll 
> wrote:
>
> > Hmm, I re-read the stacktrace again. It does look like the value-side
> > being the culprit (as Sachin suggested earlier).
> >
> > -Michael
> >
> >
> > On Thu, Mar 30, 2017 at 3:18 PM, Michael Noll 
> > wrote:
> >
> >> Sachin,
> >>
> >> you have this line:
> >>
> >> > builder.stream(Serdes.String(), serde, "advice-stream")
> >>
> >> Could the problem be that not the record values are causing the problem
> >> -- because your value deserializer does try-catch any such errors -- but
> >> that the record *keys* are malformed?  The built-in `Serdes.String()`
> does
> >> not try-catch deserialization errors, and from a quick look at the
> source
> >> it seems that the `Fetcher` class (clients/src/main/java/org/apa
> >> che/kafka/clients/consumer/internals/Fetcher.java) is throwing your
> >> error above ("Error deserializing key/value for partition..."), and the
> >> Fetcher is swallowing the more specific SerializationException of
> >> `String.Serdes()` (but it will include the original exception/Throwable
> in
> >> its own SerializationException).
> >>
> >> -Michael
> >>
> >>
> >>
> >> On Thu, Mar 30, 2017 at 2:52 PM, Sachin Mittal 
> >> wrote:
> >>
> >>> My streams application does run in debug mode only.
> >>> Also I have checked the code around these lines
> >>>
> >>>   at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.java:791)
> >>> ~[kafka-clients-0.10.2.0.jar:na]
> >>>   at org.apache.kafka.common.record.Record.value(Record.java:268)
> >>> ~[kafka-clients-0.10.2.0.jar:na]
> >>>   at org.apache.kafka.clients.consumer.internals.Fetcher.parseRec
> >>> ord(Fetcher.java:867)
> >>> ~[kafka-clients-0.10.2.0.jar:na]
> >>>
> >>> I don't see any log statement which will give me more information.
> >>>
> >>> https://github.com/apache/kafka/blob/0.10.2/clients/src/main
> >>> /java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L867
> >>>
> >>> The issue is happening at this line and perhaps handling the exception
> >>> and
> >>> setting the value to be null may be better options.
> >>> Yes at client side nothing can be done because exception is happening
> >>> before this.valueDeserializer.deserialize can be called.
> >>>
> >>> Thanks
> >>> Sachin
> >>>
> >>>
> >>> On Thu, Mar 30, 2017 at 4:28 PM, Damian Guy 
> >>> wrote:
> >>>
> >>> > The suggestions in that FAQ won't help as it is too late, i.e., the
> >>> message
> >>> > has already been received into Streams.
> >>> > You could create a simple app that uses the Consumer, seeks to the
> >>> offset,
> >>> > and tries to read the message. If you did this in debug mode you
> might
> >>> find
> >>> > out some more information.
> >>> >
> >>> >
> >>> > On Thu, 30 Mar 2017 at 11:50 Sachin Mittal 
> wrote:
> >>> >
> >>> > > Well I try to read that offset via kafka-console-consumer.sh too
> and
> >>> it
> >>> > > fails with same error.
> >>> > >
> >>> > > So was wondering if I can apply any of the suggestion as per
> >>> > >
> >>> > > http://docs.confluent.io/3.2.0/streams/faq.html#handling-
> >>> > corrupted-records-and-deserialization-errors-poison-pill-messages
> >>> > >
> >>> > > If there is any other was just to get the contents of that message
> it
> >>> > would
> >>> > > be helpful.
> >>> > >
> >>> > > Thanks
> >>> > > Sachin
> >>> > >
> >>> > >
> >>> > > On Thu, Mar 30, 2017 at 4:11 PM, Damian Guy 
> >>> > wrote:
> >>> > >
> >>> > > > Hi Sachin,
> >>> > > >
> >>> > > > Have you tried firing up a consumer (non-streams), seeking to
> that
> >>> > offset
> >>> > > > on the topic and seeing what the message is? Might be easier to
> >>> debug?
> >>> > > Like
> >>> > > > you say, it is failing in the consumer.
> >>> > > > Thanks,
> >>> > > > Damian
> >>> > > >
> >>> > > > On Thu, 30 Mar 2017 at 10:35 Sachin Mittal 
> >>> wrote:
> >>> > > >
> >>> > > > > I think I am following the third option.
> >>> > > > >
> >>> > > > > My pipeline is:
> >>> > > > >
> >>> > > > > serde= Serdes.serdeFrom(new VSerializer(), new VDeserializer
> ());
> >>> > > > >
> >>> > > > > builder.stream(Serdes.String(), serde, "advice-stream")
> >>> > > > >   .filter(new Predicate() { ...})
> >>> > > > >   .groupByKey()
> >>> > > > >   .aggregate(new Initializer() {...}, new
> Aggregator >>> V,
> >>> > > V1>()
> >>> > > > > {...}, windows, supplier)
> >>> > > > >   .mapValues(new ValueMapper() { ... })
> >>> > > > >   .foreach(new ForeachAction, V2>() {... });
> >>> > > > >
> >>> > > > >
> >>> > > > > and In VDeserializer (implements Deserializer) I am doi

Re: weird SerializationException when consumer is fetching and parsing record in streams application

2017-03-30 Thread Michael Noll
Sachin,

there's a JIRA that seems related to what you're seeing:
https://issues.apache.org/jira/browse/KAFKA-4740

Perhaps you could check the above and report back?

-Michael




On Thu, Mar 30, 2017 at 3:23 PM, Michael Noll  wrote:

> Hmm, I re-read the stacktrace again. It does look like the value-side
> being the culprit (as Sachin suggested earlier).
>
> -Michael
>
>
> On Thu, Mar 30, 2017 at 3:18 PM, Michael Noll 
> wrote:
>
>> Sachin,
>>
>> you have this line:
>>
>> > builder.stream(Serdes.String(), serde, "advice-stream")
>>
>> Could the problem be that not the record values are causing the problem
>> -- because your value deserializer does try-catch any such errors -- but
>> that the record *keys* are malformed?  The built-in `Serdes.String()` does
>> not try-catch deserialization errors, and from a quick look at the source
>> it seems that the `Fetcher` class (clients/src/main/java/org/apa
>> che/kafka/clients/consumer/internals/Fetcher.java) is throwing your
>> error above ("Error deserializing key/value for partition..."), and the
>> Fetcher is swallowing the more specific SerializationException of
>> `String.Serdes()` (but it will include the original exception/Throwable in
>> its own SerializationException).
>>
>> -Michael
>>
>>
>>
>> On Thu, Mar 30, 2017 at 2:52 PM, Sachin Mittal 
>> wrote:
>>
>>> My streams application does run in debug mode only.
>>> Also I have checked the code around these lines
>>>
>>>   at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.java:791)
>>> ~[kafka-clients-0.10.2.0.jar:na]
>>>   at org.apache.kafka.common.record.Record.value(Record.java:268)
>>> ~[kafka-clients-0.10.2.0.jar:na]
>>>   at org.apache.kafka.clients.consumer.internals.Fetcher.parseRec
>>> ord(Fetcher.java:867)
>>> ~[kafka-clients-0.10.2.0.jar:na]
>>>
>>> I don't see any log statement which will give me more information.
>>>
>>> https://github.com/apache/kafka/blob/0.10.2/clients/src/main
>>> /java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L867
>>>
>>> The issue is happening at this line and perhaps handling the exception
>>> and
>>> setting the value to be null may be better options.
>>> Yes at client side nothing can be done because exception is happening
>>> before this.valueDeserializer.deserialize can be called.
>>>
>>> Thanks
>>> Sachin
>>>
>>>
>>> On Thu, Mar 30, 2017 at 4:28 PM, Damian Guy 
>>> wrote:
>>>
>>> > The suggestions in that FAQ won't help as it is too late, i.e., the
>>> message
>>> > has already been received into Streams.
>>> > You could create a simple app that uses the Consumer, seeks to the
>>> offset,
>>> > and tries to read the message. If you did this in debug mode you might
>>> find
>>> > out some more information.
>>> >
>>> >
>>> > On Thu, 30 Mar 2017 at 11:50 Sachin Mittal  wrote:
>>> >
>>> > > Well I try to read that offset via kafka-console-consumer.sh too and
>>> it
>>> > > fails with same error.
>>> > >
>>> > > So was wondering if I can apply any of the suggestion as per
>>> > >
>>> > > http://docs.confluent.io/3.2.0/streams/faq.html#handling-
>>> > corrupted-records-and-deserialization-errors-poison-pill-messages
>>> > >
>>> > > If there is any other was just to get the contents of that message it
>>> > would
>>> > > be helpful.
>>> > >
>>> > > Thanks
>>> > > Sachin
>>> > >
>>> > >
>>> > > On Thu, Mar 30, 2017 at 4:11 PM, Damian Guy 
>>> > wrote:
>>> > >
>>> > > > Hi Sachin,
>>> > > >
>>> > > > Have you tried firing up a consumer (non-streams), seeking to that
>>> > offset
>>> > > > on the topic and seeing what the message is? Might be easier to
>>> debug?
>>> > > Like
>>> > > > you say, it is failing in the consumer.
>>> > > > Thanks,
>>> > > > Damian
>>> > > >
>>> > > > On Thu, 30 Mar 2017 at 10:35 Sachin Mittal 
>>> wrote:
>>> > > >
>>> > > > > I think I am following the third option.
>>> > > > >
>>> > > > > My pipeline is:
>>> > > > >
>>> > > > > serde= Serdes.serdeFrom(new VSerializer(), new VDeserializer ());
>>> > > > >
>>> > > > > builder.stream(Serdes.String(), serde, "advice-stream")
>>> > > > >   .filter(new Predicate() { ...})
>>> > > > >   .groupByKey()
>>> > > > >   .aggregate(new Initializer() {...}, new Aggregator>> V,
>>> > > V1>()
>>> > > > > {...}, windows, supplier)
>>> > > > >   .mapValues(new ValueMapper() { ... })
>>> > > > >   .foreach(new ForeachAction, V2>() {... });
>>> > > > >
>>> > > > >
>>> > > > > and In VDeserializer (implements Deserializer) I am doing
>>> > something
>>> > > > like
>>> > > > > this:
>>> > > > >
>>> > > > > public V deserialize(String paramString, byte[]
>>> > paramArrayOfByte) {
>>> > > > > if (paramArrayOfByte == null) { return null;}
>>> > > > > V data = null;
>>> > > > > try {
>>> > > > > data = objectMapper.readValue(paramArrayOfByte, new
>>> > > > > TypeReference() {});
>>> > > > > } catch (Exception e) {
>>> > > > > e.printStackTrace();
>>> > > > > }
>>> > > > > return data;
>>> > > > > }
>>> > > > >
>>> >

Re: weird SerializationException when consumer is fetching and parsing record in streams application

2017-03-30 Thread Michael Noll
Hmm, I re-read the stacktrace again. It does look like the value side is
the culprit (as Sachin suggested earlier).

-Michael


On Thu, Mar 30, 2017 at 3:18 PM, Michael Noll  wrote:

> Sachin,
>
> you have this line:
>
> > builder.stream(Serdes.String(), serde, "advice-stream")
>
> Could the problem be that not the record values are causing the problem --
> because your value deserializer does try-catch any such errors -- but that
> the record *keys* are malformed?  The built-in `Serdes.String()` does not
> try-catch deserialization errors, and from a quick look at the source it
> seems that the `Fetcher` class (clients/src/main/java/org/
> apache/kafka/clients/consumer/internals/Fetcher.java) is throwing your
> error above ("Error deserializing key/value for partition..."), and the
> Fetcher is swallowing the more specific SerializationException of
> `String.Serdes()` (but it will include the original exception/Throwable in
> its own SerializationException).
>
> -Michael
>
>
>
> On Thu, Mar 30, 2017 at 2:52 PM, Sachin Mittal  wrote:
>
>> My streams application does run in debug mode only.
>> Also I have checked the code around these lines
>>
>>   at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.java:791)
>> ~[kafka-clients-0.10.2.0.jar:na]
>>   at org.apache.kafka.common.record.Record.value(Record.java:268)
>> ~[kafka-clients-0.10.2.0.jar:na]
>>   at org.apache.kafka.clients.consumer.internals.Fetcher.parseRec
>> ord(Fetcher.java:867)
>> ~[kafka-clients-0.10.2.0.jar:na]
>>
>> I don't see any log statement which will give me more information.
>>
>> https://github.com/apache/kafka/blob/0.10.2/clients/src/main
>> /java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L867
>>
>> The issue is happening at this line and perhaps handling the exception and
>> setting the value to be null may be better options.
>> Yes at client side nothing can be done because exception is happening
>> before this.valueDeserializer.deserialize can be called.
>>
>> Thanks
>> Sachin
>>
>>
>> On Thu, Mar 30, 2017 at 4:28 PM, Damian Guy  wrote:
>>
>> > The suggestions in that FAQ won't help as it is too late, i.e., the
>> message
>> > has already been received into Streams.
>> > You could create a simple app that uses the Consumer, seeks to the
>> offset,
>> > and tries to read the message. If you did this in debug mode you might
>> find
>> > out some more information.
>> >
>> >
>> > On Thu, 30 Mar 2017 at 11:50 Sachin Mittal  wrote:
>> >
>> > > Well I try to read that offset via kafka-console-consumer.sh too and
>> it
>> > > fails with same error.
>> > >
>> > > So was wondering if I can apply any of the suggestion as per
>> > >
>> > > http://docs.confluent.io/3.2.0/streams/faq.html#handling-
>> > corrupted-records-and-deserialization-errors-poison-pill-messages
>> > >
>> > > If there is any other was just to get the contents of that message it
>> > would
>> > > be helpful.
>> > >
>> > > Thanks
>> > > Sachin
>> > >
>> > >
>> > > On Thu, Mar 30, 2017 at 4:11 PM, Damian Guy 
>> > wrote:
>> > >
>> > > > Hi Sachin,
>> > > >
>> > > > Have you tried firing up a consumer (non-streams), seeking to that
>> > offset
>> > > > on the topic and seeing what the message is? Might be easier to
>> debug?
>> > > Like
>> > > > you say, it is failing in the consumer.
>> > > > Thanks,
>> > > > Damian
>> > > >
>> > > > On Thu, 30 Mar 2017 at 10:35 Sachin Mittal 
>> wrote:
>> > > >
>> > > > > I think I am following the third option.
>> > > > >
>> > > > > My pipeline is:
>> > > > >
>> > > > > serde= Serdes.serdeFrom(new VSerializer(), new VDeserializer ());
>> > > > >
>> > > > > builder.stream(Serdes.String(), serde, "advice-stream")
>> > > > >   .filter(new Predicate() { ...})
>> > > > >   .groupByKey()
>> > > > >   .aggregate(new Initializer() {...}, new Aggregator> V,
>> > > V1>()
>> > > > > {...}, windows, supplier)
>> > > > >   .mapValues(new ValueMapper() { ... })
>> > > > >   .foreach(new ForeachAction, V2>() {... });
>> > > > >
>> > > > >
>> > > > > and In VDeserializer (implements Deserializer) I am doing
>> > something
>> > > > like
>> > > > > this:
>> > > > >
>> > > > > public V deserialize(String paramString, byte[]
>> > paramArrayOfByte) {
>> > > > > if (paramArrayOfByte == null) { return null;}
>> > > > > V data = null;
>> > > > > try {
>> > > > > data = objectMapper.readValue(paramArrayOfByte, new
>> > > > > TypeReference() {});
>> > > > > } catch (Exception e) {
>> > > > > e.printStackTrace();
>> > > > > }
>> > > > > return data;
>> > > > > }
>> > > > >
>> > > > > So I am catching any exception that may happen when deserializing
>> the
>> > > > data.
>> > > > >
>> > > > > This is what third option suggest (if I am not mistaken).
>> > > > >
>> > > > > Please let me know given the pipeline we which option would be
>> best
>> > and
>> > > > how
>> > > > > can we incorporate that in our pipeline.
>> > > > >
>> > > > > Also not exception is happeni

Re: weird SerializationException when consumer is fetching and parsing record in streams application

2017-03-30 Thread Michael Noll
Sachin,

you have this line:

> builder.stream(Serdes.String(), serde, "advice-stream")

Could it be that it is not the record values that are causing the problem --
because your value deserializer does try-catch any such errors -- but that
the record *keys* are malformed?  The built-in `Serdes.String()` does not
try-catch deserialization errors, and from a quick look at the source it
seems that the `Fetcher` class
(clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java)
is throwing your error above ("Error deserializing key/value for
partition..."), and the Fetcher is swallowing the more specific
SerializationException of `Serdes.String()` (but it will include the
original exception/Throwable in its own SerializationException).
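
For illustration, the kind of "catching" key deserializer that `Serdes.String()`
does not give you would look roughly like this (a minimal sketch with a made-up
class name; note that, as discussed elsewhere in this thread, the exception here
is thrown inside the Fetcher before any deserializer runs, so such a guard would
not actually catch this particular failure):

    import java.util.Map;
    import org.apache.kafka.common.serialization.Deserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    // Delegates to the built-in StringDeserializer but returns null on malformed
    // keys instead of propagating the SerializationException.
    public class SafeStringDeserializer implements Deserializer<String> {
        private final StringDeserializer delegate = new StringDeserializer();

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            delegate.configure(configs, isKey);
        }

        @Override
        public String deserialize(String topic, byte[] data) {
            try {
                return delegate.deserialize(topic, data);
            } catch (Exception e) {
                return null; // treat an undecodable key as null
            }
        }

        @Override
        public void close() {
            delegate.close();
        }
    }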

-Michael



On Thu, Mar 30, 2017 at 2:52 PM, Sachin Mittal  wrote:

> My streams application does run in debug mode only.
> Also I have checked the code around these lines
>
>   at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.java:791)
> ~[kafka-clients-0.10.2.0.jar:na]
>   at org.apache.kafka.common.record.Record.value(Record.java:268)
> ~[kafka-clients-0.10.2.0.jar:na]
>   at org.apache.kafka.clients.consumer.internals.Fetcher.
> parseRecord(Fetcher.java:867)
> ~[kafka-clients-0.10.2.0.jar:na]
>
> I don't see any log statement which will give me more information.
>
> https://github.com/apache/kafka/blob/0.10.2/clients/src/
> main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L867
>
> The issue is happening at this line and perhaps handling the exception and
> setting the value to be null may be better options.
> Yes at client side nothing can be done because exception is happening
> before this.valueDeserializer.deserialize can be called.
>
> Thanks
> Sachin
>
>
> On Thu, Mar 30, 2017 at 4:28 PM, Damian Guy  wrote:
>
> > The suggestions in that FAQ won't help as it is too late, i.e., the
> message
> > has already been received into Streams.
> > You could create a simple app that uses the Consumer, seeks to the
> offset,
> > and tries to read the message. If you did this in debug mode you might
> find
> > out some more information.
> >
> >
> > On Thu, 30 Mar 2017 at 11:50 Sachin Mittal  wrote:
> >
> > > Well I try to read that offset via kafka-console-consumer.sh too and it
> > > fails with same error.
> > >
> > > So was wondering if I can apply any of the suggestion as per
> > >
> > > http://docs.confluent.io/3.2.0/streams/faq.html#handling-
> > corrupted-records-and-deserialization-errors-poison-pill-messages
> > >
> > > If there is any other was just to get the contents of that message it
> > would
> > > be helpful.
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > > On Thu, Mar 30, 2017 at 4:11 PM, Damian Guy 
> > wrote:
> > >
> > > > Hi Sachin,
> > > >
> > > > Have you tried firing up a consumer (non-streams), seeking to that
> > offset
> > > > on the topic and seeing what the message is? Might be easier to
> debug?
> > > Like
> > > > you say, it is failing in the consumer.
> > > > Thanks,
> > > > Damian
> > > >
> > > > On Thu, 30 Mar 2017 at 10:35 Sachin Mittal 
> wrote:
> > > >
> > > > > I think I am following the third option.
> > > > >
> > > > > My pipeline is:
> > > > >
> > > > > serde= Serdes.serdeFrom(new VSerializer(), new VDeserializer ());
> > > > >
> > > > > builder.stream(Serdes.String(), serde, "advice-stream")
> > > > >   .filter(new Predicate() { ...})
> > > > >   .groupByKey()
> > > > >   .aggregate(new Initializer() {...}, new Aggregator > > V1>()
> > > > > {...}, windows, supplier)
> > > > >   .mapValues(new ValueMapper() { ... })
> > > > >   .foreach(new ForeachAction, V2>() {... });
> > > > >
> > > > >
> > > > > and In VDeserializer (implements Deserializer) I am doing
> > something
> > > > like
> > > > > this:
> > > > >
> > > > > public V deserialize(String paramString, byte[]
> > paramArrayOfByte) {
> > > > > if (paramArrayOfByte == null) { return null;}
> > > > > V data = null;
> > > > > try {
> > > > > data = objectMapper.readValue(paramArrayOfByte, new
> > > > > TypeReference() {});
> > > > > } catch (Exception e) {
> > > > > e.printStackTrace();
> > > > > }
> > > > > return data;
> > > > > }
> > > > >
> > > > > So I am catching any exception that may happen when deserializing
> the
> > > > data.
> > > > >
> > > > > This is what third option suggest (if I am not mistaken).
> > > > >
> > > > > Please let me know given the pipeline we which option would be best
> > and
> > > > how
> > > > > can we incorporate that in our pipeline.
> > > > >
> > > > > Also not exception is happening when reading from source topic
> which
> > is
> > > > > "advice-stream", so looks like flow is not going to pipeline at all
> > for
> > > > us
> > > > > to handle. It is terminating right at consumer poll.
> > > > >
> > > > > Thanks
> > > > > Sachin
> > > > >
> > > > >
> > > > > On Thu, Mar 30, 2017 at 2:22 PM, Michael Noll <
> mich...@confluent.io>

Re: weird SerializationException when consumer is fetching and parsing record in streams application

2017-03-30 Thread Sachin Mittal
My streams application already runs in debug mode.
Also I have checked the code around these lines

  at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.java:791)
~[kafka-clients-0.10.2.0.jar:na]
  at org.apache.kafka.common.record.Record.value(Record.java:268)
~[kafka-clients-0.10.2.0.jar:na]
  at 
org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:867)
~[kafka-clients-0.10.2.0.jar:na]

I don't see any log statement which will give me more information.

https://github.com/apache/kafka/blob/0.10.2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L867

The issue is happening at this line, and perhaps handling the exception and
setting the value to null would be a better option.
Yes, at the client side nothing can be done, because the exception happens
before this.valueDeserializer.deserialize can even be called.

Thanks
Sachin


On Thu, Mar 30, 2017 at 4:28 PM, Damian Guy  wrote:

> The suggestions in that FAQ won't help as it is too late, i.e., the message
> has already been received into Streams.
> You could create a simple app that uses the Consumer, seeks to the offset,
> and tries to read the message. If you did this in debug mode you might find
> out some more information.
>
>
> On Thu, 30 Mar 2017 at 11:50 Sachin Mittal  wrote:
>
> > Well I try to read that offset via kafka-console-consumer.sh too and it
> > fails with same error.
> >
> > So was wondering if I can apply any of the suggestion as per
> >
> > http://docs.confluent.io/3.2.0/streams/faq.html#handling-
> corrupted-records-and-deserialization-errors-poison-pill-messages
> >
> > If there is any other was just to get the contents of that message it
> would
> > be helpful.
> >
> > Thanks
> > Sachin
> >
> >
> > On Thu, Mar 30, 2017 at 4:11 PM, Damian Guy 
> wrote:
> >
> > > Hi Sachin,
> > >
> > > Have you tried firing up a consumer (non-streams), seeking to that
> offset
> > > on the topic and seeing what the message is? Might be easier to debug?
> > Like
> > > you say, it is failing in the consumer.
> > > Thanks,
> > > Damian
> > >
> > > On Thu, 30 Mar 2017 at 10:35 Sachin Mittal  wrote:
> > >
> > > > I think I am following the third option.
> > > >
> > > > My pipeline is:
> > > >
> > > > serde= Serdes.serdeFrom(new VSerializer(), new VDeserializer ());
> > > >
> > > > builder.stream(Serdes.String(), serde, "advice-stream")
> > > >   .filter(new Predicate() { ...})
> > > >   .groupByKey()
> > > >   .aggregate(new Initializer() {...}, new Aggregator > V1>()
> > > > {...}, windows, supplier)
> > > >   .mapValues(new ValueMapper() { ... })
> > > >   .foreach(new ForeachAction, V2>() {... });
> > > >
> > > >
> > > > and In VDeserializer (implements Deserializer) I am doing
> something
> > > like
> > > > this:
> > > >
> > > > public V deserialize(String paramString, byte[]
> paramArrayOfByte) {
> > > > if (paramArrayOfByte == null) { return null;}
> > > > V data = null;
> > > > try {
> > > > data = objectMapper.readValue(paramArrayOfByte, new
> > > > TypeReference() {});
> > > > } catch (Exception e) {
> > > > e.printStackTrace();
> > > > }
> > > > return data;
> > > > }
> > > >
> > > > So I am catching any exception that may happen when deserializing the
> > > data.
> > > >
> > > > This is what third option suggest (if I am not mistaken).
> > > >
> > > > Please let me know given the pipeline we which option would be best
> and
> > > how
> > > > can we incorporate that in our pipeline.
> > > >
> > > > Also not exception is happening when reading from source topic which
> is
> > > > "advice-stream", so looks like flow is not going to pipeline at all
> for
> > > us
> > > > to handle. It is terminating right at consumer poll.
> > > >
> > > > Thanks
> > > > Sachin
> > > >
> > > >
> > > > On Thu, Mar 30, 2017 at 2:22 PM, Michael Noll 
> > > > wrote:
> > > >
> > > > > Could this be a corrupted message ("poison pill") in your topic?
> > > > >
> > > > > If so, take a look at
> > > > > http://docs.confluent.io/current/streams/faq.html#
> > > > >
> > > > handling-corrupted-records-and-deserialization-errors-
> > > poison-pill-messages
> > > > >
> > > > > FYI: We're currently investigating a more elegant way to address
> such
> > > > > poison pill problems.  If you have feedback on that front, feel
> free
> > to
> > > > > share it with us. :-)
> > > > >
> > > > > -Michael
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Wed, Mar 29, 2017 at 10:07 PM, Sachin Mittal <
> sjmit...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi,
> > > > > > This is for first time we are getting a weird exception.
> > > > > > After this the streams caches.
> > > > > >
> > > > > > Only work around is to manually seek and commit offset to a
> greater
> > > > > number
> > > > > > and we are needing this manual intervention again and again.
> > > > > >
> > > > > > Any idea what is causing it and how can we circumvent this.
> > > > > >
> > > > >

Re: weird SerializationException when consumer is fetching and parsing record in streams application

2017-03-30 Thread Damian Guy
The suggestions in that FAQ won't help as it is too late, i.e., the message
has already been received into Streams.
You could create a simple app that uses the Consumer, seeks to the offset,
and tries to read the message. If you did this in debug mode you might find
out some more information.
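
A minimal sketch of such a diagnostic app (broker address and group id are
placeholders; the partition and offset are the ones from the stack trace
reported in this thread). It uses raw byte[] deserializers so that no
application-level deserializer gets in the way -- though, as noted elsewhere
in the thread, the fetch itself may still fail on the corrupted record:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class InspectOffset {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "debug-inspector");   // throwaway group
            props.put("enable.auto.commit", "false");   // never move real offsets
            props.put("key.deserializer", ByteArrayDeserializer.class.getName());
            props.put("value.deserializer", ByteArrayDeserializer.class.getName());

            TopicPartition tp = new TopicPartition("advice-stream", 6);
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.assign(Collections.singletonList(tp));
                consumer.seek(tp, 45153795L);            // the suspect offset
                for (ConsumerRecord<byte[], byte[]> rec : consumer.poll(10000)) {
                    System.out.printf("offset=%d keyBytes=%d valueBytes=%d%n",
                            rec.offset(),
                            rec.key() == null ? -1 : rec.key().length,
                            rec.value() == null ? -1 : rec.value().length);
                }
            }
        }
    }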


On Thu, 30 Mar 2017 at 11:50 Sachin Mittal  wrote:

> Well I try to read that offset via kafka-console-consumer.sh too and it
> fails with same error.
>
> So was wondering if I can apply any of the suggestion as per
>
> http://docs.confluent.io/3.2.0/streams/faq.html#handling-corrupted-records-and-deserialization-errors-poison-pill-messages
>
> If there is any other was just to get the contents of that message it would
> be helpful.
>
> Thanks
> Sachin
>
>
> On Thu, Mar 30, 2017 at 4:11 PM, Damian Guy  wrote:
>
> > Hi Sachin,
> >
> > Have you tried firing up a consumer (non-streams), seeking to that offset
> > on the topic and seeing what the message is? Might be easier to debug?
> Like
> > you say, it is failing in the consumer.
> > Thanks,
> > Damian
> >
> > On Thu, 30 Mar 2017 at 10:35 Sachin Mittal  wrote:
> >
> > > I think I am following the third option.
> > >
> > > My pipeline is:
> > >
> > > serde= Serdes.serdeFrom(new VSerializer(), new VDeserializer ());
> > >
> > > builder.stream(Serdes.String(), serde, "advice-stream")
> > >   .filter(new Predicate() { ...})
> > >   .groupByKey()
> > >   .aggregate(new Initializer() {...}, new Aggregator V1>()
> > > {...}, windows, supplier)
> > >   .mapValues(new ValueMapper() { ... })
> > >   .foreach(new ForeachAction, V2>() {... });
> > >
> > >
> > > and In VDeserializer (implements Deserializer) I am doing something
> > like
> > > this:
> > >
> > > public V deserialize(String paramString, byte[] paramArrayOfByte) {
> > > if (paramArrayOfByte == null) { return null;}
> > > V data = null;
> > > try {
> > > data = objectMapper.readValue(paramArrayOfByte, new
> > > TypeReference() {});
> > > } catch (Exception e) {
> > > e.printStackTrace();
> > > }
> > > return data;
> > > }
> > >
> > > So I am catching any exception that may happen when deserializing the
> > data.
> > >
> > > This is what third option suggest (if I am not mistaken).
> > >
> > > Please let me know given the pipeline we which option would be best and
> > how
> > > can we incorporate that in our pipeline.
> > >
> > > Also not exception is happening when reading from source topic which is
> > > "advice-stream", so looks like flow is not going to pipeline at all for
> > us
> > > to handle. It is terminating right at consumer poll.
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > > On Thu, Mar 30, 2017 at 2:22 PM, Michael Noll 
> > > wrote:
> > >
> > > > Could this be a corrupted message ("poison pill") in your topic?
> > > >
> > > > If so, take a look at
> > > > http://docs.confluent.io/current/streams/faq.html#
> > > >
> > > handling-corrupted-records-and-deserialization-errors-
> > poison-pill-messages
> > > >
> > > > FYI: We're currently investigating a more elegant way to address such
> > > > poison pill problems.  If you have feedback on that front, feel free
> to
> > > > share it with us. :-)
> > > >
> > > > -Michael
> > > >
> > > >
> > > >
> > > >
> > > > On Wed, Mar 29, 2017 at 10:07 PM, Sachin Mittal 
> > > > wrote:
> > > >
> > > > > Hi,
> > > > > This is for first time we are getting a weird exception.
> > > > > After this the streams caches.
> > > > >
> > > > > Only work around is to manually seek and commit offset to a greater
> > > > number
> > > > > and we are needing this manual intervention again and again.
> > > > >
> > > > > Any idea what is causing it and how can we circumvent this.
> > > > >
> > > > > Note this error happens in both cases when 10.2 client or 10.1.1
> > client
> > > > > connect to kafka server 10.1.1
> > > > >
> > > > > So this does not looks like version issue.
> > > > >
> > > > > Also we have following setting
> > > > > message.max.bytes=513
> > > > > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "5048576"
> > > > > ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "5048576"
> > > > >
> > > > > Rest is all default and also increasing the value for
> > > > > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG did not help.
> > > > >
> > > > > Stack trace below.
> > > > >
> > > > > Thanks
> > > > > Sachin
> > > > >
> > > > >
> > > > > org.apache.kafka.common.errors.SerializationException: Error
> > > > deserializing
> > > > > key/value for partition advice-stream-6 at offset 45153795
> > > > > java.lang.IllegalArgumentException: null
> > > > >   at java.nio.Buffer.limit(Buffer.java:275) ~[na:1.8.0_122-ea]
> > > > >   at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.
> > java:791)
> > > > > ~[kafka-clients-0.10.2.0.jar:na]
> > > > >   at org.apache.kafka.common.record.Record.value(Record.java:268)
> > > > > ~[kafka-clients-0.10.2.0.jar:na]
> > > > >   at org.apache.kafka.clients.consumer.int

Re: weird SerializationException when consumer is fetching and parsing record in streams application

2017-03-30 Thread Sachin Mittal
Well, I tried to read that offset via kafka-console-consumer.sh too and it
fails with the same error.

So I was wondering if I can apply any of the suggestions as per
http://docs.confluent.io/3.2.0/streams/faq.html#handling-corrupted-records-and-deserialization-errors-poison-pill-messages

If there is any other way just to get the contents of that message, it would
be helpful.

Thanks
Sachin


On Thu, Mar 30, 2017 at 4:11 PM, Damian Guy  wrote:

> Hi Sachin,
>
> Have you tried firing up a consumer (non-streams), seeking to that offset
> on the topic and seeing what the message is? Might be easier to debug? Like
> you say, it is failing in the consumer.
> Thanks,
> Damian
>
> On Thu, 30 Mar 2017 at 10:35 Sachin Mittal  wrote:
>
> > I think I am following the third option.
> >
> > My pipeline is:
> >
> > serde = Serdes.serdeFrom(new VSerializer(), new VDeserializer());
> >
> > builder.stream(Serdes.String(), serde, "advice-stream")
> >   .filter(new Predicate<String, V>() { ... })
> >   .groupByKey()
> >   .aggregate(new Initializer<V1>() { ... }, new Aggregator<String, V, V1>() { ... },
> >              windows, supplier)
> >   .mapValues(new ValueMapper<V1, V2>() { ... })
> >   .foreach(new ForeachAction<Windowed<String>, V2>() { ... });
> >
> >
> > and in VDeserializer (which implements Deserializer<V>) I am doing something
> > like this:
> >
> > public V deserialize(String paramString, byte[] paramArrayOfByte) {
> >     if (paramArrayOfByte == null) { return null; }
> >     V data = null;
> >     try {
> >         data = objectMapper.readValue(paramArrayOfByte, new TypeReference<V>() {});
> >     } catch (Exception e) {
> >         e.printStackTrace();
> >     }
> >     return data;
> > }
> >
> > So I am catching any exception that may happen when deserializing the data.
> >
> > This is what the third option suggests (if I am not mistaken).
> >
> > Please let me know, given the pipeline, which option would be best and how we
> > can incorporate it in our pipeline.
> >
> > Also note the exception is happening when reading from the source topic,
> > "advice-stream", so it looks like the flow is not reaching the pipeline at all
> > for us to handle. It is terminating right at the consumer poll.
> >
> > Thanks
> > Sachin
> >
> >
> > On Thu, Mar 30, 2017 at 2:22 PM, Michael Noll 
> > wrote:
> >
> > > Could this be a corrupted message ("poison pill") in your topic?
> > >
> > > If so, take a look at
> > > http://docs.confluent.io/current/streams/faq.html#
> > >
> > handling-corrupted-records-and-deserialization-errors-
> poison-pill-messages
> > >
> > > FYI: We're currently investigating a more elegant way to address such
> > > poison pill problems.  If you have feedback on that front, feel free to
> > > share it with us. :-)
> > >
> > > -Michael
> > >
> > >
> > >
> > >
> > > On Wed, Mar 29, 2017 at 10:07 PM, Sachin Mittal 
> > > wrote:
> > >
> > > > Hi,
> > > > This is the first time we are getting a weird exception.
> > > > After this the streams application crashes.
> > > >
> > > > The only workaround is to manually seek and commit the offset to a
> > > > greater number, and we need this manual intervention again and again.
> > > >
> > > > Any idea what is causing it and how we can circumvent this?
> > > >
> > > > Note this error happens in both cases, whether a 0.10.2 client or a
> > > > 0.10.1.1 client connects to a 0.10.1.1 Kafka server.
> > > >
> > > > So this does not look like a version issue.
> > > >
> > > > Also we have the following settings:
> > > > message.max.bytes=513
> > > > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "5048576"
> > > > ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "5048576"
> > > >
> > > > The rest is all default, and increasing the value of
> > > > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG also did not help.
> > > >
> > > > Stack trace below.
> > > >
> > > > Thanks
> > > > Sachin
> > > >
> > > >
> > > > org.apache.kafka.common.errors.SerializationException: Error
> > > deserializing
> > > > key/value for partition advice-stream-6 at offset 45153795
> > > > java.lang.IllegalArgumentException: null
> > > >   at java.nio.Buffer.limit(Buffer.java:275) ~[na:1.8.0_122-ea]
> > > >   at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.
> java:791)
> > > > ~[kafka-clients-0.10.2.0.jar:na]
> > > >   at org.apache.kafka.common.record.Record.value(Record.java:268)
> > > > ~[kafka-clients-0.10.2.0.jar:na]
> > > >   at org.apache.kafka.clients.consumer.internals.Fetcher.
> > > > parseRecord(Fetcher.java:867)
> > > > ~[kafka-clients-0.10.2.0.jar:na]
> > > >   at org.apache.kafka.clients.consumer.internals.Fetcher.
> > > > parseCompletedFetch(Fetcher.java:775) ~[kafka-clients-0.10.2.0.jar:
> na]
> > > >   at org.apache.kafka.clients.consumer.internals.Fetcher.
> > > > fetchedRecords(Fetcher.java:473) ~[kafka-clients-0.10.2.0.jar:na]
> > > >   at org.apache.kafka.clients.consumer.KafkaConsumer.
> > > > pollOnce(KafkaConsumer.java:1062) ~[kafka-clients-0.10.2.0.jar:na]
> > > >   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> > > > KafkaConsumer.java:995)
> > > > ~

Re: weird SerializationException when consumer is fetching and parsing record in streams application

2017-03-30 Thread Damian Guy
Hi Sachin,

Have you tried firing up a consumer (non-streams), seeking to that offset
on the topic and seeing what the message is? Might be easier to debug? Like
you say, it is failing in the consumer.
Thanks,
Damian

On Thu, 30 Mar 2017 at 10:35 Sachin Mittal  wrote:

> I think I am following the third option.
>
> My pipeline is:
>
> serde = Serdes.serdeFrom(new VSerializer(), new VDeserializer());
>
> builder.stream(Serdes.String(), serde, "advice-stream")
>   .filter(new Predicate<String, V>() { ... })
>   .groupByKey()
>   .aggregate(new Initializer<V1>() { ... }, new Aggregator<String, V, V1>() { ... },
>              windows, supplier)
>   .mapValues(new ValueMapper<V1, V2>() { ... })
>   .foreach(new ForeachAction<Windowed<String>, V2>() { ... });
>
>
> and in VDeserializer (which implements Deserializer<V>) I am doing something
> like this:
>
> public V deserialize(String paramString, byte[] paramArrayOfByte) {
>     if (paramArrayOfByte == null) { return null; }
>     V data = null;
>     try {
>         data = objectMapper.readValue(paramArrayOfByte, new TypeReference<V>() {});
>     } catch (Exception e) {
>         e.printStackTrace();
>     }
>     return data;
> }
>
> So I am catching any exception that may happen when deserializing the data.
>
> This is what the third option suggests (if I am not mistaken).
>
> Please let me know, given the pipeline, which option would be best and how we
> can incorporate it in our pipeline.
>
> Also note the exception is happening when reading from the source topic,
> "advice-stream", so it looks like the flow is not reaching the pipeline at all
> for us to handle. It is terminating right at the consumer poll.
>
> Thanks
> Sachin
>
>
> On Thu, Mar 30, 2017 at 2:22 PM, Michael Noll 
> wrote:
>
> > Could this be a corrupted message ("poison pill") in your topic?
> >
> > If so, take a look at
> > http://docs.confluent.io/current/streams/faq.html#
> >
> handling-corrupted-records-and-deserialization-errors-poison-pill-messages
> >
> > FYI: We're currently investigating a more elegant way to address such
> > poison pill problems.  If you have feedback on that front, feel free to
> > share it with us. :-)
> >
> > -Michael
> >
> >
> >
> >
> > On Wed, Mar 29, 2017 at 10:07 PM, Sachin Mittal 
> > wrote:
> >
> > > Hi,
> > > This is the first time we are getting a weird exception.
> > > After this the streams application crashes.
> > >
> > > The only workaround is to manually seek and commit the offset to a
> > > greater number, and we need this manual intervention again and again.
> > >
> > > Any idea what is causing it and how we can circumvent this?
> > >
> > > Note this error happens in both cases, whether a 0.10.2 client or a
> > > 0.10.1.1 client connects to a 0.10.1.1 Kafka server.
> > >
> > > So this does not look like a version issue.
> > >
> > > Also we have the following settings:
> > > message.max.bytes=513
> > > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "5048576"
> > > ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "5048576"
> > >
> > > The rest is all default, and increasing the value of
> > > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG also did not help.
> > >
> > > Stack trace below.
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > > org.apache.kafka.common.errors.SerializationException: Error
> > deserializing
> > > key/value for partition advice-stream-6 at offset 45153795
> > > java.lang.IllegalArgumentException: null
> > >   at java.nio.Buffer.limit(Buffer.java:275) ~[na:1.8.0_122-ea]
> > >   at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.java:791)
> > > ~[kafka-clients-0.10.2.0.jar:na]
> > >   at org.apache.kafka.common.record.Record.value(Record.java:268)
> > > ~[kafka-clients-0.10.2.0.jar:na]
> > >   at org.apache.kafka.clients.consumer.internals.Fetcher.
> > > parseRecord(Fetcher.java:867)
> > > ~[kafka-clients-0.10.2.0.jar:na]
> > >   at org.apache.kafka.clients.consumer.internals.Fetcher.
> > > parseCompletedFetch(Fetcher.java:775) ~[kafka-clients-0.10.2.0.jar:na]
> > >   at org.apache.kafka.clients.consumer.internals.Fetcher.
> > > fetchedRecords(Fetcher.java:473) ~[kafka-clients-0.10.2.0.jar:na]
> > >   at org.apache.kafka.clients.consumer.KafkaConsumer.
> > > pollOnce(KafkaConsumer.java:1062) ~[kafka-clients-0.10.2.0.jar:na]
> > >   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> > > KafkaConsumer.java:995)
> > > ~[kafka-clients-0.10.2.0.jar:na]
> > >   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > > StreamThread.java:592)
> > > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > >   at org.apache.kafka.streams.processor.internals.
> > > StreamThread.run(StreamThread.java:378) ~[kafka-streams-0.10.2.1-
> > > SNAPSHOT.jar:na]
> > >
> >
>


Re: weird SerializationException when consumer is fetching and parsing record in streams application

2017-03-30 Thread Sachin Mittal
I think I am following the third option.

My pipeline is:

serde = Serdes.serdeFrom(new VSerializer(), new VDeserializer());

builder.stream(Serdes.String(), serde, "advice-stream")
  .filter(new Predicate<String, V>() { ... })
  .groupByKey()
  .aggregate(new Initializer<V1>() { ... }, new Aggregator<String, V, V1>() { ... },
             windows, supplier)
  .mapValues(new ValueMapper<V1, V2>() { ... })
  .foreach(new ForeachAction<Windowed<String>, V2>() { ... });


and in VDeserializer (which implements Deserializer<V>) I am doing something
like this:

public V deserialize(String paramString, byte[] paramArrayOfByte) {
    if (paramArrayOfByte == null) { return null; }
    V data = null;
    try {
        data = objectMapper.readValue(paramArrayOfByte, new TypeReference<V>() {});
    } catch (Exception e) {
        e.printStackTrace();
    }
    return data;
}

So I am catching any exception that may happen when deserializing the data.

This is what the third option suggests (if I am not mistaken).
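
One thing worth noting with that third option: once the deserializer swallows the
error and returns null, everything downstream has to tolerate the nulls. A minimal
sketch of guarding the head of the pipeline, using the same (hypothetical) serde
and value types as above:

// Drop the nulls produced by the lenient VDeserializer before any grouping or
// aggregation sees them; the rest of the pipeline (groupByKey/aggregate/...) then
// only receives values that deserialized successfully.
KStream<String, V> valid = builder.stream(Serdes.String(), serde, "advice-stream")
    .filter(new Predicate<String, V>() {
        @Override
        public boolean test(String key, V value) {
            return value != null;   // skip records whose value failed to deserialize
        }
    });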

Please let me know, given the pipeline, which option would be best and how we
can incorporate it in our pipeline.

Also note the exception is happening when reading from the source topic,
"advice-stream", so it looks like the flow is not reaching the pipeline at all
for us to handle. It is terminating right at the consumer poll.

Thanks
Sachin


On Thu, Mar 30, 2017 at 2:22 PM, Michael Noll  wrote:

> Could this be a corrupted message ("poison pill") in your topic?
>
> If so, take a look at
> http://docs.confluent.io/current/streams/faq.html#
> handling-corrupted-records-and-deserialization-errors-poison-pill-messages
>
> FYI: We're currently investigating a more elegant way to address such
> poison pill problems.  If you have feedback on that front, feel free to
> share it with us. :-)
>
> -Michael
>
>
>
>
> On Wed, Mar 29, 2017 at 10:07 PM, Sachin Mittal 
> wrote:
>
> > Hi,
> > This is the first time we are getting a weird exception.
> > After this the streams application crashes.
> >
> > The only workaround is to manually seek and commit the offset to a
> > greater number, and we need this manual intervention again and again.
> >
> > Any idea what is causing it and how we can circumvent this?
> >
> > Note this error happens in both cases, whether a 0.10.2 client or a
> > 0.10.1.1 client connects to a 0.10.1.1 Kafka server.
> >
> > So this does not look like a version issue.
> >
> > Also we have the following settings:
> > message.max.bytes=513
> > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "5048576"
> > ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "5048576"
> >
> > The rest is all default, and increasing the value of
> > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG also did not help.
> >
> > Stack trace below.
> >
> > Thanks
> > Sachin
> >
> >
> > org.apache.kafka.common.errors.SerializationException: Error
> deserializing
> > key/value for partition advice-stream-6 at offset 45153795
> > java.lang.IllegalArgumentException: null
> >   at java.nio.Buffer.limit(Buffer.java:275) ~[na:1.8.0_122-ea]
> >   at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.java:791)
> > ~[kafka-clients-0.10.2.0.jar:na]
> >   at org.apache.kafka.common.record.Record.value(Record.java:268)
> > ~[kafka-clients-0.10.2.0.jar:na]
> >   at org.apache.kafka.clients.consumer.internals.Fetcher.
> > parseRecord(Fetcher.java:867)
> > ~[kafka-clients-0.10.2.0.jar:na]
> >   at org.apache.kafka.clients.consumer.internals.Fetcher.
> > parseCompletedFetch(Fetcher.java:775) ~[kafka-clients-0.10.2.0.jar:na]
> >   at org.apache.kafka.clients.consumer.internals.Fetcher.
> > fetchedRecords(Fetcher.java:473) ~[kafka-clients-0.10.2.0.jar:na]
> >   at org.apache.kafka.clients.consumer.KafkaConsumer.
> > pollOnce(KafkaConsumer.java:1062) ~[kafka-clients-0.10.2.0.jar:na]
> >   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> > KafkaConsumer.java:995)
> > ~[kafka-clients-0.10.2.0.jar:na]
> >   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > StreamThread.java:592)
> > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >   at org.apache.kafka.streams.processor.internals.
> > StreamThread.run(StreamThread.java:378) ~[kafka-streams-0.10.2.1-
> > SNAPSHOT.jar:na]
> >
>


Re: weird SerializationException when consumer is fetching and parsing record in streams application

2017-03-30 Thread Michael Noll
Could this be a corrupted message ("poison pill") in your topic?

If so, take a look at
http://docs.confluent.io/current/streams/faq.html#handling-corrupted-records-and-deserialization-errors-poison-pill-messages

FYI: We're currently investigating a more elegant way to address such
poison pill problems.  If you have feedback on that front, feel free to
share it with us. :-)

-Michael
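
A related approach some people use for poison pills (not necessarily one of the
FAQ's options) is to read the topic as raw bytes and route anything unparseable
to a quarantine topic instead of dropping it. A rough sketch with hypothetical
names (tryParse() would wrap the Jackson call from the deserializer shown above
in this thread, and "advice-stream-quarantine" is an assumed topic; parsing twice
is just for brevity here):

// Read the source topic as plain bytes so the custom serde cannot fail inside the
// consumer, then branch: parseable records continue, everything else is copied to
// a quarantine topic for later inspection.
KStream<String, byte[]> raw =
        builder.stream(Serdes.String(), Serdes.ByteArray(), "advice-stream");

KStream<String, byte[]>[] branches = raw.branch(
        (key, bytes) -> tryParse(bytes) != null,   // records the serde can handle
        (key, bytes) -> true                       // everything else (poison pills)
);

branches[1].to(Serdes.String(), Serdes.ByteArray(), "advice-stream-quarantine");

KStream<String, V> good = branches[0].mapValues(bytes -> tryParse(bytes));
// ... continue with groupByKey()/aggregate() on `good` as before ...

Caveat: in the stack trace in this thread the exception is thrown while the
fetcher is extracting the record itself (Record.value()/Utils.sizeDelimited),
i.e. before any deserializer runs, so this only helps with deserialization-level
poison pills, not with corruption at the log level.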




On Wed, Mar 29, 2017 at 10:07 PM, Sachin Mittal  wrote:

> Hi,
> This is the first time we are getting a weird exception.
> After this the streams application crashes.
>
> The only workaround is to manually seek and commit the offset to a greater
> number, and we need this manual intervention again and again.
>
> Any idea what is causing it and how we can circumvent this?
>
> Note this error happens in both cases, whether a 0.10.2 client or a 0.10.1.1
> client connects to a 0.10.1.1 Kafka server.
>
> So this does not look like a version issue.
>
> Also we have the following settings:
> message.max.bytes=513
> ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "5048576"
> ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "5048576"
>
> The rest is all default, and increasing the value of
> ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG also did not help.
>
> Stack trace below.
>
> Thanks
> Sachin
>
>
> org.apache.kafka.common.errors.SerializationException: Error deserializing
> key/value for partition advice-stream-6 at offset 45153795
> java.lang.IllegalArgumentException: null
>   at java.nio.Buffer.limit(Buffer.java:275) ~[na:1.8.0_122-ea]
>   at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.java:791)
> ~[kafka-clients-0.10.2.0.jar:na]
>   at org.apache.kafka.common.record.Record.value(Record.java:268)
> ~[kafka-clients-0.10.2.0.jar:na]
>   at org.apache.kafka.clients.consumer.internals.Fetcher.
> parseRecord(Fetcher.java:867)
> ~[kafka-clients-0.10.2.0.jar:na]
>   at org.apache.kafka.clients.consumer.internals.Fetcher.
> parseCompletedFetch(Fetcher.java:775) ~[kafka-clients-0.10.2.0.jar:na]
>   at org.apache.kafka.clients.consumer.internals.Fetcher.
> fetchedRecords(Fetcher.java:473) ~[kafka-clients-0.10.2.0.jar:na]
>   at org.apache.kafka.clients.consumer.KafkaConsumer.
> pollOnce(KafkaConsumer.java:1062) ~[kafka-clients-0.10.2.0.jar:na]
>   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> KafkaConsumer.java:995)
> ~[kafka-clients-0.10.2.0.jar:na]
>   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:592)
> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>   at org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:378) ~[kafka-streams-0.10.2.1-
> SNAPSHOT.jar:na]
>
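
For completeness, the manual workaround described above (advancing the consumer
group past the poison-pill offset) can be scripted with a plain consumer rather
than done by hand. A minimal sketch; it assumes the streams application is
stopped first (otherwise the group is still active and the commit is rejected),
and "my-streams-app" stands in for the actual application.id, which Kafka Streams
uses as the consumer group id:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class SkipPoisonPill {                               // hypothetical helper class
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // adjust to the actual cluster
        props.put("group.id", "my-streams-app");            // must equal the streams application.id
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("advice-stream", 6);
            consumer.assign(Collections.singletonList(tp));
            long newOffset = 45153795L + 1;                  // skip just the bad record
            consumer.commitSync(
                    Collections.singletonMap(tp, new OffsetAndMetadata(newOffset)));
            System.out.println("Committed " + newOffset + " for " + tp);
        }
    }
}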