Hmm, I re-read the stacktrace again. It does look like the value side is the culprit (as Sachin suggested earlier).
-Michael

On Thu, Mar 30, 2017 at 3:18 PM, Michael Noll <mich...@confluent.io> wrote:
> Sachin,
>
> you have this line:
>
>     builder.stream(Serdes.String(), serde, "advice-stream")
>
> Could the problem be that it's not the record values that are causing the
> problem -- because your value deserializer does try-catch any such errors
> -- but that the record *keys* are malformed? The built-in
> `Serdes.String()` does not try-catch deserialization errors, and from a
> quick look at the source it seems that the `Fetcher` class
> (clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java)
> is throwing your error above ("Error deserializing key/value for
> partition..."), and the Fetcher is swallowing the more specific
> SerializationException of `Serdes.String()` (but it will include the
> original exception/Throwable in its own SerializationException).
>
> -Michael
>
> On Thu, Mar 30, 2017 at 2:52 PM, Sachin Mittal <sjmit...@gmail.com> wrote:
>> My streams application does already run in debug mode.
>> Also, I have checked the code around these lines:
>>
>>     at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.java:791) ~[kafka-clients-0.10.2.0.jar:na]
>>     at org.apache.kafka.common.record.Record.value(Record.java:268) ~[kafka-clients-0.10.2.0.jar:na]
>>     at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:867) ~[kafka-clients-0.10.2.0.jar:na]
>>
>> I don't see any log statement which will give me more information.
>>
>> https://github.com/apache/kafka/blob/0.10.2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L867
>>
>> The issue is happening at this line, and perhaps handling the exception
>> and setting the value to null would be the better option.
>> Yes, on the client side nothing can be done, because the exception
>> happens before this.valueDeserializer.deserialize can be called.
>>
>> Thanks
>> Sachin
>>
>> On Thu, Mar 30, 2017 at 4:28 PM, Damian Guy <damian....@gmail.com> wrote:
>> > The suggestions in that FAQ won't help as it is too late, i.e., the
>> > message has already been received into Streams.
>> > You could create a simple app that uses the Consumer, seeks to the
>> > offset, and tries to read the message. If you did this in debug mode
>> > you might find out some more information.
>> >
>> > On Thu, 30 Mar 2017 at 11:50 Sachin Mittal <sjmit...@gmail.com> wrote:
>> > > Well, I tried to read that offset via kafka-console-consumer.sh too,
>> > > and it fails with the same error.
>> > >
>> > > So I was wondering if I can apply any of the suggestions as per
>> > > http://docs.confluent.io/3.2.0/streams/faq.html#handling-corrupted-records-and-deserialization-errors-poison-pill-messages
>> > >
>> > > If there is any other way just to get the contents of that message,
>> > > it would be helpful.
>> > >
>> > > Thanks
>> > > Sachin
>> > >
>> > > On Thu, Mar 30, 2017 at 4:11 PM, Damian Guy <damian....@gmail.com> wrote:
>> > > > Hi Sachin,
>> > > >
>> > > > Have you tried firing up a consumer (non-streams), seeking to that
>> > > > offset on the topic and seeing what the message is? Might be
>> > > > easier to debug? Like you say, it is failing in the consumer.
>> > > > Thanks,
>> > > > Damian
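For reference, a minimal sketch of the standalone consumer Damian describes above. This is an illustration, not something tested against this topic: the bootstrap server is a placeholder, the partition and offset are taken from the stack trace at the bottom of this thread, and ByteArrayDeserializer is used for both key and value so that no deserializer can throw:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class PoisonPillInspector {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "poison-pill-inspector");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

            // Partition and offset from the stack trace: advice-stream-6 at 45153795.
            TopicPartition tp = new TopicPartition("advice-stream", 6);
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.assign(Collections.singletonList(tp));
                consumer.seek(tp, 45153795L);
                for (ConsumerRecord<byte[], byte[]> record : consumer.poll(10000)) {
                    System.out.printf("offset=%d key=%d bytes, value=%d bytes%n",
                            record.offset(),
                            record.key() == null ? -1 : record.key().length,
                            record.value() == null ? -1 : record.value().length);
                }
            }
        }
    }

As Sachin notes above, kafka-console-consumer.sh fails on the same offset, so this may hit the same parsing error inside the Fetcher; running it under a debugger should at least show which record field is malformed.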
>> > > > On Thu, 30 Mar 2017 at 10:35 Sachin Mittal <sjmit...@gmail.com> wrote:
>> > > > > I think I am following the third option.
>> > > > >
>> > > > > My pipeline is:
>> > > > >
>> > > > >     serde = Serdes.serdeFrom(new VSerializer(), new VDeserializer());
>> > > > >
>> > > > >     builder.stream(Serdes.String(), serde, "advice-stream")
>> > > > >         .filter(new Predicate<String, V>() { ... })
>> > > > >         .groupByKey()
>> > > > >         .aggregate(new Initializer<V1>() { ... },
>> > > > >                    new Aggregator<String, V, V1>() { ... }, windows, supplier)
>> > > > >         .mapValues(new ValueMapper<V1, V2>() { ... })
>> > > > >         .foreach(new ForeachAction<Windowed<String>, V2>() { ... });
>> > > > >
>> > > > > and in VDeserializer (which implements Deserializer<V>) I am doing
>> > > > > something like this:
>> > > > >
>> > > > >     public V deserialize(String paramString, byte[] paramArrayOfByte) {
>> > > > >         if (paramArrayOfByte == null) { return null; }
>> > > > >         V data = null;
>> > > > >         try {
>> > > > >             data = objectMapper.readValue(paramArrayOfByte, new TypeReference<V>() {});
>> > > > >         } catch (Exception e) {
>> > > > >             e.printStackTrace();
>> > > > >         }
>> > > > >         return data;
>> > > > >     }
>> > > > >
>> > > > > So I am catching any exception that may happen when deserializing
>> > > > > the data. This is what the third option suggests (if I am not
>> > > > > mistaken).
>> > > > >
>> > > > > Please let me know which option would be best given the pipeline,
>> > > > > and how we can incorporate it.
>> > > > >
>> > > > > Also note the exception is happening when reading from the source
>> > > > > topic, which is "advice-stream", so it looks like the flow is not
>> > > > > reaching the pipeline at all for us to handle it. It is
>> > > > > terminating right at the consumer poll.
>> > > > >
>> > > > > Thanks
>> > > > > Sachin
>> > > > >
>> > > > > On Thu, Mar 30, 2017 at 2:22 PM, Michael Noll <mich...@confluent.io> wrote:
>> > > > > > Could this be a corrupted message ("poison pill") in your topic?
>> > > > > >
>> > > > > > If so, take a look at
>> > > > > > http://docs.confluent.io/current/streams/faq.html#handling-corrupted-records-and-deserialization-errors-poison-pill-messages
>> > > > > >
>> > > > > > FYI: We're currently investigating a more elegant way to
>> > > > > > address such poison pill problems. If you have feedback on that
>> > > > > > front, feel free to share it with us. :-)
>> > > > > >
>> > > > > > -Michael
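For completeness, a minimal sketch of the FAQ's third option as it would look in the pipeline quoted above: the lenient VDeserializer already maps corrupt payloads to null, and an explicit filter then drops those nulls before groupByKey/aggregate run. V, VSerializer, VDeserializer, builder, and serde are Sachin's names from above; note this only guards the value side, so a malformed key would still fail inside the consumer, as Michael pointed out earlier:

    // Fragment extending the pipeline above (same builder, serde, and V type).
    KStream<String, V> cleaned = builder
            .stream(Serdes.String(), serde, "advice-stream")
            .filter(new Predicate<String, V>() {
                @Override
                public boolean test(String key, V value) {
                    return value != null; // skip records the deserializer could not parse
                }
            });
    // cleaned.groupByKey().aggregate(...) then continues as in the pipeline above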
>> > > > > > On Wed, Mar 29, 2017 at 10:07 PM, Sachin Mittal <sjmit...@gmail.com> wrote:
>> > > > > > > Hi,
>> > > > > > > This is the first time we are getting a weird exception.
>> > > > > > > After this the streams app crashes.
>> > > > > > >
>> > > > > > > The only workaround is to manually seek and commit the offset
>> > > > > > > to a greater number, and we are needing this manual
>> > > > > > > intervention again and again.
>> > > > > > >
>> > > > > > > Any idea what is causing it and how we can circumvent this?
>> > > > > > >
>> > > > > > > Note this error happens in both cases, when a 10.2 client or
>> > > > > > > a 10.1.1 client connects to a 10.1.1 Kafka server, so this
>> > > > > > > does not look like a version issue.
>> > > > > > >
>> > > > > > > Also we have the following settings:
>> > > > > > >
>> > > > > > >     message.max.bytes=5000013
>> > > > > > >     ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "5048576"
>> > > > > > >     ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "5048576"
>> > > > > > >
>> > > > > > > The rest is all default, and increasing the value for
>> > > > > > > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG also did not
>> > > > > > > help.
>> > > > > > >
>> > > > > > > Stack trace below.
>> > > > > > >
>> > > > > > > Thanks
>> > > > > > > Sachin
>> > > > > > >
>> > > > > > > org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition advice-stream-6 at offset 45153795
>> > > > > > > java.lang.IllegalArgumentException: null
>> > > > > > >     at java.nio.Buffer.limit(Buffer.java:275) ~[na:1.8.0_122-ea]
>> > > > > > >     at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.java:791) ~[kafka-clients-0.10.2.0.jar:na]
>> > > > > > >     at org.apache.kafka.common.record.Record.value(Record.java:268) ~[kafka-clients-0.10.2.0.jar:na]
>> > > > > > >     at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:867) ~[kafka-clients-0.10.2.0.jar:na]
>> > > > > > >     at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:775) ~[kafka-clients-0.10.2.0.jar:na]
>> > > > > > >     at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:473) ~[kafka-clients-0.10.2.0.jar:na]
>> > > > > > >     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1062) ~[kafka-clients-0.10.2.0.jar:na]
>> > > > > > >     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) ~[kafka-clients-0.10.2.0.jar:na]
>> > > > > > >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>> > > > > > >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:378) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
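And a sketch of the manual workaround Sachin describes (seek and commit the offset past the bad record), under these assumptions: the streams application is stopped while it runs, and the group id matches the streams application.id ("my-streams-app" and the broker address are placeholders):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class SkipPoisonPill {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-streams-app");          // placeholder: must match application.id
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

            TopicPartition tp = new TopicPartition("advice-stream", 6);
            long badOffset = 45153795L; // from the stack trace above
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.assign(Collections.singletonList(tp));
                // Commit one past the corrupt record so the group resumes beyond it.
                consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(badOffset + 1)));
            }
        }
    }

This makes the "commit offset to a greater number" step repeatable without hand intervention each time, though it still skips the corrupt record rather than recovering it.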