Thank you, Akhil. Actually, the problem was solved last week, but I did not
have time to report back. The error was caused by YARN killing the
container because the executors used more off-heap memory than they were
assigned. There was nothing in the executor log, but the AM log clearly
states that this is the problem.

After I increased spark.yarn.executor.memoryOverhead, the job worked
fine. I was using Spark 1.3, where the default value is executorMemory *
0.07, with a minimum of 384 MB. In Spark 1.4 and later, the default value was
changed to executorMemory * 0.10, with a minimum of 384 MB.
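
To make the difference concrete, here is a rough sketch of that default
calculation in Python. The function name and the example executor sizes are
only illustrative (they are not part of Spark), and the truncation to whole
megabytes is my assumption about how the value is rounded.

```python
# Sketch of the default YARN executor memory overhead described above.
# default_overhead_mb is a hypothetical helper, not a Spark API.

MIN_OVERHEAD_MB = 384  # minimum overhead in both Spark 1.3 and 1.4+


def default_overhead_mb(executor_memory_mb, factor):
    """Return max(factor * executorMemory, 384), in whole megabytes.

    factor is 0.07 for Spark 1.3 and 0.10 for Spark 1.4 and later.
    Truncating to an integer is an assumption made for this sketch.
    """
    return max(int(factor * executor_memory_mb), MIN_OVERHEAD_MB)


if __name__ == "__main__":
    # Example executor sizes in MB (illustrative values only).
    for mem in (4096, 8192, 16384):
        print(mem,
              default_overhead_mb(mem, 0.07),   # Spark 1.3 default
              default_overhead_mb(mem, 0.10))   # Spark 1.4+ default
```

If the default turns out to be too small for a job, the property can be set
explicitly when submitting, for example with
--conf spark.yarn.executor.memoryOverhead=1024 (the value 1024 is just an
example, not the one I used).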

Lan

On Mon, Oct 12, 2015 at 8:34 AM, Akhil Das <ak...@sigmoidanalytics.com>
wrote:

> Can you look a bit deeper in the executor logs? It could be filling up the
> memory and getting killed.
>
> Thanks
> Best Regards
>
> On Mon, Oct 5, 2015 at 8:55 PM, Lan Jiang <ljia...@gmail.com> wrote:
>
>> I am still facing this issue. Executor dies due to
>>
>> org.apache.avro.AvroRuntimeException: java.io.IOException: Filesystem
>> closed
>> at
>> org.apache.avro.file.DataFileStream.hasNextBlock(DataFileStream.java:278)
>> at org.apache.avro.file.DataFileStream.hasNext(DataFileStream.java:197)
>> at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:64)
>> ...
>> Caused by: java.io.IOException: Filesystem closed
>> at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:794)
>> at
>> org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:833)
>> at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:897)
>> at java.io.DataInputStream.read(DataInputStream.java:149)
>>
>> Spark automatically launched new executors and the whole job completed
>> fine. Does anyone have a clue what's going on?
>>
>> The Spark job reads avro files from a directory, does some basic map/filter
>> and then repartitions to 1 and writes the result to HDFS. I use Spark 1.3 with
>> spark-avro (1.0.0). The error only happens when running on the whole
>> dataset. When running on 1/3 of the files, the same job completes without
>> error.
>>
>>
>> On Thu, Oct 1, 2015 at 2:41 PM, Lan Jiang <ljia...@gmail.com> wrote:
>>
>>> Hi, there
>>>
>>> Here is the problem I ran into when executing a Spark job (Spark 1.3).
>>> The Spark job loads a bunch of avro files using the Spark SQL spark-avro
>>> 1.0.0 library. Then it does some filter/map transformations, repartitions
>>> to 1 partition and then writes to HDFS. It creates 2 stages. The total HDFS
>>> block number is around 12000, thus it creates 12000 partitions, thus 12000
>>> tasks for the first stage. I have a total of 9 executors launched with 5
>>> threads each. The job ran fine until the very end. When it reached
>>> 19980/20000 tasks succeeded, it suddenly failed the last 20 tasks and I
>>> lost 2 executors. Spark did launch 2 new executors and finished the
>>> job eventually by reprocessing the 20 tasks.
>>>
>>> I only run into this issue when I run the Spark application on the full
>>> dataset. When I run it on 1/3 of the dataset, everything finishes fine
>>> without error.
>>>
>>> Question 1: What is the root cause of this issue? It is similar to
>>> http://stackoverflow.com/questions/24038908/spark-fails-on-big-shuffle-jobs-with-java-io-ioexception-filesystem-closed
>>> and https://issues.apache.org/jira/browse/SPARK-3052, but it says the
>>> issue has been fixed since 1.2
>>> Question 2: I am a little surprised that after the 2 new executors were
>>> launched, replacing the two failed executors, they simply reprocessed the
>>> failed 20 tasks/partitions. What about the results for the other partitions
>>> processed by the 2 failed executors before? I assume the results of these
>>> partitions are stored on the local disk and thus do not need to be recomputed
>>> by the new executors? When is the data stored locally? Is it
>>> configurable? This question is for my own understanding of the Spark
>>> framework.
>>>
>>> The exception causing the executor failure is below
>>>
>>> org.apache.avro.AvroRuntimeException: java.io.IOException: Filesystem
>>> closed
>>> at
>>> org.apache.avro.file.DataFileStream.hasNextBlock(DataFileStream.java:278)
>>> at org.apache.avro.file.DataFileStream.hasNext(DataFileStream.java:197)
>>> at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:64)
>>> at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:32)
>>> at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:245)
>>> at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:212)
>>> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>> at
>>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
>>> at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> at
>>> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:210)
>>> at
>>> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
>>> at
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>> at
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>> at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.io.IOException: Filesystem closed
>>> at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:794)
>>> at
>>> org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:833)
>>> at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:897)
>>> at java.io.DataInputStream.read(DataInputStream.java:149)
>>> at org.apache.avro.mapred.FsInput.read(FsInput.java:54)
>>> at
>>> org.apache.avro.file.DataFileReader$SeekableInputStream.read(DataFileReader.java:210)
>>> at
>>> org.apache.avro.io.BinaryDecoder$InputStreamByteSource.tryReadRaw(BinaryDecoder.java:839)
>>> at org.apache.avro.io.BinaryDecoder.isEnd(BinaryDecoder.java:444)
>>> at
>>> org.apache.avro.file.DataFileStream.hasNextBlock(DataFileStream.java:264)
>>>
>>
>>
>
