Hello Andy,

This is a problem we have seen when using the CQL Java driver under heavy
read loads: it uses NIO and ends up waiting on many pending responses,
which leads to too many open sockets and hence too many open files. Are you
by any chance using async queries?
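
If so, one thing that helps is to put a hard cap on the number of in-flight
async queries so the pending responses (and the sockets behind them) stay
bounded. A rough sketch of what I mean, assuming the DataStax Java driver 2.x
API with its Guava futures; the class name and the limit of 256 are only
illustrative:

    import java.util.concurrent.Semaphore

    import com.datastax.driver.core.{ResultSet, Session, Statement}
    import com.google.common.util.concurrent.{FutureCallback, Futures}

    // Caps the number of in-flight async queries; the caller blocks once the
    // cap is reached instead of queueing more requests on the driver.
    class ThrottledWriter(session: Session, maxInFlight: Int = 256) {
      private val inFlight = new Semaphore(maxInFlight)

      def executeAsync(stmt: Statement): Unit = {
        inFlight.acquire()
        val future = session.executeAsync(stmt)  // ResultSetFuture is a ListenableFuture
        Futures.addCallback(future, new FutureCallback[ResultSet] {
          def onSuccess(rs: ResultSet): Unit = inFlight.release()
          def onFailure(t: Throwable): Unit  = inFlight.release()
        })
      }
    }

The exact limit matters less than having one: once the semaphore is saturated,
the producer waits instead of piling up more pending responses.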

I am the maintainer of Calliope... Feel free to mail me directly about any
issues/queries you have while working with Calliope; I'll be glad to assist.

Cheers,
Rohit


Founder & CEO, Tuplejump, Inc.
____________________________
www.tuplejump.com
The Data Engineering Platform


On Fri, Feb 21, 2014 at 3:34 PM, andy petrella <andy.petre...@gmail.com> wrote:

> MMMmmmh good point !!
>
> Before answering: I tried to use Calliope, but I hit an issue, and since
> the iteration review was near I quickly switched to the DataStax driver.
> But I'll get back to Calliope soon, with some questions maybe ;-).
>
> Regarding your point (a very good one, I have to say): actually I'm creating a
> session and a batch per partition.
> Now the embarrassing part... I haven't set any options for the pool :-/. Are
> there any tuning clues? In my case C* is local (a Docker image), so maybe
> I should do
> builder.poolingOptions().setMaxConnectionsPerHost(LOCAL, BIGNUMBER)?
>
> The point is, what about this BIGNUMBER... can it be really big? (Sounds
> weird to me, but I don't want to pre-filter options based on feelings).
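>
> In full, the snippet I'm experimenting with looks roughly like this (a minimal
> sketch, assuming the driver 2.x API; LOCAL there is HostDistance.LOCAL and the
> value below is just a stand-in for that BIGNUMBER):
>
>     import com.datastax.driver.core.{Cluster, HostDistance}
>
>     // Local C* running in the Docker image (contact point is illustrative).
>     val cluster = Cluster.builder()
>       .addContactPoint("127.0.0.1")
>       .build()
>     // The pool bound in question; 64 is only a placeholder value.
>     cluster.getConfiguration.getPoolingOptions
>       .setMaxConnectionsPerHost(HostDistance.LOCAL, 64)
>     val session = cluster.connect()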
>
> Thanks for your response
>
> andy
>
> On Fri, Feb 21, 2014 at 10:36 AM, Sourav Chandra <sourav.chan...@livestream.com> wrote:
>
>> From the stack trace it looks like you are using the DataStax Cassandra
>> driver, and it failed while trying to create a Cluster.
>>
>> How many connections are you creating in poolingOptions(), i.e.
>> builder.poolingOptions().setMaxConnectionsPerHost(...)?
>>
>> Are you creating this per RDD? It may be that lots of connections are being
>> created, and eventually it fails to create any more.
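>>
>> If so, the usual pattern is to share a single Cluster per worker JVM and only
>> open a Session inside each partition. A rough sketch (the holder object, the
>> RDD element type and the table name are illustrative, not taken from your code):
>>
>>     import com.datastax.driver.core.Cluster
>>     import org.apache.spark.rdd.RDD
>>
>>     // One Cluster per executor JVM, built lazily the first time a task needs it.
>>     object CassandraConn {
>>       lazy val cluster = Cluster.builder().addContactPoint("127.0.0.1").build()
>>     }
>>
>>     def storeInCassandra(rdd: RDD[(String, String)]): Unit =
>>       rdd.foreachPartition { rows =>
>>         // Session per partition, not a Cluster per partition.
>>         val session = CassandraConn.cluster.connect()
>>         try rows.foreach { case (id, value) =>
>>           // Hypothetical table; substitute your real insert or a prepared statement.
>>           session.execute(s"INSERT INTO ks.series (id, value) VALUES ('$id', '$value')")
>>         } finally session.shutdown()  // newer driver versions also expose close()
>>       }
>>
>> That way the connection count is governed by the pool settings of that one
>> Cluster instead of growing with the number of partitions.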
>>
>> Thanks,
>> Sourav
>>
>>
>> On Fri, Feb 21, 2014 at 3:02 PM, andy petrella <andy.petre...@gmail.com> wrote:
>>
>>> Hey guyz,
>>>
>>> I've got this issue (see bottom) with Spark, deployed in standalone mode
>>> on a local Docker environment.
>>> I know that I need to raise the ulimit (it's only 1024 now), but in the
>>> meantime I was wondering how this could happen in the first place.
>>> My gut feeling is that it's because I'm holding a lot in memory and Spark
>>> tries to dump some RDDs to the FS, and then boom.
>>>
>>> Also, could it be a clue that my job is maybe too eager in memory? Or is
>>> this quite normal with such a low ulimit on the workers?
>>>
>>> Thanks a lot (in advance ^^)
>>>
>>> Cheers,
>>> andy
>>>
>>>
>>>
>>> 14/02/21 08:32:15 ERROR Executor: Exception in task ID 472
>>> org.jboss.netty.channel.ChannelException: Failed to create a selector.
>>>  at
>>> org.jboss.netty.channel.socket.nio.AbstractNioSelector.openSelector(AbstractNioSelector.java:337)
>>> at
>>> org.jboss.netty.channel.socket.nio.AbstractNioSelector.<init>(AbstractNioSelector.java:95)
>>>  at
>>> org.jboss.netty.channel.socket.nio.AbstractNioWorker.<init>(AbstractNioWorker.java:53)
>>> at org.jboss.netty.channel.socket.nio.NioWorker.<init>(NioWorker.java:45)
>>>  at
>>> org.jboss.netty.channel.socket.nio.NioWorkerPool.createWorker(NioWorkerPool.java:45)
>>> at
>>> org.jboss.netty.channel.socket.nio.NioWorkerPool.createWorker(NioWorkerPool.java:28)
>>>  at
>>> org.jboss.netty.channel.socket.nio.AbstractNioWorkerPool.newWorker(AbstractNioWorkerPool.java:99)
>>> at
>>> org.jboss.netty.channel.socket.nio.AbstractNioWorkerPool.init(AbstractNioWorkerPool.java:69)
>>>  at
>>> org.jboss.netty.channel.socket.nio.NioWorkerPool.<init>(NioWorkerPool.java:39)
>>> at
>>> org.jboss.netty.channel.socket.nio.NioWorkerPool.<init>(NioWorkerPool.java:33)
>>>  at
>>> org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory.<init>(NioClientSocketChannelFactory.java:151)
>>> at
>>> org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory.<init>(NioClientSocketChannelFactory.java:116)
>>>  at
>>> com.datastax.driver.core.Connection$Factory.<init>(Connection.java:349)
>>> at
>>> com.datastax.driver.core.Connection$Factory.<init>(Connection.java:360)
>>>  at com.datastax.driver.core.Cluster$Manager.<init>(Cluster.java:857)
>>> at com.datastax.driver.core.Cluster$Manager.<init>(Cluster.java:806)
>>>  at com.datastax.driver.core.Cluster.<init>(Cluster.java:76)
>>> at com.datastax.driver.core.Cluster.buildFrom(Cluster.java:132)
>>>  at com.datastax.driver.core.Cluster$Builder.build(Cluster.java:771)
>>> at
>>> com.virdata.core.batch.sample.Timeseries$$anonfun$storeInCassandra$1$1$$anonfun$apply$1$$anonfun$apply$2.apply(Timeseries.scala:45)
>>>  at
>>> com.virdata.core.batch.sample.Timeseries$$anonfun$storeInCassandra$1$1$$anonfun$apply$1$$anonfun$apply$2.apply(Timeseries.scala:38)
>>> at
>>> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:595)
>>>  at
>>> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:595)
>>> at
>>> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
>>>  at
>>> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
>>>  at org.apache.spark.scheduler.Task.run(Task.scala:53)
>>> at
>>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
>>>  at
>>> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:46)
>>> at
>>> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:45)
>>>  at java.security.AccessController.doPrivileged(Native Method)
>>> at javax.security.auth.Subject.doAs(Subject.java:415)
>>>  at
>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
>>> at
>>> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:45)
>>>  at
>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>>>  at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>>> at java.lang.Thread.run(Thread.java:722)
>>> Caused by: java.io.IOException: Too many open files
>>> at sun.nio.ch.IOUtil.makePipe(Native Method)
>>> at sun.nio.ch.EPollSelectorImpl.<init>(EPollSelectorImpl.java:65)
>>>  at
>>> sun.nio.ch.EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36)
>>> at java.nio.channels.Selector.open(Selector.java:227)
>>>  at
>>> org.jboss.netty.channel.socket.nio.AbstractNioSelector.openSelector(AbstractNioSelector.java:335)
>>> ... 37 more
>>> 14/02/21 08:32:53 WARN BlockManagerMaster: Error sending message to
>>> BlockManagerMaster in 1 attempts
>>> java.util.concurrent.TimeoutException: Futures timed out after [30
>>> seconds]
>>> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>>>  at
>>> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>>> at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>>>  at
>>> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>>> at scala.concurrent.Await$.result(package.scala:107)
>>>  at
>>> org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:162)
>>> at
>>> org.apache.spark.storage.BlockManagerMaster.sendHeartBeat(BlockManagerMaster.scala:52)
>>>  at org.apache.spark.storage.BlockManager.org
>>> $apache$spark$storage$BlockManager$$heartBeat(BlockManager.scala:97)
>>>  at
>>> org.apache.spark.storage.BlockManager$$anonfun$initialize$1.apply$mcV$sp(BlockManager.scala:135)
>>> at akka.actor.Scheduler$$anon$9.run(Scheduler.scala:80)
>>>  at
>>> akka.actor.LightArrayRevolverScheduler$$anon$3$$anon$2.run(Scheduler.scala:241)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>>>  at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>>> at java.lang.Thread.run(Thread.java:722)
>>> 14/02/21 08:33:26 WARN BlockManagerMaster: Error sending message to
>>> BlockManagerMaster in 2 attempts
>>> java.util.concurrent.TimeoutException: Futures timed out after [30
>>> seconds]
>>> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>>>  at
>>> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>>> at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>>>  at
>>> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>>> at scala.concurrent.Await$.result(package.scala:107)
>>>  at
>>> org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:162)
>>> at
>>> org.apache.spark.storage.BlockManagerMaster.sendHeartBeat(BlockManagerMaster.scala:52)
>>>  at org.apache.spark.storage.BlockManager.org
>>> $apache$spark$storage$BlockManager$$heartBeat(BlockManager.scala:97)
>>>  at
>>> org.apache.spark.storage.BlockManager$$anonfun$initialize$1.apply$mcV$sp(BlockManager.scala:135)
>>> at akka.actor.Scheduler$$anon$9.run(Scheduler.scala:80)
>>>  at
>>> akka.actor.LightArrayRevolverScheduler$$anon$3$$anon$2.run(Scheduler.scala:241)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>>>  at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>>> at java.lang.Thread.run(Thread.java:722)
>>> 14/02/21 08:33:59 WARN BlockManagerMaster: Error sending message to
>>> BlockManagerMaster in 3 attempts
>>> java.util.concurrent.TimeoutException: Futures timed out after [30
>>> seconds]
>>> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>>>  at
>>> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>>> at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>>>  at
>>> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>>> at scala.concurrent.Await$.result(package.scala:107)
>>>  at
>>> org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:162)
>>> at
>>> org.apache.spark.storage.BlockManagerMaster.sendHeartBeat(BlockManagerMaster.scala:52)
>>>  at org.apache.spark.storage.BlockManager.org
>>> $apache$spark$storage$BlockManager$$heartBeat(BlockManager.scala:97)
>>>  at
>>> org.apache.spark.storage.BlockManager$$anonfun$initialize$1.apply$mcV$sp(BlockManager.scala:135)
>>> at akka.actor.Scheduler$$anon$9.run(Scheduler.scala:80)
>>>  at
>>> akka.actor.LightArrayRevolverScheduler$$anon$3$$anon$2.run(Scheduler.scala:241)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>>>  at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>>> at java.lang.Thread.run(Thread.java:722)
>>> 14/02/21 08:34:03 ERROR Executor: Uncaught exception in thread
>>> Thread[Connection manager future execution context-0,5,main]
>>> java.lang.Error: org.apache.spark.SparkException: Error sending message
>>> to BlockManagerMaster [message = HeartBeat(BlockManagerId(1, 172.17.0.4,
>>> 52780, 0))]
>>>  at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1116)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>>>  at java.lang.Thread.run(Thread.java:722)
>>> Caused by: org.apache.spark.SparkException: Error sending message to
>>> BlockManagerMaster [message = HeartBeat(BlockManagerId(1, 172.17.0.4,
>>> 52780, 0))]
>>>  at
>>> org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:176)
>>> at
>>> org.apache.spark.storage.BlockManagerMaster.sendHeartBeat(BlockManagerMaster.scala:52)
>>>  at org.apache.spark.storage.BlockManager.org
>>> $apache$spark$storage$BlockManager$$heartBeat(BlockManager.scala:97)
>>>  at
>>> org.apache.spark.storage.BlockManager$$anonfun$initialize$1.apply$mcV$sp(BlockManager.scala:135)
>>> at akka.actor.Scheduler$$anon$9.run(Scheduler.scala:80)
>>>  at
>>> akka.actor.LightArrayRevolverScheduler$$anon$3$$anon$2.run(Scheduler.scala:241)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>>>  ... 2 more
>>> Caused by: java.util.concurrent.TimeoutException: Futures timed out
>>> after [30 seconds]
>>> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>>>  at
>>> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>>> at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>>>  at
>>> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>>> at scala.concurrent.Await$.result(package.scala:107)
>>>  at
>>> org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:162)
>>> ... 8 more
>>>
>>
>>
>>
>> --
>>
>> Sourav Chandra
>>
>> Senior Software Engineer
>>
>> · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·
>>
>> sourav.chan...@livestream.com
>>
>> o: +91 80 4121 8723
>>
>> m: +91 988 699 3746
>>
>> skype: sourav.chandra
>>
>> Livestream
>>
>> "Ajmera Summit", First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd
>> Block, Koramangala Industrial Area,
>>
>> Bangalore 560034
>>
>> www.livestream.com
>>
>
>
