Not exactly the same issue, but possibly related: https://issues.apache.org/jira/browse/KAFKA-1196
On Thu, Jul 16, 2015 at 12:03 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Well, working backwards through the stack trace...
>
>   at java.nio.Buffer.limit(Buffer.java:275)
>
> That exception gets thrown if the limit is negative or greater than the
> buffer's capacity.
>
>   at kafka.message.Message.sliceDelimited(Message.scala:236)
>
> If the size had been negative, it would have just returned null, so we know
> the exception was thrown because the size was greater than the buffer's
> capacity.
>
> I haven't seen that before... maybe a corrupted message of some kind?
>
> If the problem is reproducible, try providing an explicit argument for
> messageHandler, with a function that logs the message offset.
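For reference, that would look something like the following (untested sketch
against the Spark 1.4 direct-stream API; the topic name, broker address, and
starting offsets are placeholders, and in a real job the fromOffsets map would
come from wherever you track your progress):

    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    object OffsetLoggingExample {
      def main(args: Array[String]): Unit = {
        val ssc = new StreamingContext(
          new SparkConf().setAppName("offset-logging"), Seconds(10))

        // Placeholder broker and starting offsets.
        val kafkaParams = Map("metadata.broker.list" -> "broker01:9092")
        val fromOffsets = Map(TopicAndPartition("my-topic", 0) -> 0L)

        // Log the offset *before* touching the payload: mmd.message() is the
        // call that ends up in Message.payload / sliceDelimited, so the last
        // offset printed in an executor's log identifies the message that
        // triggered the IllegalArgumentException.
        val messageHandler = (mmd: MessageAndMetadata[String, String]) => {
          println(s"topic=${mmd.topic} partition=${mmd.partition} offset=${mmd.offset}")
          (mmd.key, mmd.message)
        }

        val stream = KafkaUtils.createDirectStream[
          String, String, StringDecoder, StringDecoder, (String, String)](
          ssc, kafkaParams, fromOffsets, messageHandler)

        stream.foreachRDD(rdd => println(s"processed ${rdd.count()} messages"))

        ssc.start()
        ssc.awaitTermination()
      }
    }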
> On Thu, Jul 16, 2015 at 11:28 AM, Nicolas Phung <nicolas.ph...@gmail.com> wrote:
>
>> Hello,
>>
>> When I'm reprocessing the data from Kafka (about 40 GB) with the new Spark
>> Streaming Kafka method createDirectStream, everything is fine until a driver
>> error happens (driver is killed, connection lost...). When the driver comes
>> back up, it resumes processing from the checkpoint in HDFS. Except I get this:
>>
>> 15/07/16 15:23:41 ERROR TaskSetManager: Task 4 in stage 4.0 failed 4 times; aborting job
>> 15/07/16 15:23:41 ERROR JobScheduler: Error running job streaming job 1437032118000 ms.0
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in
>> stage 4.0 failed 4 times, most recent failure: Lost task 4.3 in stage 4.0
>> (TID 16, slave05.local): java.lang.IllegalArgumentException
>>   at java.nio.Buffer.limit(Buffer.java:275)
>>   at kafka.message.Message.sliceDelimited(Message.scala:236)
>>   at kafka.message.Message.payload(Message.scala:218)
>>   at kafka.message.MessageAndMetadata.message(MessageAndMetadata.scala:32)
>>   at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
>>   at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
>>   at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:176)
>>   at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>   at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
>>   at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172)
>>   at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79)
>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
>>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>   at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:93)
>>   at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:92)
>>   at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>   at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:48)
>>   at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
>>   at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
>>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>   at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>   at java.lang.Thread.run(Thread.java:745)
>>
>> This happens only when I'm doing a full data reprocessing from Kafka. Under
>> no load, if you kill the driver and then restart it, it resumes from the
>> checkpoint as expected without missing data. Has anyone encountered
>> something similar? How did you solve it?
>>
>> Regards,
>>
>> Nicolas PHUNG