I think I am following the third option.

My pipeline is:

Serde<V> serde = Serdes.serdeFrom(new VSerializer(), new VDeserializer());

builder.stream(Serdes.String(), serde, "advice-stream")
  .filter(new Predicate<String, V>() { ...})
  .groupByKey()
  .aggregate(new Initializer<V1>() {...}, new Aggregator<String, V, V1>() {...}, windows, supplier)
  .mapValues(new ValueMapper<V1, V2>() { ... })
  .foreach(new ForeachAction<Windowed<String>, V2>() {... });


In VDeserializer (which implements Deserializer<V>) I am doing something
like this:

    public V deserialize(String paramString, byte[] paramArrayOfByte) {
        if (paramArrayOfByte == null) { return null;}
        V data = null;
        try {
            data = objectMapper.readValue(paramArrayOfByte, new TypeReference<V>() {});
        } catch (Exception e) {
            e.printStackTrace();
        }
        return data;
    }

So I am catching any exception that may happen when deserializing the data.

This is what the third option suggests (if I am not mistaken).
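
To make the pattern concrete, here is a minimal, dependency-free sketch of "return null on a deserialization error, then drop the nulls downstream". The class and method names (NullOnErrorSketch, deserializeOrNull, parseAll, parseInt) are hypothetical stand-ins and not part of the Kafka or Jackson APIs; in the real pipeline the null check would live in the filter() step.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for the catch-and-return-null pattern; not Kafka API.
class NullOnErrorSketch {

    // Returns null when the payload is null or the parser throws.
    static <T> T deserializeOrNull(byte[] payload, Function<byte[], T> parser) {
        if (payload == null) {
            return null;
        }
        try {
            return parser.apply(payload);
        } catch (RuntimeException e) {
            return null; // corrupted record: signal it with null instead of failing
        }
    }

    // Keeps only the values that parsed, like a filter step that drops nulls.
    static <T> List<T> parseAll(List<byte[]> payloads, Function<byte[], T> parser) {
        List<T> parsed = new ArrayList<>();
        for (byte[] payload : payloads) {
            T value = deserializeOrNull(payload, parser);
            if (value != null) {
                parsed.add(value);
            }
        }
        return parsed;
    }

    // Example parser: decodes UTF-8 text as an integer and throws
    // NumberFormatException on malformed input.
    static Integer parseInt(byte[] bytes) {
        return Integer.valueOf(new String(bytes, StandardCharsets.UTF_8).trim());
    }
}
```

This only helps when the failure happens inside the deserializer itself; it cannot catch an error raised before the deserializer is called.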

Please let me know which option would be best given this pipeline, and how
we can incorporate it.

Also note that the exception happens when reading from the source topic,
"advice-stream", so it looks like the flow never reaches the pipeline for us
to handle it. It terminates right at the consumer poll.
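
One way the manual seek could be automated is to read the partition and offset out of the exception message quoted below and skip to the next offset. This is only a sketch under a loud assumption: the message format "partition <topic>-<n> at offset <m>" is not a stable API and may change between client versions. The names PoisonPillOffsetSketch, Position, parse and nextOffset are hypothetical.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper; relies on the wording of the exception message.
class PoisonPillOffsetSketch {

    // Assumed message shape: "... partition <topic>-<partition> at offset <offset>"
    private static final Pattern POSITION =
            Pattern.compile("partition (.+)-(\\d+) at offset (\\d+)");

    static final class Position {
        final String topic;
        final int partition;
        final long offset;

        Position(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Returns the failing position, or null if the message does not match.
    static Position parse(String message) {
        if (message == null) {
            return null;
        }
        Matcher m = POSITION.matcher(message);
        if (!m.find()) {
            return null;
        }
        return new Position(m.group(1),
                Integer.parseInt(m.group(2)),
                Long.parseLong(m.group(3)));
    }

    // The offset to seek to so that the bad record is skipped.
    static long nextOffset(Position failing) {
        return failing.offset + 1;
    }
}
```

With a plain consumer, the parsed values would then be passed to KafkaConsumer.seek(TopicPartition, long). Skipping a record this way loses it, so it is worth logging the skipped position.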

Thanks
Sachin


On Thu, Mar 30, 2017 at 2:22 PM, Michael Noll <mich...@confluent.io> wrote:

> Could this be a corrupted message ("poison pill") in your topic?
>
> If so, take a look at
> http://docs.confluent.io/current/streams/faq.html#handling-corrupted-records-and-deserialization-errors-poison-pill-messages
>
> FYI: We're currently investigating a more elegant way to address such
> poison pill problems.  If you have feedback on that front, feel free to
> share it with us. :-)
>
> -Michael
>
>
>
>
> On Wed, Mar 29, 2017 at 10:07 PM, Sachin Mittal <sjmit...@gmail.com>
> wrote:
>
> > Hi,
> > This is the first time we are getting this weird exception.
> > After this the streams application crashes.
> >
> > The only workaround is to manually seek and commit the offset to a
> > greater number, and we need this manual intervention again and again.
> >
> > Any idea what is causing it and how we can circumvent this?
> >
> > Note this error happens in both cases, when a 10.2 client or a 10.1.1
> > client connects to Kafka server 10.1.1.
> >
> > So this does not look like a version issue.
> >
> > Also we have the following settings:
> > message.max.bytes=5000013
> > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "5048576"
> > ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "5048576"
> >
> > The rest is all default, and increasing the value for
> > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG did not help.
> >
> > Stack trace below.
> >
> > Thanks
> > Sachin
> >
> >
> > org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition advice-stream-6 at offset 45153795
> > java.lang.IllegalArgumentException: null
> >   at java.nio.Buffer.limit(Buffer.java:275) ~[na:1.8.0_122-ea]
> >   at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.java:791) ~[kafka-clients-0.10.2.0.jar:na]
> >   at org.apache.kafka.common.record.Record.value(Record.java:268) ~[kafka-clients-0.10.2.0.jar:na]
> >   at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:867) ~[kafka-clients-0.10.2.0.jar:na]
> >   at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:775) ~[kafka-clients-0.10.2.0.jar:na]
> >   at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:473) ~[kafka-clients-0.10.2.0.jar:na]
> >   at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1062) ~[kafka-clients-0.10.2.0.jar:na]
> >   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) ~[kafka-clients-0.10.2.0.jar:na]
> >   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:378) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >
>
