Sachin,

We are discussing how to work around KAFKA-4740 for poison pill records:

https://issues.apache.org/jira/browse/KAFKA-5157

Please share your scenario and your opinions on the proposed solution there.


Guozhang

On Tue, May 16, 2017 at 9:50 PM, Sachin Mittal <sjmit...@gmail.com> wrote:

> Folks, is there any update on
> https://issues.apache.org/jira/browse/KAFKA-4740.
> This is now by far the only pressing issue for our streams application.
>
> It happens from time to time and so far our only solution is to increase
> the offset of the group for that partition beyond the offset that caused
> this issue.
> Another thing I have noticed is that such (poison pill) messages tend to
> bunch together. So usually we need to increase the offset by, say, 10000 to
> get past the issue.
> So basically we lose processing of that many messages. If we increase the
> offset by just one, we soon see another such message a few offsets higher,
> and so on.
>
> So in order to avoid multiple manual restarts of the streams application
> we simply increase the offset by 10000. Then the streams application works
> uninterrupted for a few months, until one fine day we again see such
> messages.
>
> As the bug is critical, shouldn't we get some fix or workaround in the next
> release?
>
> Thanks
> Sachin
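The manual recovery Sachin describes can be partly automated. This is a sketch, not code from the thread: the `SerializationException` message thrown by the consumer embeds the partition and offset of the poison pill (see the stack trace later in this thread), so a small helper can extract where to seek. Note the message format is an internal detail of the Kafka client, so this parsing is brittle and version-dependent.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: parse topic, partition, and offset out of the
// Fetcher's SerializationException message, e.g.
// "Error deserializing key/value for partition advice-stream-6 at offset 45153795"
public class PoisonPillOffset {

    private static final Pattern MESSAGE =
            Pattern.compile("partition (\\S+)-(\\d+) at offset (\\d+)");

    /** Offset of the record just after the poison pill, or -1 if unparseable. */
    public static long nextOffset(String exceptionMessage) {
        Matcher m = MESSAGE.matcher(exceptionMessage);
        return m.find() ? Long.parseLong(m.group(3)) + 1 : -1;
    }

    /** Topic name parsed from the message, or null if unparseable. */
    public static String topic(String exceptionMessage) {
        Matcher m = MESSAGE.matcher(exceptionMessage);
        return m.find() ? m.group(1) : null;
    }

    /** Partition number parsed from the message, or -1 if unparseable. */
    public static int partition(String exceptionMessage) {
        Matcher m = MESSAGE.matcher(exceptionMessage);
        return m.find() ? Integer.parseInt(m.group(2)) : -1;
    }

    public static void main(String[] args) {
        String msg = "Error deserializing key/value for partition advice-stream-6 at offset 45153795";
        System.out.println(topic(msg) + "/" + partition(msg) + " -> seek to " + nextOffset(msg));
    }
}
```

With a plain KafkaConsumer (not Streams) in the same consumer group, recovery would then be roughly `consumer.seek(new TopicPartition(topic, partition), nextOffset)` followed by `commitSync()`. This skips one record at a time instead of jumping ahead by 10000, at the cost of one restart per pill, which matters given that, as noted above, the bad records tend to cluster.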
>
>
>
>
>
> On Thu, Mar 30, 2017 at 6:59 PM, Michael Noll <mich...@confluent.io>
> wrote:
>
> > Sachin,
> >
> > there's a JIRA that seems related to what you're seeing:
> > https://issues.apache.org/jira/browse/KAFKA-4740
> >
> > Perhaps you could check the above and report back?
> >
> > -Michael
> >
> >
> >
> >
> > On Thu, Mar 30, 2017 at 3:23 PM, Michael Noll <mich...@confluent.io>
> > wrote:
> >
> > > Hmm, I re-read the stacktrace again. It does look like the value side
> > > is the culprit (as Sachin suggested earlier).
> > >
> > > -Michael
> > >
> > >
> > > On Thu, Mar 30, 2017 at 3:18 PM, Michael Noll <mich...@confluent.io>
> > > wrote:
> > >
> > >> Sachin,
> > >>
> > >> you have this line:
> > >>
> > >> > builder.stream(Serdes.String(), serde, "advice-stream")
> > >>
> > >> Could the problem be that it's not the record values causing the
> > >> trouble -- because your value deserializer does try-catch any such
> > >> errors -- but that the record *keys* are malformed?  The built-in
> > >> `Serdes.String()` does not try-catch deserialization errors, and from
> > >> a quick look at the source it seems that the `Fetcher` class
> > >> (clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java)
> > >> is throwing your error above ("Error deserializing key/value for
> > >> partition..."), and the Fetcher is swallowing the more specific
> > >> SerializationException of `Serdes.String()` (but it will include the
> > >> original exception/Throwable in its own SerializationException).
> > >>
> > >> -Michael
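The key-side hardening Michael suggests can be sketched as a wrapper that maps any deserialization failure to null instead of throwing. This is an illustrative sketch, not code from the thread; the minimal `Deserializer` interface below merely mirrors the shape of `org.apache.kafka.common.serialization.Deserializer` so the example stands alone without the Kafka jars, and the integer-parsing delegate is a stand-in for any strict key deserializer.

```java
import java.nio.charset.StandardCharsets;

public class SafeDeserializerExample {

    /** Minimal stand-in mirroring org.apache.kafka.common.serialization.Deserializer. */
    interface Deserializer<T> {
        T deserialize(String topic, byte[] data);
    }

    /** Wraps a delegate so malformed records yield null instead of an exception. */
    static <T> Deserializer<T> skipOnError(Deserializer<T> delegate) {
        return (topic, data) -> {
            if (data == null) {
                return null;
            }
            try {
                return delegate.deserialize(topic, data);
            } catch (Exception e) {
                // A real implementation should log the topic and exception here.
                return null;
            }
        };
    }

    public static void main(String[] args) {
        // Hypothetical strict delegate: parses the key bytes as a decimal integer.
        Deserializer<Integer> intKeys = (topic, data) ->
                Integer.parseInt(new String(data, StandardCharsets.UTF_8));
        Deserializer<Integer> safe = skipOnError(intKeys);

        System.out.println(safe.deserialize("advice-stream", "42".getBytes(StandardCharsets.UTF_8)));   // 42
        System.out.println(safe.deserialize("advice-stream", "junk".getBytes(StandardCharsets.UTF_8))); // null
    }
}
```

Note that this only helps when the failure actually reaches a deserializer; as Sachin observes later in the thread, his IllegalArgumentException is thrown inside the Fetcher before any deserializer is called.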
> > >>
> > >>
> > >>
> > >> On Thu, Mar 30, 2017 at 2:52 PM, Sachin Mittal <sjmit...@gmail.com>
> > >> wrote:
> > >>
> > >>> My streams application does run in debug mode only.
> > >>> Also I have checked the code around these lines
> > >>>
> > >>>   at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.java:791) ~[kafka-clients-0.10.2.0.jar:na]
> > >>>   at org.apache.kafka.common.record.Record.value(Record.java:268) ~[kafka-clients-0.10.2.0.jar:na]
> > >>>   at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:867) ~[kafka-clients-0.10.2.0.jar:na]
> > >>>
> > >>> I don't see any log statement which will give me more information.
> > >>>
> > >>> https://github.com/apache/kafka/blob/0.10.2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L867
> > >>>
> > >>> The issue is happening at this line, and perhaps handling the
> > >>> exception there and setting the value to null would be the better
> > >>> option.
> > >>> Yes, at the client side nothing can be done, because the exception
> > >>> happens before this.valueDeserializer.deserialize can be called.
> > >>>
> > >>> Thanks
> > >>> Sachin
> > >>>
> > >>>
> > >>> On Thu, Mar 30, 2017 at 4:28 PM, Damian Guy <damian....@gmail.com>
> > >>> wrote:
> > >>>
> > >>> > The suggestions in that FAQ won't help as it is too late, i.e., the
> > >>> message
> > >>> > has already been received into Streams.
> > >>> > You could create a simple app that uses the Consumer, seeks to the
> > >>> offset,
> > >>> > and tries to read the message. If you did this in debug mode you
> > might
> > >>> find
> > >>> > out some more information.
> > >>> >
> > >>> >
> > >>> > On Thu, 30 Mar 2017 at 11:50 Sachin Mittal <sjmit...@gmail.com>
> > >>> > wrote:
> > >>> >
> > >>> > > Well, I tried to read that offset via kafka-console-consumer.sh
> > >>> > > too, and it fails with the same error.
> > >>> > >
> > >>> > > So I was wondering if I can apply any of the suggestions as per
> > >>> > >
> > >>> > > http://docs.confluent.io/3.2.0/streams/faq.html#handling-corrupted-records-and-deserialization-errors-poison-pill-messages
> > >>> > >
> > >>> > > If there is any other way just to get the contents of that
> > >>> > > message, it would be helpful.
> > >>> > >
> > >>> > > Thanks
> > >>> > > Sachin
> > >>> > >
> > >>> > >
> > >>> > > On Thu, Mar 30, 2017 at 4:11 PM, Damian Guy <damian....@gmail.com>
> > >>> > > wrote:
> > >>> > >
> > >>> > > > Hi Sachin,
> > >>> > > >
> > >>> > > > Have you tried firing up a consumer (non-streams), seeking to
> > >>> > > > that offset on the topic and seeing what the message is? Might
> > >>> > > > be easier to debug? Like you say, it is failing in the consumer.
> > >>> > > > Thanks,
> > >>> > > > Damian
> > >>> > > >
> > >>> > > > On Thu, 30 Mar 2017 at 10:35 Sachin Mittal <sjmit...@gmail.com>
> > >>> > > > wrote:
> > >>> > > >
> > >>> > > > > I think I am following the third option.
> > >>> > > > >
> > >>> > > > > My pipeline is:
> > >>> > > > >
> > >>> > > > > serde = Serdes.serdeFrom(new VSerializer(), new VDeserializer());
> > >>> > > > >
> > >>> > > > > builder.stream(Serdes.String(), serde, "advice-stream")
> > >>> > > > >   .filter(new Predicate<String, V>() { ... })
> > >>> > > > >   .groupByKey()
> > >>> > > > >   .aggregate(new Initializer<V1>() { ... },
> > >>> > > > >              new Aggregator<String, V, V1>() { ... },
> > >>> > > > >              windows, supplier)
> > >>> > > > >   .mapValues(new ValueMapper<V1, V2>() { ... })
> > >>> > > > >   .foreach(new ForeachAction<Windowed<String>, V2>() { ... });
> > >>> > > > >
> > >>> > > > >
> > >>> > > > > and in VDeserializer (implements Deserializer<V>) I am doing
> > >>> > > > > something like this:
> > >>> > > > >
> > >>> > > > >     public V deserialize(String paramString, byte[] paramArrayOfByte) {
> > >>> > > > >         if (paramArrayOfByte == null) { return null; }
> > >>> > > > >         V data = null;
> > >>> > > > >         try {
> > >>> > > > >             data = objectMapper.readValue(paramArrayOfByte,
> > >>> > > > >                 new TypeReference<V>() {});
> > >>> > > > >         } catch (Exception e) {
> > >>> > > > >             e.printStackTrace();
> > >>> > > > >         }
> > >>> > > > >         return data;
> > >>> > > > >     }
> > >>> > > > >
> > >>> > > > > So I am catching any exception that may happen when
> > >>> deserializing the
> > >>> > > > data.
> > >>> > > > >
> > >>> > > > > This is what the third option suggests (if I am not mistaken).
> > >>> > > > >
> > >>> > > > > Please let me know, given the pipeline, which option would be
> > >>> > > > > best and how we can incorporate it.
> > >>> > > > >
> > >>> > > > > Also note the exception is happening when reading from the
> > >>> > > > > source topic, which is "advice-stream", so it looks like the
> > >>> > > > > flow is not reaching our pipeline at all for us to handle. It
> > >>> > > > > is terminating right at the consumer poll.
> > >>> > > > >
> > >>> > > > > Thanks
> > >>> > > > > Sachin
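Since the VDeserializer above swallows errors and returns null (the third FAQ option), the topology also needs to drop those nulls explicitly before aggregating; in the Streams DSL that would be a guard such as `.filter((key, value) -> value != null)` right after `builder.stream(...)`, which is an assumed addition, not part of Sachin's pipeline as posted. A dependency-free sketch of the same null-dropping predicate:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

public class NullGuardExample {

    // A null value here stands in for what the try-catch deserializer
    // produces for a corrupted payload.
    public static List<String> dropCorrupted(List<String> values) {
        // Mirrors .filter((key, value) -> value != null) in the Streams DSL.
        return values.stream().filter(Objects::nonNull).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(dropCorrupted(Arrays.asList("a", null, "b"))); // [a, b]
    }
}
```

As Sachin points out, though, this whole approach is moot when the exception fires inside the consumer's Fetcher before the value deserializer ever runs.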
> > >>> > > > >
> > >>> > > > >
> > >>> > > > > On Thu, Mar 30, 2017 at 2:22 PM, Michael Noll <mich...@confluent.io>
> > >>> > > > > wrote:
> > >>> > > > >
> > >>> > > > > > Could this be a corrupted message ("poison pill") in your
> > >>> > > > > > topic?
> > >>> > > > > >
> > >>> > > > > > If so, take a look at
> > >>> > > > > > http://docs.confluent.io/current/streams/faq.html#handling-corrupted-records-and-deserialization-errors-poison-pill-messages
> > >>> > > > > >
> > >>> > > > > > FYI: We're currently investigating a more elegant way to
> > >>> > > > > > address such poison pill problems.  If you have feedback on
> > >>> > > > > > that front, feel free to share it with us. :-)
> > >>> > > > > >
> > >>> > > > > > -Michael
> > >>> > > > > >
> > >>> > > > > >
> > >>> > > > > >
> > >>> > > > > >
> > >>> > > > > > On Wed, Mar 29, 2017 at 10:07 PM, Sachin Mittal <sjmit...@gmail.com>
> > >>> > > > > > wrote:
> > >>> > > > > >
> > >>> > > > > > > Hi,
> > >>> > > > > > > This is the first time we are getting this weird exception.
> > >>> > > > > > > After this the streams application crashes.
> > >>> > > > > > >
> > >>> > > > > > > The only workaround is to manually seek and commit the
> > >>> > > > > > > offset to a greater number, and we need this manual
> > >>> > > > > > > intervention again and again.
> > >>> > > > > > >
> > >>> > > > > > > Any idea what is causing it and how we can circumvent it?
> > >>> > > > > > >
> > >>> > > > > > > Note this error happens both when the 0.10.2 client and
> > >>> > > > > > > the 0.10.1.1 client connect to Kafka server 0.10.1.1, so
> > >>> > > > > > > this does not look like a version issue.
> > >>> > > > > > >
> > >>> > > > > > > Also we have the following settings:
> > >>> > > > > > > message.max.bytes=5000013
> > >>> > > > > > > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "5048576"
> > >>> > > > > > > ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "5048576"
> > >>> > > > > > >
> > >>> > > > > > > The rest is all default; increasing the value of
> > >>> > > > > > > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG also did
> > >>> > > > > > > not help.
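For reference, the client-side settings above can be collected in one place; this is a sketch of that configuration, not code from the thread. The string keys are the Kafka config names behind the `ConsumerConfig`/`ProducerConfig` constants mentioned; the broker-side `message.max.bytes` lives in server.properties, not here. The sizing is at least internally consistent: the fetch limit must cover the largest message the broker accepts, and 5048576 > 5000013 here.

```java
import java.util.Properties;

public class FetchSizeConfig {

    public static Properties consumerProps() {
        Properties props = new Properties();
        // ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG: must be at least
        // as large as the broker's message.max.bytes (5000013 in this setup).
        props.put("max.partition.fetch.bytes", "5048576");
        return props;
    }

    public static Properties producerProps() {
        Properties props = new Properties();
        // ProducerConfig.MAX_REQUEST_SIZE_CONFIG
        props.put("max.request.size", "5048576");
        return props;
    }
}
```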
> > >>> > > > > > >
> > >>> > > > > > > Stack trace below.
> > >>> > > > > > >
> > >>> > > > > > > Thanks
> > >>> > > > > > > Sachin
> > >>> > > > > > >
> > >>> > > > > > >
> > >>> > > > > > > org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition advice-stream-6 at offset 45153795
> > >>> > > > > > > java.lang.IllegalArgumentException: null
> > >>> > > > > > >   at java.nio.Buffer.limit(Buffer.java:275) ~[na:1.8.0_122-ea]
> > >>> > > > > > >   at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.java:791) ~[kafka-clients-0.10.2.0.jar:na]
> > >>> > > > > > >   at org.apache.kafka.common.record.Record.value(Record.java:268) ~[kafka-clients-0.10.2.0.jar:na]
> > >>> > > > > > >   at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:867) ~[kafka-clients-0.10.2.0.jar:na]
> > >>> > > > > > >   at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:775) ~[kafka-clients-0.10.2.0.jar:na]
> > >>> > > > > > >   at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:473) ~[kafka-clients-0.10.2.0.jar:na]
> > >>> > > > > > >   at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1062) ~[kafka-clients-0.10.2.0.jar:na]
> > >>> > > > > > >   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) ~[kafka-clients-0.10.2.0.jar:na]
> > >>> > > > > > >   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > >>> > > > > > >   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:378) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > >>> > > > > > >
> > >>> > > > > >
> > >>> > > > >
> > >>> > > >
> > >>> > >
> > >>> >
> > >>>
> > >>
> > >>
> > >>
> > >
> > >
> >
>



-- 
-- Guozhang
