Not that I'm aware of.
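If you just need to see what is in that segment, running DumpLogSegments over the file and grepping for the offset is brute force, but it should work. A sketch of the invocation (the segment path is illustrative; --deep-iteration is only needed if the messages are compressed):

    bin/kafka-run-class.sh kafka.tools.DumpLogSegments \
      --files /var/kafka-logs/advice-stream-6/00000000000045000000.log \
      --print-data-log --deep-iteration

You can then search the output for offset 45153795 and inspect the payload around it.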
On Thu, 30 Mar 2017 at 16:00 Sachin Mittal <sjmit...@gmail.com> wrote:

> Damian,
> Is there any way where I can just dump out the contents at a given offset
> from a given log segment file?
>
> I am not sure how DumpLogSegment
> <https://cwiki.apache.org/confluence/display/KAFKA/System+Tools#SystemTools-DumpLogSegment>
> helps. I already know the log segment file where that message is. Thing is,
> the file is 1 GB and there is no easy way to inspect it and actually see
> what the payload is.
>
> Thanks
> Sachin
>
> On Thu, Mar 30, 2017 at 7:18 PM, Damian Guy <damian....@gmail.com> wrote:
>
> > Sachin,
> >
> > Not sure if this will help, but you might want to try running
> > https://cwiki.apache.org/confluence/display/KAFKA/System+Tools#SystemTools-DumpLogSegment
> > on the partition that is causing you problems.
> >
> > Thanks
> > Damian
> >
> > On Thu, 30 Mar 2017 at 14:29 Michael Noll <mich...@confluent.io> wrote:
> >
> > > Sachin,
> > >
> > > there's a JIRA that seems related to what you're seeing:
> > > https://issues.apache.org/jira/browse/KAFKA-4740
> > >
> > > Perhaps you could check the above and report back?
> > >
> > > -Michael
> > >
> > > On Thu, Mar 30, 2017 at 3:23 PM, Michael Noll <mich...@confluent.io> wrote:
> > >
> > > > Hmm, I re-read the stacktrace again. It does look like the value side
> > > > is the culprit (as Sachin suggested earlier).
> > > >
> > > > -Michael
> > > >
> > > > On Thu, Mar 30, 2017 at 3:18 PM, Michael Noll <mich...@confluent.io> wrote:
> > > >
> > > > > Sachin,
> > > > >
> > > > > you have this line:
> > > > >
> > > > > > builder.stream(Serdes.String(), serde, "advice-stream")
> > > > >
> > > > > Could the problem be that it is not the record values causing the
> > > > > problem -- because your value deserializer does try-catch any such
> > > > > errors -- but that the record *keys* are malformed? The built-in
> > > > > `Serdes.String()` does not try-catch deserialization errors, and from
> > > > > a quick look at the source it seems that the `Fetcher` class
> > > > > (clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java)
> > > > > is throwing your error above ("Error deserializing key/value for
> > > > > partition..."), and the Fetcher is swallowing the more specific
> > > > > SerializationException of `Serdes.String()` (but it will include the
> > > > > original exception/Throwable in its own SerializationException).
> > > > >
> > > > > -Michael
> > > > >
> > > > > On Thu, Mar 30, 2017 at 2:52 PM, Sachin Mittal <sjmit...@gmail.com> wrote:
> > > > >
> > > > > > My streams application does run in debug mode only.
> > > > > > Also I have checked the code around these lines:
> > > > > >
> > > > > > at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.java:791) ~[kafka-clients-0.10.2.0.jar:na]
> > > > > > at org.apache.kafka.common.record.Record.value(Record.java:268) ~[kafka-clients-0.10.2.0.jar:na]
> > > > > > at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:867) ~[kafka-clients-0.10.2.0.jar:na]
> > > > > >
> > > > > > I don't see any log statement which would give me more information.
> > > > > >
> > > > > > https://github.com/apache/kafka/blob/0.10.2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L867
> > > > > >
> > > > > > The issue is happening at this line, and perhaps handling the
> > > > > > exception and setting the value to null would be the better option.
> > > > > > Yes, at the client side nothing can be done, because the exception
> > > > > > happens before this.valueDeserializer.deserialize can be called.
> > > > > >
> > > > > > Thanks
> > > > > > Sachin
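On Michael's key-serde idea: a quick way to rule the key out is to wrap the built-in String deserializer in the same kind of try-catch your value deserializer uses. A minimal sketch (the class name is made up; it delegates to the real StringDeserializer):

    import java.util.Map;
    import org.apache.kafka.common.serialization.Deserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class SafeStringDeserializer implements Deserializer<String> {

        private final StringDeserializer inner = new StringDeserializer();

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            inner.configure(configs, isKey);
        }

        @Override
        public String deserialize(String topic, byte[] data) {
            try {
                return inner.deserialize(topic, data);
            } catch (Exception e) {
                e.printStackTrace(); // malformed key: log it and return null
                return null;
            }
        }

        @Override
        public void close() {
            inner.close();
        }
    }

That said, if the stack trace really originates in Fetcher.parseRecord before either deserializer runs, this will not change anything; it only eliminates the key serde as a suspect.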
> > > > > >
> > > > > > On Thu, Mar 30, 2017 at 4:28 PM, Damian Guy <damian....@gmail.com> wrote:
> > > > > >
> > > > > > > The suggestions in that FAQ won't help as it is too late, i.e., the
> > > > > > > message has already been received into Streams.
> > > > > > > You could create a simple app that uses the Consumer, seeks to the
> > > > > > > offset, and tries to read the message. If you did this in debug mode
> > > > > > > you might find out some more information.
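For the simple consumer app, something like the following should do (an untested sketch; adjust bootstrap.servers -- the topic, partition, and offset are taken from the stack trace). It reads raw bytes, so no serde can get in the way:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class OffsetInspector {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // adjust
            props.put("group.id", "offset-inspector");
            props.put("enable.auto.commit", "false");
            props.put("key.deserializer", ByteArrayDeserializer.class.getName());
            props.put("value.deserializer", ByteArrayDeserializer.class.getName());

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                // pin the consumer to the failing partition and offset
                TopicPartition tp = new TopicPartition("advice-stream", 6);
                consumer.assign(Collections.singletonList(tp));
                consumer.seek(tp, 45153795L);
                ConsumerRecords<byte[], byte[]> records = consumer.poll(10000);
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    System.out.printf("offset=%d key=%d bytes, value=%d bytes%n",
                            record.offset(),
                            record.key() == null ? -1 : record.key().length,
                            record.value() == null ? -1 : record.value().length);
                }
            }
        }
    }

If even this fails with the same IllegalArgumentException, the record is corrupt at the log level and no serde change on the client side will help.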
> > > > > > >
> > > > > > > On Thu, 30 Mar 2017 at 11:50 Sachin Mittal <sjmit...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Well, I tried to read that offset via kafka-console-consumer.sh too,
> > > > > > > > and it fails with the same error.
> > > > > > > >
> > > > > > > > So I was wondering if I can apply any of the suggestions as per
> > > > > > > > http://docs.confluent.io/3.2.0/streams/faq.html#handling-corrupted-records-and-deserialization-errors-poison-pill-messages
> > > > > > > >
> > > > > > > > If there is any other way just to get the contents of that message,
> > > > > > > > it would be helpful.
> > > > > > > >
> > > > > > > > Thanks
> > > > > > > > Sachin
> > > > > > > >
> > > > > > > > On Thu, Mar 30, 2017 at 4:11 PM, Damian Guy <damian....@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Hi Sachin,
> > > > > > > > >
> > > > > > > > > Have you tried firing up a consumer (non-streams), seeking to that
> > > > > > > > > offset on the topic and seeing what the message is? Might be easier
> > > > > > > > > to debug? Like you say, it is failing in the consumer.
> > > > > > > > > Thanks,
> > > > > > > > > Damian
> > > > > > > > >
> > > > > > > > > On Thu, 30 Mar 2017 at 10:35 Sachin Mittal <sjmit...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > I think I am following the third option.
> > > > > > > > > >
> > > > > > > > > > My pipeline is:
> > > > > > > > > >
> > > > > > > > > > serde = Serdes.serdeFrom(new VSerializer(), new VDeserializer());
> > > > > > > > > >
> > > > > > > > > > builder.stream(Serdes.String(), serde, "advice-stream")
> > > > > > > > > >     .filter(new Predicate<String, V>() { ... })
> > > > > > > > > >     .groupByKey()
> > > > > > > > > >     .aggregate(new Initializer<V1>() { ... },
> > > > > > > > > >                new Aggregator<String, V, V1>() { ... }, windows, supplier)
> > > > > > > > > >     .mapValues(new ValueMapper<V1, V2>() { ... })
> > > > > > > > > >     .foreach(new ForeachAction<Windowed<String>, V2>() { ... });
> > > > > > > > > >
> > > > > > > > > > And in VDeserializer (implements Deserializer<V>) I am doing
> > > > > > > > > > something like this:
> > > > > > > > > >
> > > > > > > > > > public V deserialize(String paramString, byte[] paramArrayOfByte) {
> > > > > > > > > >     if (paramArrayOfByte == null) { return null; }
> > > > > > > > > >     V data = null;
> > > > > > > > > >     try {
> > > > > > > > > >         data = objectMapper.readValue(paramArrayOfByte, new TypeReference<V>() {});
> > > > > > > > > >     } catch (Exception e) {
> > > > > > > > > >         e.printStackTrace();
> > > > > > > > > >     }
> > > > > > > > > >     return data;
> > > > > > > > > > }
> > > > > > > > > >
> > > > > > > > > > So I am catching any exception that may happen when deserializing
> > > > > > > > > > the data.
> > > > > > > > > >
> > > > > > > > > > This is what the third option suggests (if I am not mistaken).
> > > > > > > > > >
> > > > > > > > > > Please let me know, given the pipeline, which option would be best
> > > > > > > > > > and how we can incorporate it in our pipeline.
> > > > > > > > > >
> > > > > > > > > > Also note the exception is happening when reading from the source
> > > > > > > > > > topic, which is "advice-stream", so it looks like the flow is not
> > > > > > > > > > reaching our pipeline at all for us to handle it. It is terminating
> > > > > > > > > > right at the consumer poll.
> > > > > > > > > >
> > > > > > > > > > Thanks
> > > > > > > > > > Sachin
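One note on that pipeline: with a deserializer that returns null on failure, the corrupted records still flow into the topology as null values, so the first filter should drop them explicitly. A sketch of that guard (Predicate is org.apache.kafka.streams.kstream.Predicate):

    builder.stream(Serdes.String(), serde, "advice-stream")
        .filter(new Predicate<String, V>() {
            @Override
            public boolean test(String key, V value) {
                return value != null; // drop records that failed deserialization
            }
        })
        // ... rest of the topology as above

This only helps once the bytes get past the consumer's own record parsing, though -- and per the stack trace below, that is exactly where this failure occurs.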
> > > > > > > > > >
> > > > > > > > > > On Thu, Mar 30, 2017 at 2:22 PM, Michael Noll <mich...@confluent.io> wrote:
> > > > > > > > > >
> > > > > > > > > > > Could this be a corrupted message ("poison pill") in your topic?
> > > > > > > > > > >
> > > > > > > > > > > If so, take a look at
> > > > > > > > > > > http://docs.confluent.io/current/streams/faq.html#handling-corrupted-records-and-deserialization-errors-poison-pill-messages
> > > > > > > > > > >
> > > > > > > > > > > FYI: We're currently investigating a more elegant way to address
> > > > > > > > > > > such poison pill problems. If you have feedback on that front,
> > > > > > > > > > > feel free to share it with us. :-)
> > > > > > > > > > >
> > > > > > > > > > > -Michael
> > > > > > > > > > >
> > > > > > > > > > > On Wed, Mar 29, 2017 at 10:07 PM, Sachin Mittal <sjmit...@gmail.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi,
> > > > > > > > > > > > This is the first time we are getting a weird exception.
> > > > > > > > > > > > After this the streams application crashes.
> > > > > > > > > > > >
> > > > > > > > > > > > The only workaround is to manually seek and commit the offset to
> > > > > > > > > > > > a greater number, and we are needing this manual intervention
> > > > > > > > > > > > again and again.
> > > > > > > > > > > >
> > > > > > > > > > > > Any idea what is causing it and how we can circumvent this?
> > > > > > > > > > > >
> > > > > > > > > > > > Note this error happens in both cases, whether a 10.2 client or a
> > > > > > > > > > > > 10.1.1 client connects to a 10.1.1 kafka server, so this does not
> > > > > > > > > > > > look like a version issue.
> > > > > > > > > > > >
> > > > > > > > > > > > Also we have the following settings:
> > > > > > > > > > > > message.max.bytes=5000013
> > > > > > > > > > > > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "5048576"
> > > > > > > > > > > > ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "5048576"
> > > > > > > > > > > >
> > > > > > > > > > > > The rest is all default, and increasing the value for
> > > > > > > > > > > > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG did not help either.
> > > > > > > > > > > >
> > > > > > > > > > > > Stack trace below.
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks
> > > > > > > > > > > > Sachin
> > > > > > > > > > > >
> > > > > > > > > > > > org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition advice-stream-6 at offset 45153795
> > > > > > > > > > > > java.lang.IllegalArgumentException: null
> > > > > > > > > > > >     at java.nio.Buffer.limit(Buffer.java:275) ~[na:1.8.0_122-ea]
> > > > > > > > > > > >     at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.java:791) ~[kafka-clients-0.10.2.0.jar:na]
> > > > > > > > > > > >     at org.apache.kafka.common.record.Record.value(Record.java:268) ~[kafka-clients-0.10.2.0.jar:na]
> > > > > > > > > > > >     at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:867) ~[kafka-clients-0.10.2.0.jar:na]
> > > > > > > > > > > >     at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:775) ~[kafka-clients-0.10.2.0.jar:na]
> > > > > > > > > > > >     at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:473) ~[kafka-clients-0.10.2.0.jar:na]
> > > > > > > > > > > >     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1062) ~[kafka-clients-0.10.2.0.jar:na]
> > > > > > > > > > > >     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) ~[kafka-clients-0.10.2.0.jar:na]
> > > > > > > > > > > >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > > > > > > > > > > >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:378) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
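Finally, since the manual seek-and-commit keeps recurring, it could be scripted rather than done by hand. A rough sketch (untested; it assumes the streams application is stopped while this runs, and group.id must be set to the streams application.id so the committed offset is picked up on restart):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class SkipPoisonPill {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // adjust
            props.put("group.id", "my-streams-app-id");       // must equal application.id
            props.put("enable.auto.commit", "false");
            props.put("key.deserializer", ByteArrayDeserializer.class.getName());
            props.put("value.deserializer", ByteArrayDeserializer.class.getName());

            TopicPartition tp = new TopicPartition("advice-stream", 6);
            long badOffset = 45153795L; // from the stack trace

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.assign(Collections.singletonList(tp));
                // commit the offset just past the poison pill so the app resumes after it
                consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(badOffset + 1)));
            }
        }
    }

This does skip (i.e., lose) the bad record, which per the FAQ above is the usual trade-off with poison pills.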