Thanks for the suggestion. I ran the command and the limit is 1024. As I understand it, the Kafka connector should not need to open so many files. Do you think there could be a socket leak? BTW, in every batch (the batch interval is 5 seconds), I write some results to MySQL:
    import java.sql.DriverManager

    // `logger` is the application's existing logger (not shown here).
    def ingestToMysql(data: Array[String]): Unit = {
      val url = "jdbc:mysql://localhost:3306/realtime?user=root&password=123"
      // Concatenate all rows, then drop the final character before terminating the statement.
      var sql = "insert into loggingserver1 values "
      data.foreach(line => sql += line)
      sql = sql.dropRight(1)
      sql += ";"
      logger.info(sql)
      var conn: java.sql.Connection = null
      try {
        conn = DriverManager.getConnection(url)
        val statement = conn.createStatement()
        statement.executeUpdate(sql)
      } catch {
        case e: Exception => logger.error(e.getMessage())
      } finally {
        // Release the connection even if the insert fails.
        if (conn != null) {
          conn.close()
        }
      }
    }

I am not sure whether the leak originates from the Kafka connector or from the SQL connections.

Bill

On Wed, Apr 29, 2015 at 2:12 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> Can you run the command 'ulimit -n' to see the current limit?
>
> To configure ulimit settings on Ubuntu, edit */etc/security/limits.conf*
>
> Cheers
>
> On Wed, Apr 29, 2015 at 2:07 PM, Bill Jay <bill.jaypeter...@gmail.com>
> wrote:
>
>> Hi all,
>>
>> I am using the direct approach to receive real-time data from Kafka, as
>> described in the following link:
>>
>> https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html
>>
>> My code follows the direct word count example:
>>
>> https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
>>
>> After around 12 hours, I got the following error messages in the Spark log:
>>
>> 15/04/29 20:18:10 ERROR JobScheduler: Error generating jobs for time 1430338690000 ms
>> org.apache.spark.SparkException: ArrayBuffer(java.io.IOException: Too many open files, java.io.IOException: Too many open files, java.io.IOException: Too many open files, java.io.IOException: Too many open files, java.io.IOException: Too many open files)
>>     at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
>>     at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
>>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
>>     at scala.Option.orElse(Option.scala:257)
>>     at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
>>     at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
>>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
>>     at scala.Option.orElse(Option.scala:257)
>>     at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
>>     at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
>>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
>>     at scala.Option.orElse(Option.scala:257)
>>     at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
>>     at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
>>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
>>     at scala.Option.orElse(Option.scala:257)
>>     at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
>>     at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
>>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
>>     at scala.Option.orElse(Option.scala:257)
>>     at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
>>     at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
>>     at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
>>     at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
>>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>     at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>>     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>>     at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
>>     at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:239)
>>     at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:237)
>>     at scala.util.Try$.apply(Try.scala:161)
>>     at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:237)
>>     at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:174)
>>     at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:85)
>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>     at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1.aroundReceive(JobGenerator.scala:83)
>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>>     at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>> Thanks for the help.
>>
>> Bill
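To tell whether the exhausted descriptors are sockets (Kafka or MySQL connections) or regular files, it helps to look at what the open descriptors actually are. The stack trace shows the failure inside `DirectKafkaInputDStream.latestLeaderOffsets` during job generation, which runs on the driver, so the driver JVM is the process to inspect. From a shell, `lsof -p <pid>` lists its descriptors; the sketch below does the same from inside the JVM. It is a hypothetical helper (`FdCensus` is not a Spark or Kafka API) and assumes Linux and Java 8+, because it reads the symlinks in `/proc/self/fd`. The directory handle opened by the listing itself shows up as one extra "file" entry.

```scala
import java.nio.file.{Files, Paths}
import scala.collection.JavaConverters._
import scala.util.Try

// Hypothetical diagnostic helper (not a Spark or Kafka API). Linux-only and
// Java 8+: each entry in /proc/self/fd is a symlink whose target looks like
// "socket:[12345]", "pipe:[678]", or a filesystem path.
object FdCensus {

  // Map a symlink target to a coarse category.
  def classify(target: String): String =
    if (target.startsWith("socket:")) "socket"
    else if (target.startsWith("pipe:")) "pipe"
    else if (target.startsWith("anon_inode:")) "anon_inode"
    else "file"

  // Count this JVM's open descriptors by category. A descriptor can be closed
  // between listing and reading its link, so failed reads are skipped.
  def census(): Map[String, Int] = {
    val entries = Files.list(Paths.get("/proc/self/fd"))
    try {
      entries.iterator().asScala
        .flatMap(p => Try(Files.readSymbolicLink(p).toString).toOption)
        .toList
        .groupBy(target => classify(target))
        .map { case (kind, targets) => kind -> targets.size }
    } finally entries.close()
  }

  // Print the categories, largest first.
  def main(args: Array[String]): Unit =
    census().toSeq.sortBy(-_._2).foreach { case (kind, n) => println(s"$kind: $n") }
}
```

Logging `FdCensus.census()` on the driver every few hundred batches should show which category grows: a steadily rising "socket" count points at connections that are never closed, while a count that stays roughly flat after start-up would point away from a leak.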
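For comparison, here is a sketch of the same per-batch MySQL write with both the `Statement` and the `Connection` closed on every path, using a small loan-pattern helper. `MysqlSink`, `withResource`, and `buildInsert` are hypothetical names, and the sketch assumes each element of `rows` is a complete SQL tuple such as `(1,'a')` with no trailing separator, unlike `ingestToMysql`, which drops the last character with `dropRight(1)`. `ingestToMysql` already closes the connection in `finally`, which should also release its statement, so this write may well not be the source of the leak; the sketch mainly makes the cleanup explicit and skips empty batches, which would otherwise produce an invalid statement ending in `values;`.

```scala
import java.sql.DriverManager

// Hypothetical sketch (names are illustrative, not from the original code):
// the per-batch MySQL insert with every JDBC resource closed on all paths.
// Assumes each row is a complete SQL tuple such as "(1,'a')" with no trailing comma.
object MysqlSink {

  // Loan pattern: run `body`, then close `resource` even if `body` throws.
  def withResource[R <: AutoCloseable, A](resource: R)(body: R => A): A =
    try body(resource)
    finally if (resource != null) resource.close()

  // One multi-row INSERT, or None for an empty batch (which would otherwise
  // produce an invalid statement ending in "values;").
  def buildInsert(table: String, rows: Seq[String]): Option[String] =
    if (rows.isEmpty) None
    else Some(s"insert into $table values " + rows.mkString(","))

  // Returns the update count reported by the driver (0 for an empty batch,
  // in which case no connection is opened at all).
  def ingestToMysql(url: String, rows: Seq[String]): Int =
    buildInsert("loggingserver1", rows) match {
      case None => 0
      case Some(sql) =>
        withResource(DriverManager.getConnection(url)) { conn =>
          withResource(conn.createStatement()) { stmt =>
            stmt.executeUpdate(sql)
          }
        }
    }
}
```

The hand-written helper works on Scala 2.10, which Spark 1.3 uses; on Scala 2.13+, `scala.util.Using.resource` provides the same close-on-exit behavior. If the table's columns are known, a `PreparedStatement` with `addBatch()` and `executeBatch()` would avoid building SQL from raw strings; it is left out here because the schema is not in the thread.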