Thanks for the suggestion. I ran the command and the limit is 1024.

Based on my understanding, the Kafka connector should not open so many
files. Do you think there could be a socket leak? BTW, in every batch,
which is 5 seconds long, I write some results to MySQL:

  /**
   * Inserts one batch of pre-formatted rows into the `loggingserver1` table
   * using a single multi-row INSERT statement.
   *
   * Every element of `data` is appended verbatim after `values ` and the last
   * character of the concatenated text is dropped before `;` is added, so each
   * element is presumably a complete SQL tuple followed by one separator
   * character (e.g. `"(1,'a'),"`) -- TODO confirm against the caller.
   *
   * A new JDBC connection is opened on every call; the statement and the
   * connection are both closed before the method returns. Non-fatal failures
   * (including failures while closing) are logged and not rethrown.
   *
   * NOTE(review): the SQL text is built from raw strings and the credentials
   * are embedded in the URL. If `data` can contain untrusted input, switch to
   * a `PreparedStatement` with batched parameters.
   *
   * @param data row fragments to insert; an empty array is a no-op
   */
  def ingestToMysql(data: Array[String]): Unit = {
    import scala.util.control.NonFatal

    // Nothing to insert: skip the round trip instead of sending the malformed
    // statement "insert into loggingserver1 values;".
    if (data.nonEmpty) {
      val url = "jdbc:mysql://localhost:3306/realtime?user=root&password=123"
      // mkString joins in one pass; dropRight(1) strips the trailing separator.
      val sql = ("insert into loggingserver1 values " + data.mkString).dropRight(1) + ";"
      logger.info(sql)
      try {
        val conn = DriverManager.getConnection(url)
        try {
          val statement = conn.createStatement()
          try {
            statement.executeUpdate(sql)
          } finally {
            // Release the statement explicitly rather than relying on the
            // driver to do it when the connection closes.
            statement.close()
          }
        } finally {
          conn.close()
        }
      } catch {
        // Pass the throwable along so the stack trace reaches the log.
        case NonFatal(e) => logger.error(e.getMessage, e)
      }
    }
  }

I am not sure whether the leak originates from the Kafka connector or from
the SQL connections.

Bill

On Wed, Apr 29, 2015 at 2:12 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> Can you run the command 'ulimit -n' to see the current limit ?
>
> To configure ulimit settings on Ubuntu, edit */etc/security/limits.conf*
> Cheers
>
> On Wed, Apr 29, 2015 at 2:07 PM, Bill Jay <bill.jaypeter...@gmail.com>
> wrote:
>
>> Hi all,
>>
>> I am using the direct approach to receive real-time data from Kafka in
>> the following link:
>>
>> https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html
>>
>>
>> My code follows the word count direct example:
>>
>>
>> https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
>>
>>
>>
>> After around 12 hours, I got the following error messages in Spark log:
>>
>> 15/04/29 20:18:10 ERROR JobScheduler: Error generating jobs for time
>> 1430338690000 ms
>> org.apache.spark.SparkException: ArrayBuffer(java.io.IOException: Too
>> many open files, java.io.IOException: Too many open files,
>> java.io.IOException: Too many open files, java.io.IOException: Too many
>> open files, java.io.IOException: Too many open files)
>>         at
>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
>>         at
>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
>>         at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>         at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>         at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>         at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>>         at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
>>         at scala.Option.orElse(Option.scala:257)
>>         at
>> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
>>         at
>> org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
>>         at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>         at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>         at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>         at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>>         at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
>>         at scala.Option.orElse(Option.scala:257)
>>         at
>> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
>>         at
>> org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
>>         at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>         at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>         at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>         at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>>         at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
>>         at scala.Option.orElse(Option.scala:257)
>>         at
>> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
>>         at
>> org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
>>         at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>         at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>         at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>         at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>>         at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
>>         at scala.Option.orElse(Option.scala:257)
>>         at
>> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
>>         at
>> org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
>>         at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>         at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>         at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>         at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>>         at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
>>         at scala.Option.orElse(Option.scala:257)
>>         at
>> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
>>         at
>> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
>>         at
>> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
>>         at
>> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
>>         at
>> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>>         at
>> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>>         at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>         at
>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>         at
>> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>>         at
>> scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>>         at
>> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
>>         at
>> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:239)
>>         at
>> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:237)
>>         at scala.util.Try$.apply(Try.scala:161)
>>         at
>> org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:237)
>>         at
>> org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:174)
>>         at
>> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:85)
>>         at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>         at
>> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1.aroundReceive(JobGenerator.scala:83)
>>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>         at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>>         at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>         at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
>>         at
>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>         at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>         at
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>         at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>> Thanks for the help.
>>
>> Bill
>>
>
>

Reply via email to