Sachin,

there's a JIRA that seems related to what you're seeing:
https://issues.apache.org/jira/browse/KAFKA-4740

Perhaps you could check the above and report back?

-Michael




On Thu, Mar 30, 2017 at 3:23 PM, Michael Noll <mich...@confluent.io> wrote:

> Hmm, I re-read the stack trace. It does look like the value side is
> the culprit (as Sachin suggested earlier).
>
> -Michael
>
>
> On Thu, Mar 30, 2017 at 3:18 PM, Michael Noll <mich...@confluent.io>
> wrote:
>
>> Sachin,
>>
>> you have this line:
>>
>> > builder.stream(Serdes.String(), serde, "advice-stream")
>>
>> Could it be that the record values are not the problem -- your value
>> deserializer does try-catch any such errors -- but that the record
>> *keys* are malformed?  The built-in `Serdes.String()` does not try-catch
>> deserialization errors, and from a quick look at the source it seems that
>> the `Fetcher` class
>> (clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java)
>> is throwing your error above ("Error deserializing key/value for
>> partition..."), and the Fetcher is swallowing the more specific
>> SerializationException of `Serdes.String()` (but it will include the
>> original exception/Throwable in its own SerializationException).
>>
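
To illustrate the point about deserializers that do not catch their own errors, here is a minimal sketch of a wrapper that turns a failed deserialization into `null`. `SimpleDeserializer` is a hypothetical stand-in with the same method shape as Kafka's `Deserializer#deserialize(String, byte[])`, so the example runs without the Kafka client on the classpath; `StrictUtf8`, `SafeDeserializer` and `SafeDeserializerDemo` are likewise illustrative names, not part of any library.

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in mirroring the shape of Kafka's deserialize(String, byte[]).
interface SimpleDeserializer<T> {
    T deserialize(String topic, byte[] data);
}

// Illustrative strict decoder: rejects malformed UTF-8 instead of replacing it.
final class StrictUtf8 implements SimpleDeserializer<String> {
    @Override
    public String deserialize(String topic, byte[] data) {
        try {
            return StandardCharsets.UTF_8.newDecoder()
                    .onMalformedInput(CodingErrorAction.REPORT)
                    .onUnmappableCharacter(CodingErrorAction.REPORT)
                    .decode(ByteBuffer.wrap(data))
                    .toString();
        } catch (CharacterCodingException e) {
            throw new IllegalArgumentException("malformed UTF-8 on topic " + topic, e);
        }
    }
}

// Wraps any deserializer and maps a runtime failure to null.
final class SafeDeserializer<T> implements SimpleDeserializer<T> {
    private final SimpleDeserializer<T> inner;

    SafeDeserializer(SimpleDeserializer<T> inner) {
        this.inner = inner;
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        try {
            return inner.deserialize(topic, data);
        } catch (RuntimeException e) {
            // Log and skip: the caller must be prepared to filter out nulls.
            System.err.println("Skipping undecodable record on " + topic + ": " + e);
            return null;
        }
    }
}

final class SafeDeserializerDemo {
    public static void main(String[] args) {
        SimpleDeserializer<String> keys = new SafeDeserializer<>(new StrictUtf8());
        byte[] good = "key-1".getBytes(StandardCharsets.UTF_8);
        byte[] bad = {(byte) 0xC3, (byte) 0x28}; // invalid UTF-8 sequence
        System.out.println(keys.deserialize("advice-stream", good));
        System.out.println(keys.deserialize("advice-stream", bad));
    }
}
```

A wrapper like this only helps when the error is raised inside the deserializer itself, and downstream code must then tolerate `null` keys or values.
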
>> -Michael
>>
>>
>>
>> On Thu, Mar 30, 2017 at 2:52 PM, Sachin Mittal <sjmit...@gmail.com>
>> wrote:
>>
>>> My streams application already runs in debug mode.
>>> I have also checked the code around these lines:
>>>
>>>   at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.java:791) ~[kafka-clients-0.10.2.0.jar:na]
>>>   at org.apache.kafka.common.record.Record.value(Record.java:268) ~[kafka-clients-0.10.2.0.jar:na]
>>>   at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:867) ~[kafka-clients-0.10.2.0.jar:na]
>>>
>>> I don't see any log statement which will give me more information.
>>>
>>> https://github.com/apache/kafka/blob/0.10.2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L867
>>>
>>> The issue happens at this line; perhaps handling the exception there and
>>> setting the value to null would be a better option.
>>> And yes, nothing can be done on the client side, because the exception is
>>> thrown before this.valueDeserializer.deserialize can be called.
>>>
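
The `java.lang.IllegalArgumentException: null` raised from `Buffer.limit` in the trace is consistent with a length prefix that claims more bytes than the fetched buffer actually holds. The following is a minimal sketch of that failure mode using only `java.nio`. The class `SizeDelimitedDemo` and its helper `record` are hypothetical, and `sizeDelimited` here is a simplified stand-in for a size-delimited read, not the Kafka implementation.

```java
import java.nio.ByteBuffer;

final class SizeDelimitedDemo {

    // Simplified stand-in: reads a 4-byte length at `start`, then returns a
    // view over that many payload bytes. Not the Kafka source.
    static ByteBuffer sizeDelimited(ByteBuffer buffer, int start) {
        int size = buffer.getInt(start);
        if (size < 0) {
            return null; // a negative length encodes a null field
        }
        ByteBuffer slice = buffer.duplicate();
        slice.position(start + 4);
        slice = slice.slice();
        // Throws IllegalArgumentException when size exceeds the bytes available.
        slice.limit(size);
        return slice;
    }

    // Hypothetical helper: builds a record whose length prefix says `claimed`
    // bytes while only `actual` payload bytes are present.
    static ByteBuffer record(int claimed, int actual) {
        ByteBuffer buf = ByteBuffer.allocate(4 + actual);
        buf.putInt(claimed);
        buf.put(new byte[actual]);
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        // Well-formed record: the prefix matches the payload size.
        System.out.println(sizeDelimited(record(3, 3), 0).remaining());

        // Corrupt or truncated record: the prefix overstates the payload size.
        try {
            sizeDelimited(record(10, 3), 0);
        } catch (IllegalArgumentException e) {
            System.out.println("bad length prefix: " + e);
        }
    }
}
```

Because the exception surfaces while the raw value bytes are being sliced, a try-catch inside a custom `Deserializer` never gets a chance to see it, which matches the observation above.
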
>>> Thanks
>>> Sachin
>>>
>>>
>>> On Thu, Mar 30, 2017 at 4:28 PM, Damian Guy <damian....@gmail.com>
>>> wrote:
>>>
>>> > The suggestions in that FAQ won't help as it is too late, i.e., the
>>> > message has already been received into Streams.
>>> > You could create a simple app that uses the Consumer, seeks to the
>>> > offset, and tries to read the message. If you did this in debug mode you
>>> > might find out some more information.
>>> >
>>> >
>>> > On Thu, 30 Mar 2017 at 11:50 Sachin Mittal <sjmit...@gmail.com> wrote:
>>> >
>>> > > Well, I tried to read that offset via kafka-console-consumer.sh too,
>>> > > and it fails with the same error.
>>> > >
>>> > > So I was wondering whether I can apply any of the suggestions from
>>> > >
>>> > > http://docs.confluent.io/3.2.0/streams/faq.html#handling-corrupted-records-and-deserialization-errors-poison-pill-messages
>>> > >
>>> > > If there is any other way just to get the contents of that message, it
>>> > > would be helpful.
>>> > >
>>> > > Thanks
>>> > > Sachin
>>> > >
>>> > >
>>> > > On Thu, Mar 30, 2017 at 4:11 PM, Damian Guy <damian....@gmail.com>
>>> > > wrote:
>>> > >
>>> > > > Hi Sachin,
>>> > > >
>>> > > > Have you tried firing up a consumer (non-streams), seeking to that
>>> > > > offset on the topic and seeing what the message is? Might be easier
>>> > > > to debug? Like you say, it is failing in the consumer.
>>> > > > Thanks,
>>> > > > Damian
>>> > > >
>>> > > > On Thu, 30 Mar 2017 at 10:35 Sachin Mittal <sjmit...@gmail.com>
>>> > > > wrote:
>>> > > >
>>> > > > > I think I am following the third option.
>>> > > > >
>>> > > > > My pipeline is:
>>> > > > >
>>> > > > > serde = Serdes.serdeFrom(new VSerializer(), new VDeserializer());
>>> > > > >
>>> > > > > builder.stream(Serdes.String(), serde, "advice-stream")
>>> > > > >   .filter(new Predicate<String, V>() { ...})
>>> > > > >   .groupByKey()
>>> > > > >   .aggregate(new Initializer<V1>() {...},
>>> > > > >              new Aggregator<String, V, V1>() {...}, windows, supplier)
>>> > > > >   .mapValues(new ValueMapper<V1, V2>() { ... })
>>> > > > >   .foreach(new ForeachAction<Windowed<String>, V2>() {... });
>>> > > > >
>>> > > > >
>>> > > > > and in VDeserializer (implements Deserializer<V>) I am doing
>>> > > > > something like this:
>>> > > > >
>>> > > > >     public V deserialize(String paramString, byte[] paramArrayOfByte) {
>>> > > > >         if (paramArrayOfByte == null) { return null; }
>>> > > > >         V data = null;
>>> > > > >         try {
>>> > > > >             data = objectMapper.readValue(paramArrayOfByte, new TypeReference<V>() {});
>>> > > > >         } catch (Exception e) {
>>> > > > >             // Swallow the error; a malformed value is returned as null.
>>> > > > >             e.printStackTrace();
>>> > > > >         }
>>> > > > >         return data;
>>> > > > >     }
>>> > > > >
>>> > > > > So I am catching any exception that may happen when deserializing
>>> > > > > the data.
>>> > > > >
>>> > > > > This is what the third option suggests (if I am not mistaken).
>>> > > > >
>>> > > > > Please let me know which option would be best given this pipeline,
>>> > > > > and how we can incorporate it.
>>> > > > >
>>> > > > > Also note that the exception happens when reading from the source
>>> > > > > topic, "advice-stream", so it looks like the flow never reaches the
>>> > > > > pipeline for us to handle it. It terminates right at the consumer
>>> > > > > poll.
>>> > > > >
>>> > > > > Thanks
>>> > > > > Sachin
>>> > > > >
>>> > > > >
>>> > > > > On Thu, Mar 30, 2017 at 2:22 PM, Michael Noll <mich...@confluent.io>
>>> > > > > wrote:
>>> > > > >
>>> > > > > > Could this be a corrupted message ("poison pill") in your topic?
>>> > > > > >
>>> > > > > > If so, take a look at
>>> > > > > > http://docs.confluent.io/current/streams/faq.html#handling-corrupted-records-and-deserialization-errors-poison-pill-messages
>>> > > > > >
>>> > > > > > FYI: We're currently investigating a more elegant way to address
>>> > > > > > such poison pill problems.  If you have feedback on that front,
>>> > > > > > feel free to share it with us. :-)
>>> > > > > >
>>> > > > > > -Michael
>>> > > > > >
>>> > > > > >
>>> > > > > >
>>> > > > > >
>>> > > > > > On Wed, Mar 29, 2017 at 10:07 PM, Sachin Mittal <sjmit...@gmail.com>
>>> > > > > > wrote:
>>> > > > > >
>>> > > > > > > Hi,
>>> > > > > > > This is the first time we are getting this weird exception.
>>> > > > > > > After this the streams application crashes.
>>> > > > > > >
>>> > > > > > > The only workaround is to manually seek and commit the offset to
>>> > > > > > > a greater number, and we need this manual intervention again and
>>> > > > > > > again.
>>> > > > > > >
>>> > > > > > > Any idea what is causing it and how we can circumvent it?
>>> > > > > > >
>>> > > > > > > Note that this error happens both when a 10.2 client and when a
>>> > > > > > > 10.1.1 client connects to Kafka server 10.1.1.
>>> > > > > > >
>>> > > > > > > So this does not look like a version issue.
>>> > > > > > >
>>> > > > > > > We also have the following settings:
>>> > > > > > > message.max.bytes=5000013
>>> > > > > > > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "5048576"
>>> > > > > > > ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "5048576"
>>> > > > > > >
>>> > > > > > > The rest is all default, and increasing the value of
>>> > > > > > > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG did not help.
>>> > > > > > >
>>> > > > > > > Stack trace below.
>>> > > > > > >
>>> > > > > > > Thanks
>>> > > > > > > Sachin
>>> > > > > > >
>>> > > > > > >
>>> > > > > > > org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition advice-stream-6 at offset 45153795
>>> > > > > > > java.lang.IllegalArgumentException: null
>>> > > > > > >   at java.nio.Buffer.limit(Buffer.java:275) ~[na:1.8.0_122-ea]
>>> > > > > > >   at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.java:791) ~[kafka-clients-0.10.2.0.jar:na]
>>> > > > > > >   at org.apache.kafka.common.record.Record.value(Record.java:268) ~[kafka-clients-0.10.2.0.jar:na]
>>> > > > > > >   at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:867) ~[kafka-clients-0.10.2.0.jar:na]
>>> > > > > > >   at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:775) ~[kafka-clients-0.10.2.0.jar:na]
>>> > > > > > >   at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:473) ~[kafka-clients-0.10.2.0.jar:na]
>>> > > > > > >   at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1062) ~[kafka-clients-0.10.2.0.jar:na]
>>> > > > > > >   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) ~[kafka-clients-0.10.2.0.jar:na]
>>> > > > > > >   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>> > > > > > >   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:378) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>> > > > > > >
>>> > > > > >
>>> > > > >
>>> > > >
>>> > >
>>> >
>>>
>>
>>
>>
>
>
