Guozhang,

Here is the snippet.

private Properties getProperties() {
    Properties p = new Properties();
    ...
    p.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG,
          kafkaConfig.getString("streamThreads"));
    p.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
          LogAndContinueExceptionHandler.class);
    ...
    return p;
}

StreamsConfig streamsConfig = new StreamsConfig(getProperties());
KafkaStreams kafkaStreams = new KafkaStreams(streamBuilder.build(), streamsConfig);
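
For reference, here is a minimal standalone sketch of the same configuration using the literal string keys that back those StreamsConfig constants ("num.stream.threads" and "default.deserialization.exception.handler"), so it compiles without the Kafka dependency. The thread count "4" is a placeholder for the kafkaConfig lookup in the snippet above.

```java
import java.util.Properties;

public class StreamsPropsSketch {
    // Build the same properties using the literal config keys behind
    // StreamsConfig.NUM_STREAM_THREADS_CONFIG and
    // StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG.
    static Properties getProperties() {
        Properties p = new Properties();
        // Placeholder value; in the snippet above this comes from kafkaConfig
        p.put("num.stream.threads", "4");
        // The handler can be set as a Class object or its fully-qualified name
        p.put("default.deserialization.exception.handler",
              "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler");
        return p;
    }

    public static void main(String[] args) {
        Properties p = getProperties();
        System.out.println(p.get("num.stream.threads"));
        System.out.println(p.get("default.deserialization.exception.handler"));
    }
}
```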

Srikanth

On Mon, Jan 29, 2018 at 11:10 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Srikanth,
>
> How did you set the LogAndContinueExceptionHandler in the configs? Could
> you copy the code snippet here?
>
> Guozhang
>
>
> On Sun, Jan 28, 2018 at 11:26 PM, Srikanth <srikanth...@gmail.com> wrote:
>
> > Kafka-streams version "1.0.0".
> >
> > Thanks,
> > Srikanth
> >
> > On Mon, Jan 29, 2018 at 12:23 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > Hello Srikanth,
> > >
> > > Which version of Kafka are you using? I'd like to dig into that particular
> > > branch again.
> > >
> > > Guozhang
> > >
> > > On Sun, Jan 28, 2018 at 8:54 AM, Srikanth <srikanth...@gmail.com> wrote:
> > >
> > > > Guozhang,
> > > >
> > > > While I understand that this metric is meaningless when the handler is set
> > > > to FAIL, in my case I'm actually using LogAndContinueExceptionHandler. In
> > > > this case, the app needs to report such occurrences. What I noticed is
> > > > that only skipped-records is set; the granularity offered by
> > > > skippedDueToDeserializationError is missing.
> > > >
> > > > Srikanth
> > > >
> > > > On Fri, Jan 26, 2018 at 10:45 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > >
> > > > > Hi Srikanth,
> > > > >
> > > > > Looked at the source code once again and, after discussing with another
> > > > > committer, I now remember why we designed it that way: when you set the
> > > > > HandlerResponse to FAIL, it means that once a "poison record" is
> > > > > received, we stop the world by throwing this exception all the way up.
> > > > > Hence, at that time the application would be stopped anyway, so we would
> > > > > not need to record this metric.
> > > > >
> > > > > So, in other words, I think it is rather a documentation improvement
> > > > > that we should make.
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Fri, Jan 26, 2018 at 8:56 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > >
> > > > > > Hello Srikanth,
> > > > > >
> > > > > > Thanks for reporting this. As I checked the code,
> > > > > > skippedDueToDeserializationError is effectively only recorded when the
> > > > > > DeserializationHandlerResponse is not set to FAIL. I agree it does not
> > > > > > exactly match the documentation's guidance, and I will file a JIRA and
> > > > > > fix it.
> > > > > >
> > > > > > As for skippedDueToDeserializationError and skipped-records-rate,
> > > > > > there is an open JIRA discussing this:
> > > > > > https://issues.apache.org/jira/browse/KAFKA-6376, just FYI.
> > > > > >
> > > > > >
> > > > > > Could you share which version of Kafka you observed this issue on?
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > > On Fri, Jan 26, 2018 at 6:30 AM, Srikanth <srikanth...@gmail.com> wrote:
> > > > > >
> > > > > >> Hello,
> > > > > >>
> > > > > >> As per the docs, when LogAndContinueExceptionHandler is used it will
> > > > > >> set the skippedDueToDeserializationError-rate metric to indicate a
> > > > > >> deserialization error. I notice that it is never set; instead,
> > > > > >> skipped-records-rate is set. My understanding was that
> > > > > >> skipped-records-rate is set due to timestamp extraction errors.
> > > > > >>
> > > > > >> For example, I sent a few invalid records to a topic and was able to
> > > > > >> see an exception during deserialization.
> > > > > >>
> > > > > >> org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
> > > > > >> Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
> > > > > >> 18/01/24 06:50:09 WARN StreamThread: Exception caught during Deserialization, taskId: 0_0, topic: docker.event.1, partition: 0, offset: 3764
> > > > > >>
> > > > > >> These incremented skipped-records-[rate|total].
> > > > > >>
> > > > > >> Thanks,
> > > > > >> Srikanth
> > > > > >>
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>
