Re: Resume checkpoint failed with Spark Streaming Kafka via createDirectStream under heavy reprocessing

2015-07-29 Thread Nicolas Phung
Hello, I'm using 4 GB for the driver memory. The checkpoint is between 1 GB and 10 GB, depending on whether I'm reprocessing all the data from the beginning or just consuming from the latest processed offset in real time. Is there any best practice to be aware of regarding driver memory relative to checkpoint size?

Re: Resume checkpoint failed with Spark Streaming Kafka via createDirectStream under heavy reprocessing

2015-07-28 Thread Cody Koeninger
That stack trace looks like an out-of-heap-space error on the driver while writing the checkpoint, not on the worker nodes. How much memory are you giving the driver? How big are your stored checkpoints? On Tue, Jul 28, 2015 at 9:30 AM, Nicolas Phung nicolas.ph...@gmail.com wrote: Hi, After using

Re: Resume checkpoint failed with Spark Streaming Kafka via createDirectStream under heavy reprocessing

2015-07-24 Thread Nicolas Phung
Hello, I managed to read all my data back by skipping the offsets that contain a corrupt message. I have one more question regarding best practices for the messageHandler method vs dstream.foreachRDD.map vs dstream.map.foreachRDD. I'm using a function to read the serialized message from Kafka and convert it

Re: Resume checkpoint failed with Spark Streaming Kafka via createDirectStream under heavy reprocessing

2015-07-24 Thread Cody Koeninger
It's really a question of whether you need access to the MessageAndMetadata, or just the key/value from the message. If you just need the key/value, a dstream map is fine. In your case, since you need to be able to control a possible failure when deserializing the message from the
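
[A minimal sketch of the two options Cody contrasts, assuming Spark 1.3's direct stream API; ssc, kafkaParams and fromOffsets are assumed to be set up elsewhere, and "my-topic" is a placeholder:

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils

def twoWays(ssc: StreamingContext,
            kafkaParams: Map[String, String],
            fromOffsets: Map[TopicAndPartition, Long]): Unit = {
  // (a) Only key/value needed: the simple overload returns (K, V) pairs,
  // and a plain dstream map over them is fine.
  val kv = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, Set("my-topic"))
  kv.map { case (key, value) => value.length } // any per-record transformation

  // (b) MessageAndMetadata needed (offset, topic, partition, or control over
  // deserialization failures): pass a messageHandler instead.
  val withMeta = KafkaUtils.createDirectStream[String, String, StringDecoder,
    StringDecoder, (Long, String)](
    ssc, kafkaParams, fromOffsets,
    (mmd: MessageAndMetadata[String, String]) => (mmd.offset, mmd.message))
}]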

Re: Resume checkpoint failed with Spark Streaming Kafka via createDirectStream under heavy reprocessing

2015-07-21 Thread Cody Koeninger
Yeah, I'm referring to that API. If you want to filter messages in addition to catching that exception, have your messageHandler return an Option, so the type R would end up being Option[WhateverYourClassIs], then filter out None before doing the rest of your processing. If you aren't already
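
[A hedged sketch of that pattern; MyEvent and deserialize are hypothetical stand-ins for the application's own types:

import kafka.message.MessageAndMetadata

case class MyEvent(payload: String)

// Hypothetical deserializer that may throw on corrupt input.
def deserialize(bytes: Array[Byte]): MyEvent =
  MyEvent(new String(bytes, "UTF-8"))

// R becomes Option[MyEvent]: corrupt messages turn into None.
val messageHandler: MessageAndMetadata[Array[Byte], Array[Byte]] => Option[MyEvent] =
  mmd =>
    try {
      Some(deserialize(mmd.message))
    } catch {
      case e: Exception =>
        // Log the bad offset and skip the record.
        System.err.println(
          s"Corrupt message at ${mmd.topic}/${mmd.partition} offset ${mmd.offset}: $e")
        None
    }

// On the resulting DStream[Option[MyEvent]], drop the Nones before processing:
//   stream.flatMap(x => x)]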

Re: Resume checkpoint failed with Spark Streaming Kafka via createDirectStream under heavy reprocessing

2015-07-21 Thread Nicolas Phung
Hi Cody, Thanks for your answer. I'm on Spark 1.3.0. I don't quite understand how to use the messageHandler parameter/function in the createDirectStream method. You are referring to this, aren't you? def createDirectStream[K: ClassTag, V: ClassTag, KD <: Decoder[K]: ClassTag, VD <:
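
[For reference, the Spark 1.3 overload being discussed looks like this, reproduced from the KafkaUtils scaladoc as best I can tell; check your version's docs:

def createDirectStream[
    K: ClassTag,
    V: ClassTag,
    KD <: Decoder[K]: ClassTag,
    VD <: Decoder[V]: ClassTag,
    R: ClassTag](
    ssc: StreamingContext,
    kafkaParams: Map[String, String],
    fromOffsets: Map[TopicAndPartition, Long],
    messageHandler: MessageAndMetadata[K, V] => R): InputDStream[R]]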

Re: Resume checkpoint failed with Spark Streaming Kafka via createDirectStream under heavy reprocessing

2015-07-20 Thread Nicolas Phung
Hi Cody, Thanks for your help. It seems there's something wrong with some messages within my Kafka topics, then. I don't understand how I can get bigger or incomplete messages, since I use the default configuration, which accepts only 1 MB messages in my Kafka topic. If you have any other information or

Re: Resume checkpoint failed with Spark Streaming Kafka via createDirectStream under heavy reprocessing

2015-07-20 Thread Cody Koeninger
I'd try logging the offsets for each message to see where the problems start, then try using the console consumer starting at those offsets and see if you can reproduce the problem. On Mon, Jul 20, 2015 at 2:15 AM, Nicolas Phung nicolas.ph...@gmail.com wrote: Hi Cody, Thanks for your help. It seems
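
[A sketch of one way to do the logging side, assuming the direct stream's messageHandler is used as the logging point; a real job would use a logger rather than println:

import kafka.message.MessageAndMetadata

val offsetLoggingHandler
    : MessageAndMetadata[Array[Byte], Array[Byte]] => (Array[Byte], Array[Byte]) =
  mmd => {
    // Record where each message came from, so the first bad offset can be
    // located and replayed with the console consumer.
    println(s"topic=${mmd.topic} partition=${mmd.partition} offset=${mmd.offset}")
    (mmd.key, mmd.message)
  }]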

Re: Resume checkpoint failed with Spark Streaming Kafka via createDirectStream under heavy reprocessing

2015-07-20 Thread Cody Koeninger
Yeah, in the function you supply for the messageHandler parameter to createDirectStream, catch the exception and do whatever makes sense for your application. On Mon, Jul 20, 2015 at 11:58 AM, Nicolas Phung nicolas.ph...@gmail.com wrote: Hello, Using the old Spark Streaming Kafka API, I got

Re: Resume checkpoint failed with Spark Streaming Kafka via createDirectStream under heavy reprocessing

2015-07-20 Thread Nicolas Phung
Hello, Using the old Spark Streaming Kafka API, I got the following around the same offset: kafka.message.InvalidMessageException: Message is corrupt (stored crc = 3561357254, computed crc = 171652633) at kafka.message.Message.ensureValid(Message.scala:166) at

Re: Resume checkpoint failed with Spark Streaming Kafka via createDirectStream under heavy reprocessing

2015-07-16 Thread Cody Koeninger
Well, working backwards down the stack trace... At java.nio.Buffer.limit(Buffer.java:275): that exception gets thrown if the limit is negative or greater than the buffer's capacity. At kafka.message.Message.sliceDelimited(Message.scala:236): if size had been negative, it would have just returned
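
[A minimal illustration of that first frame (not Kafka code, just the JDK behavior being described):

import java.nio.ByteBuffer

val buf = ByteBuffer.allocate(16)
// Throws java.lang.IllegalArgumentException: a limit greater than the
// capacity (or negative) is rejected, which is what a garbage size field
// read by sliceDelimited would trigger.
buf.limit(32)]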

Re: Resume checkpoint failed with Spark Streaming Kafka via createDirectStream under heavy reprocessing

2015-07-16 Thread Cody Koeninger
Not exactly the same issue, but possibly related: https://issues.apache.org/jira/browse/KAFKA-1196 On Thu, Jul 16, 2015 at 12:03 PM, Cody Koeninger c...@koeninger.org wrote: Well, working backwards down the stack trace... at java.nio.Buffer.limit(Buffer.java:275) That exception gets thrown

Resume checkpoint failed with Spark Streaming Kafka via createDirectStream under heavy reprocessing

2015-07-16 Thread Nicolas Phung
Hello. When I'm reprocessing the data from Kafka (about 40 GB) with the new Spark Streaming Kafka method createDirectStream, everything is fine until a driver error happens (driver killed, connection lost, ...). When the driver pops up again, it resumes the processing with the checkpoint in
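
[For context, a hedged sketch of the standard checkpoint-recovery pattern in play here; the path and app name are placeholders:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointedApp {
  val checkpointDir = "hdfs:///checkpoints/my-app" // hypothetical path

  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("kafka-reprocessing")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint(checkpointDir)
    // ... create the direct stream and wire up the processing here ...
    ssc
  }

  def main(args: Array[String]): Unit = {
    // On a clean start this builds a fresh context; after a driver crash it
    // restores the context (and offsets) from the checkpoint directory.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}]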