Could you provide your Spark version, please?

On Tue, Jan 31, 2017 at 10:37 AM, Nipun Arora <nipunarora2...@gmail.com> wrote:
> Hi,
>
> I get a resource leak where the number of file descriptors in Spark
> Streaming keeps increasing. We eventually end up with a "too many open
> files" error, through an exception raised in JAVARDDKafkaWriter, which
> writes a Spark JavaDStream<String>.
>
> The exception is attached inline. Any help will be greatly appreciated.
>
> Thanks
> Nipun
>
> -------------------------------------------
> Time: 1485762530000 ms
> -------------------------------------------
>
> Exception in thread "main" org.apache.spark.SparkException: Job aborted
> due to stage failure: Task 0 in stage 85968.0 failed 1 times, most recent
> failure: Lost task 0.0 in stage 85968.0 (TID 29562, localhost):
> java.io.FileNotFoundException: /tmp/blockmgr-1b3ddc44-f9a4-42cd-977c-532cb962d7d3/3e/shuffle_10625_0_0.data.4651a131-6072-460b-b150-2b3080902084
> (too many open files)
>   at java.io.FileOutputStream.open(Native Method)
>   at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
>   at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:88)
>   at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:181)
>   at org.apache.spark.util.collection.WritablePartitionedPairCollection$$anon$1.writeNext(WritablePartitionedPairCollection.scala:56)
>   at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:659)
>   at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:72)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>   at org.apache.spark.scheduler.Task.run(Task.scala:89)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
>   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
>   at scala.Option.foreach(Option.scala:236)
>   at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>   at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1857)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1870)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1883)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1954)
>   at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:920)
>   at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
>   at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
>   at org.apache.spark.api.java.JavaRDDLike$class.foreachPartition(JavaRDDLike.scala:225)
>   at org.apache.spark.api.java.AbstractJavaRDDLike.foreachPartition(JavaRDDLike.scala:46)
>   at org.necla.ngla.kafka.JavaRDDStringKafkaWriter.call(JavaRDDStringKafkaWriter.java:25)
>   at org.necla.ngla.kafka.JavaRDDStringKafkaWriter.call(JavaRDDStringKafkaWriter.java:10)
>   at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.apply(JavaDStreamLike.scala:335)
>   at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.apply(JavaDStreamLike.scala:335)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
>   at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
>   at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
>   at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
>   at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
>   at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
>   at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
>   at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
>   at scala.util.Try$.apply(Try.scala:161)
>   at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
>   at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:229)
>   at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:229)
>   at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:229)
>   at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>   at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:228)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.FileNotFoundException: /tmp/blockmgr-1b3ddc44-f9a4-42cd-977c-532cb962d7d3/3e/shuffle_10625_0_0.data.4651a131-6072-460b-b150-2b3080902084
> (too many open files)
>   at java.io.FileOutputStream.open(Native Method)
>   at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
>   at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:88)
>   at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:181)
>   at org.apache.spark.util.collection.WritablePartitionedPairCollection$$anon$1.writeNext(WritablePartitionedPairCollection.scala:56)
>   at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:659)
>   at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:72)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>   at org.apache.spark.scheduler.Task.run(Task.scala:89)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>   ... 3 more
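In the meantime: steady descriptor growth in a streaming job usually comes from opening a new producer (or file/socket handle) inside foreachRDD/foreachPartition on every micro-batch and never closing it; the shuffle write above is just where the executor finally hits the process ulimit. You can confirm the leak by watching "lsof -p <executor pid>" (or /proc/<pid>/fd) grow across batches.

I don't have the source of JavaRDDStringKafkaWriter, so the sketch below is only an assumption of the common pattern and its usual fix; the class name, broker list, and topic are placeholders. The point is that anything opened inside foreachPartition is closed in a finally block, so no handle outlives the batch.

    import java.util.Iterator;
    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.VoidFunction;

    // Hypothetical stand-in for a per-batch Kafka writer (names/config are placeholders).
    public class PerPartitionKafkaWriter implements VoidFunction<JavaRDD<String>> {

        private static final long serialVersionUID = 1L;

        private final String brokers; // e.g. "localhost:9092" -- placeholder
        private final String topic;   // placeholder topic name

        public PerPartitionKafkaWriter(String brokers, String topic) {
            this.brokers = brokers;
            this.topic = topic;
        }

        @Override
        public void call(JavaRDD<String> rdd) {
            rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
                @Override
                public void call(Iterator<String> records) {
                    Properties props = new Properties();
                    props.put("bootstrap.servers", brokers);
                    props.put("key.serializer",
                            "org.apache.kafka.common.serialization.StringSerializer");
                    props.put("value.serializer",
                            "org.apache.kafka.common.serialization.StringSerializer");

                    // One producer per partition per batch, closed unconditionally below.
                    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
                    try {
                        while (records.hasNext()) {
                            producer.send(new ProducerRecord<>(topic, records.next()));
                        }
                        producer.flush();
                    } finally {
                        producer.close(); // without this, sockets/FDs pile up every batch
                    }
                }
            });
        }
    }

It would be wired in the same way as your writer, e.g. stream.foreachRDD(new PerPartitionKafkaWriter("localhost:9092", "my-topic")). If a producer per partition is too expensive, the usual alternative is a lazily initialized singleton producer per executor JVM, reused across batches and closed from a shutdown hook; either way the key is that handles are closed deterministically rather than left to the garbage collector.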