Sachin, there's a JIRA that seems related to what you're seeing: https://issues.apache.org/jira/browse/KAFKA-4740

Perhaps you could check the above and report back?

-Michael
On Thu, Mar 30, 2017 at 3:23 PM, Michael Noll <[email protected]> wrote:

Hmm, I re-read the stacktrace again. It does look like the value side is the culprit (as Sachin suggested earlier).

-Michael


On Thu, Mar 30, 2017 at 3:18 PM, Michael Noll <[email protected]> wrote:

Sachin,

you have this line:

    builder.stream(Serdes.String(), serde, "advice-stream")

Could the problem be that it's not the record values causing this -- your value deserializer does try-catch any such errors -- but that the record *keys* are malformed? The built-in `Serdes.String()` does not try-catch deserialization errors, and from a quick look at the source it seems that the `Fetcher` class (clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java) is throwing the error above ("Error deserializing key/value for partition...") and swallowing the more specific SerializationException of `Serdes.String()` (but it will include the original exception/Throwable in its own SerializationException).

-Michael
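P.S.: one quick way to test the key hypothesis would be a key deserializer that try-catches, analogous to what you already do on the value side. An untested sketch (the class name is made up):

    import java.util.Map;
    import org.apache.kafka.common.serialization.Deserializer;
    import org.apache.kafka.common.serialization.Serdes;

    // Wraps the built-in String deserializer; returns null instead of
    // propagating a deserialization error up into the fetcher/poll loop.
    public class TryCatchStringDeserializer implements Deserializer<String> {

        private final Deserializer<String> inner = Serdes.String().deserializer();

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            inner.configure(configs, isKey);
        }

        @Override
        public String deserialize(String topic, byte[] data) {
            try {
                return inner.deserialize(topic, data);
            } catch (Exception e) {
                e.printStackTrace(); // malformed key: log it and treat it as null
                return null;
            }
        }

        @Override
        public void close() {
            inner.close();
        }
    }

You would then build the key serde via Serdes.serdeFrom(new StringSerializer(), new TryCatchStringDeserializer()) and pass that to builder.stream(...) instead of Serdes.String(). Note, though, that if the failure happens inside the Fetcher before any deserializer is even invoked -- as the stacktrace at the bottom of this thread suggests -- this won't help either.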
On Thu, Mar 30, 2017 at 2:52 PM, Sachin Mittal <[email protected]> wrote:

My streams application already runs in debug mode. Also, I have checked the code around these lines:

    at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.java:791) ~[kafka-clients-0.10.2.0.jar:na]
    at org.apache.kafka.common.record.Record.value(Record.java:268) ~[kafka-clients-0.10.2.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:867) ~[kafka-clients-0.10.2.0.jar:na]

I don't see any log statement that would give me more information.

https://github.com/apache/kafka/blob/0.10.2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L867

The issue is happening at this line, and perhaps handling the exception there and setting the value to null would be the better option. And yes, at the client side nothing can be done, because the exception happens before this.valueDeserializer.deserialize can even be called.

Thanks
Sachin


On Thu, Mar 30, 2017 at 4:28 PM, Damian Guy <[email protected]> wrote:

The suggestions in that FAQ won't help, as it is too late by then, i.e., the message has already been received into Streams. You could create a simple app that uses the Consumer, seeks to the offset, and tries to read the message. If you did this in debug mode you might find out some more information.


On Thu, 30 Mar 2017 at 11:50 Sachin Mittal <[email protected]> wrote:

Well, I tried to read that offset via kafka-console-consumer.sh too, and it fails with the same error.

So I was wondering if I can apply any of the suggestions as per
http://docs.confluent.io/3.2.0/streams/faq.html#handling-corrupted-records-and-deserialization-errors-poison-pill-messages

If there is any other way just to get the contents of that message, it would be helpful.

Thanks
Sachin


On Thu, Mar 30, 2017 at 4:11 PM, Damian Guy <[email protected]> wrote:

Hi Sachin,

Have you tried firing up a consumer (non-streams), seeking to that offset on the topic, and seeing what the message is? It might be easier to debug. Like you say, it is failing in the consumer.

Thanks,
Damian
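P.S.: in case it helps, an untested sketch of such a bare-bones inspection app (the broker address is a placeholder; topic, partition, and offset are taken from the stacktrace below):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class OffsetInspector {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder
            props.put("enable.auto.commit", "false");         // don't touch any offsets
            props.put("max.partition.fetch.bytes", "5048576");
            // Byte-array deserializers, so no client-side deserialization can fail;
            // if the record itself is corrupt, poll() should throw the same exception
            // here, where it is much easier to step through in a debugger.
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(
                    props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
                TopicPartition tp = new TopicPartition("advice-stream", 6);
                consumer.assign(Collections.singletonList(tp));
                consumer.seek(tp, 45153795L);
                ConsumerRecords<byte[], byte[]> records = consumer.poll(10000);
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    System.out.println("offset=" + record.offset()
                        + " keySize=" + (record.key() == null ? -1 : record.key().length)
                        + " valueSize=" + (record.value() == null ? -1 : record.value().length));
                }
            }
        }
    }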
On Thu, 30 Mar 2017 at 10:35 Sachin Mittal <[email protected]> wrote:

I think I am following the third option.

My pipeline is:

    serde = Serdes.serdeFrom(new VSerializer(), new VDeserializer());

    builder.stream(Serdes.String(), serde, "advice-stream")
        .filter(new Predicate<String, V>() { ... })
        .groupByKey()
        .aggregate(new Initializer<V1>() { ... },
                   new Aggregator<String, V, V1>() { ... }, windows, supplier)
        .mapValues(new ValueMapper<V1, V2>() { ... })
        .foreach(new ForeachAction<Windowed<String>, V2>() { ... });

and in VDeserializer (implements Deserializer<V>) I am doing something like this:

    public V deserialize(String paramString, byte[] paramArrayOfByte) {
        if (paramArrayOfByte == null) {
            return null;
        }
        V data = null;
        try {
            data = objectMapper.readValue(paramArrayOfByte, new TypeReference<V>() {});
        } catch (Exception e) {
            e.printStackTrace();
        }
        return data;
    }

So I am catching any exception that may happen when deserializing the data. This is what the third option suggests (if I am not mistaken).

Please let me know which option would be best given this pipeline, and how we can incorporate it.

Also note that the exception is happening when reading from the source topic, which is "advice-stream", so it looks like the flow never reaches our pipeline for us to handle it. It is terminating right at the consumer poll.

Thanks
Sachin


On Thu, Mar 30, 2017 at 2:22 PM, Michael Noll <[email protected]> wrote:

Could this be a corrupted message ("poison pill") in your topic?

If so, take a look at
http://docs.confluent.io/current/streams/faq.html#handling-corrupted-records-and-deserialization-errors-poison-pill-messages

FYI: We're currently investigating a more elegant way to address such poison pill problems. If you have feedback on that front, feel free to share it with us. :-)

-Michael


On Wed, Mar 29, 2017 at 10:07 PM, Sachin Mittal <[email protected]> wrote:

Hi,
This is the first time we are getting a weird exception. After this the streams application crashes.

The only workaround is to manually seek and commit the offset to a greater number, and we are needing this manual intervention again and again (a rough sketch of that workaround is at the end of this mail).

Any idea what is causing it and how we can circumvent it?

Note this error happens in both cases, whether the 10.2 client or the 10.1.1 client connects to the 10.1.1 kafka server, so this does not look like a version issue.

Also we have the following settings:

    message.max.bytes=5000013
    ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "5048576"
    ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "5048576"

The rest is all default, and increasing the value of ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG did not help.

Stack trace below.

Thanks
Sachin

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition advice-stream-6 at offset 45153795
java.lang.IllegalArgumentException: null
    at java.nio.Buffer.limit(Buffer.java:275) ~[na:1.8.0_122-ea]
    at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.java:791) ~[kafka-clients-0.10.2.0.jar:na]
    at org.apache.kafka.common.record.Record.value(Record.java:268) ~[kafka-clients-0.10.2.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:867) ~[kafka-clients-0.10.2.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:775) ~[kafka-clients-0.10.2.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:473) ~[kafka-clients-0.10.2.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1062) ~[kafka-clients-0.10.2.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) ~[kafka-clients-0.10.2.0.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:378) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
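P.S.: the seek-past-and-commit workaround mentioned above boils down to something like the following rough, untested sketch. The broker address and group id are placeholders (the group id must be the streams application.id), and the streams application must be stopped while the offset is committed:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class SkipPoisonPill {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder
            props.put("group.id", "my-streams-app-id");       // placeholder: the streams application.id
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(
                    props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
                TopicPartition tp = new TopicPartition("advice-stream", 6);
                consumer.assign(Collections.singletonList(tp));
                // Commit an offset one past the poison pill, so the streams app
                // resumes after the bad record when it is restarted.
                consumer.commitSync(Collections.singletonMap(
                        tp, new OffsetAndMetadata(45153795L + 1)));
            }
        }
    }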
