[ https://issues.apache.org/jira/browse/SPARK-5250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14963110#comment-14963110 ]
Mojmir Vinkler commented on SPARK-5250: --------------------------------------- Yes, it's caused by reading a corrupt file (we only experienced this for compressed (gzipped) files). I think the file got corrupted when it was saved to S3, but we used boto for that, not Spark. What's weird is that I'm able to read the file with pandas without any problems. > EOFException in when reading gzipped files from S3 with wholeTextFiles > ---------------------------------------------------------------------- > > Key: SPARK-5250 > URL: https://issues.apache.org/jira/browse/SPARK-5250 > Project: Spark > Issue Type: Sub-task > Components: Spark Core > Affects Versions: 1.2.0 > Reporter: Mojmir Vinkler > Priority: Critical > > I get an `EOFException` error when reading *some* gzipped files using > `sc.wholeTextFiles`. It happens with just a few files. I thought that the file > was corrupted, but I was able to read it without problems using `sc.textFile` > (and pandas). > Traceback for command > `sc.wholeTextFiles('s3n://s3bucket/2525322021051.csv.gz').collect()` > {code} > --------------------------------------------------------------------------- > Py4JJavaError Traceback (most recent call last) > <ipython-input-104-943aab11de03> in <module>() > ----> 1 sc.wholeTextFiles('s3n://s3bucket/2525322021051.csv.gz').collect() > /home/ubuntu/databricks/spark/python/pyspark/rdd.py in collect(self) > 674 """ > 675 with SCCallSiteSync(self.context) as css: > --> 676 bytesInJava = self._jrdd.collect().iterator() > 677 return list(self._collect_iterator_through_file(bytesInJava)) > 678 > /home/ubuntu/databricks/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py > in __call__(self, *args) > 536 answer = self.gateway_client.send_command(command) > 537 return_value = get_return_value(answer, self.gateway_client, > --> 538 self.target_id, self.name) > 539 > 540 for temp_arg in temp_args: > /home/ubuntu/databricks/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py > in 
get_return_value(answer, gateway_client, target_id, name) > 298 raise Py4JJavaError( > 299 'An error occurred while calling {0}{1}{2}.\n'. > --> 300 format(target_id, '.', name), value) > 301 else: > 302 raise Py4JError( > Py4JJavaError: An error occurred while calling o1576.collect. > : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 > in stage 41.0 failed 4 times, most recent failure: Lost task 0.3 in stage > 41.0 (TID 4720, ip-10-0-241-126.ec2.internal): java.io.EOFException: > Unexpected end of input stream > at > org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:137) > at > org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:77) > at java.io.InputStream.read(InputStream.java:101) > at com.google.common.io.ByteStreams.copy(ByteStreams.java:207) > at com.google.common.io.ByteStreams.toByteArray(ByteStreams.java:252) > at > org.apache.spark.input.WholeTextFileRecordReader.nextKeyValue(WholeTextFileRecordReader.scala:73) > at > org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader.nextKeyValue(CombineFileRecordReader.java:69) > at > org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:145) > at > org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at > org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28) > at > scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) > at > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) > at > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) > at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) > at > org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28) > at > scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) > at > 
org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28) > at > scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) > at > org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28) > at org.apache.spark.rdd.RDD$$anonfun$16.apply(RDD.scala:780) > at org.apache.spark.rdd.RDD$$anonfun$16.apply(RDD.scala:780) > at > org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314) > at > org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) > at org.apache.spark.scheduler.Task.run(Task.scala:56) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745) > Driver stacktrace: > at > org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at > org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) > at scala.Option.foreach(Option.scala:236) > at > org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696) > at > 
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420) > at akka.actor.Actor$class.aroundReceive(Actor.scala:465) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) > at akka.actor.ActorCell.invoke(ActorCell.scala:487) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) > at akka.dispatch.Mailbox.run(Mailbox.scala:220) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org