Thank you, Akhil. Actually the problem was solved last week and I did not have time to report back. The error was caused by YARN killing the container because the executors used more off-heap memory than they were assigned. There was nothing in the executor log, but the AM log clearly states this is the problem.

After I increased spark.yarn.executor.memoryOverhead, the job ran fine. I was using Spark 1.3, which has the default value of executorMemory * 0.07, with a minimum of 384. In Spark 1.4 and later, the default value was changed to executorMemory * 0.10, with a minimum of 384.
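For reference, this is roughly how the overhead can be raised; it can also be passed to spark-submit as --conf spark.yarn.executor.memoryOverhead=1024. The executor memory and overhead numbers below are only illustrative, not my actual settings:

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative values only: with 4g executors, the Spark 1.3 default overhead
// would be max(4096 * 0.07, 384) = 384 MB; here it is raised to 1024 MB.
val conf = new SparkConf()
  .setAppName("avro-repartition-job")
  .set("spark.executor.memory", "4g")
  .set("spark.yarn.executor.memoryOverhead", "1024") // off-heap headroom per executor, in MB
val sc = new SparkContext(conf)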
Lan

On Mon, Oct 12, 2015 at 8:34 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

> Can you look a bit deeper in the executor logs? It could be filling up the
> memory and getting killed.
>
> Thanks
> Best Regards
>
> On Mon, Oct 5, 2015 at 8:55 PM, Lan Jiang <ljia...@gmail.com> wrote:
>
>> I am still facing this issue. Executor dies due to
>>
>> org.apache.avro.AvroRuntimeException: java.io.IOException: Filesystem closed
>> at org.apache.avro.file.DataFileStream.hasNextBlock(DataFileStream.java:278)
>> at org.apache.avro.file.DataFileStream.hasNext(DataFileStream.java:197)
>> at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:64)
>> ...
>> Caused by: java.io.IOException: Filesystem closed
>> at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:794)
>> at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:833)
>> at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:897)
>> at java.io.DataInputStream.read(DataInputStream.java:149)
>>
>> Spark automatically launched new executors and the whole job completed
>> fine. Does anyone have a clue what's going on?
>>
>> The Spark job reads Avro files from a directory, does some basic map/filter,
>> then repartitions to 1 and writes the result to HDFS. I use Spark 1.3 with
>> spark-avro (1.0.0). The error only happens when running on the whole
>> dataset. When running on 1/3 of the files, the same job completes without
>> error.
>>
>>
>> On Thu, Oct 1, 2015 at 2:41 PM, Lan Jiang <ljia...@gmail.com> wrote:
>>
>>> Hi, there
>>>
>>> Here is the problem I ran into when executing a Spark job (Spark 1.3).
>>> The Spark job loads a bunch of Avro files using the Spark SQL spark-avro
>>> 1.0.0 library. Then it does some filter/map transformations, repartitions to
>>> 1 partition and then writes to HDFS. It creates 2 stages. The total HDFS
>>> block number is around 12000, thus it creates 12000 partitions and 12000
>>> tasks for the first stage. I have a total of 9 executors launched with 5
>>> threads each. The job ran fine until the very end. When it reached
>>> 19980/20000 tasks succeeded, it suddenly failed the last 20 tasks and I
>>> lost 2 executors. Spark did launch 2 new executors and finished the
>>> job eventually by reprocessing the 20 tasks.
>>>
>>> I only ran into this issue when I run the Spark application on the full
>>> dataset. When I run on 1/3 of the dataset, everything finishes fine
>>> without error.
>>>
>>> Question 1: What is the root cause of this issue? It is similar to
>>> http://stackoverflow.com/questions/24038908/spark-fails-on-big-shuffle-jobs-with-java-io-ioexception-filesystem-closed
>>> and https://issues.apache.org/jira/browse/SPARK-3052, but it says the
>>> issue has been fixed since 1.2.
>>> Question 2: I am a little surprised that after the 2 new executors were
>>> launched, replacing the two failed executors, they simply reprocessed the
>>> failed 20 tasks/partitions. What about the results for the other partitions
>>> processed by the 2 failed executors before? I assume the results of these
>>> partitions are stored on local disk and thus do not need to be recomputed
>>> by the new executors? When is the data stored locally? Is it configurable?
>>> This question is for my own understanding of the Spark framework.
>>>
>>> The exception causing the executor failure is below:
>>>
>>> org.apache.avro.AvroRuntimeException: java.io.IOException: Filesystem closed
>>> at org.apache.avro.file.DataFileStream.hasNextBlock(DataFileStream.java:278)
>>> at org.apache.avro.file.DataFileStream.hasNext(DataFileStream.java:197)
>>> at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:64)
>>> at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:32)
>>> at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:245)
>>> at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:212)
>>> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>> at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
>>> at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:210)
>>> at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
>>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>> at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.io.IOException: Filesystem closed
>>> at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:794)
>>> at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:833)
>>> at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:897)
>>> at java.io.DataInputStream.read(DataInputStream.java:149)
>>> at org.apache.avro.mapred.FsInput.read(FsInput.java:54)
>>> at org.apache.avro.file.DataFileReader$SeekableInputStream.read(DataFileReader.java:210)
>>> at org.apache.avro.io.BinaryDecoder$InputStreamByteSource.tryReadRaw(BinaryDecoder.java:839)
>>> at org.apache.avro.io.BinaryDecoder.isEnd(BinaryDecoder.java:444)
>>> at org.apache.avro.file.DataFileStream.hasNextBlock(DataFileStream.java:264)
>>
>>
>
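For anyone searching the archives later, here is a rough sketch of the shape of the job described in the quoted message above (read a directory of Avro files through spark-avro, apply a basic filter, repartition to 1, write back to HDFS). The paths, app name, column name, and predicate are placeholders, not the original code:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("avro-to-hdfs"))
val sqlContext = new SQLContext(sc)

// Load every Avro file in the input directory through the spark-avro data source.
val input = sqlContext.load("hdfs:///data/input", "com.databricks.spark.avro")

// Basic filter/map-style transformation, then collapse to a single partition
// so the result is written out as one file.
val result = input.filter(input("someColumn").isNotNull).repartition(1)

// Write the result back to HDFS in Avro format.
result.save("hdfs:///data/output", "com.databricks.spark.avro")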