Hello, I am using Spark 1.2 and Cassandra 2.0.12, and the table I am about to read has about 20 million rows.
When I use 10 Spark threads to read from Cassandra, it works fine:

val sc = new SparkContext("local[10]", "tungsten", conf)

When I use 40 Spark threads, it crashes with the error below:

val sc = new SparkContext("local[40]", "tungsten", conf)

I then raised the concurrent_reads parameter in cassandra.yaml from 32 to 320, and with that change 40 Spark threads work. But when I increased the thread count to 80, the exception happened again. It looks like a configuration problem, but how can I get past it? Thanks.

15/03/23 20:25:43 WARN TaskSetManager: Lost task 222.0 in stage 0.0 (TID 222, localhost): TaskKilled (killed intentionally)
15/03/23 20:25:43 WARN TaskSetManager: Lost task 240.0 in stage 0.0 (TID 240, localhost): TaskKilled (killed intentionally)
15/03/23 20:25:43 WARN TaskSetManager: Lost task 241.0 in stage 0.0 (TID 241, localhost): TaskKilled (killed intentionally)
15/03/23 20:25:43 WARN TaskSetManager: Lost task 228.0 in stage 0.0 (TID 228, localhost): TaskKilled (killed intentionally)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 157 in stage 0.0 failed 1 times, most recent failure: Lost task 157.0 in stage 0.0 (TID 157, localhost): java.io.IOException: Exception during execution of SELECT count(*) FROM "tungsten"."cudb_dn" WHERE token("entry_key") > ? AND token("entry_key") <= ? ALLOW FILTERING: All host(s) tried for query failed (tried: /169.254.100.4:9042 (com.datastax.driver.core.exceptions.DriverException: Timed out waiting for server response), /169.254.100.3:9042 (com.datastax.driver.core.TransportException: [/169.254.100.3:9042] Connection has been closed))
    at com.datastax.spark.connector.rdd.CassandraRDD.com$datastax$spark$connector$rdd$CassandraRDD$$fetchTokenRange(CassandraRDD.scala:433)
    at com.datastax.spark.connector.rdd.CassandraRDD$$anonfun$20.apply(CassandraRDD.scala:447)
    at com.datastax.spark.connector.rdd.CassandraRDD$$anonfun$20.apply(CassandraRDD.scala:447)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:10)
    at org.apache.spark.rdd.RDD$$anonfun$19.apply(RDD.scala:869)
    at org.apache.spark.rdd.RDD$$anonfun$19.apply(RDD.scala:868)
    at org.apache.spark.SparkContext$$anonfun$30.apply(SparkContext.scala:1389)
    at org.apache.spark.SparkContext$$anonfun$30.apply(SparkContext.scala:1389)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /169.254.100.4:9042 (com.datastax.driver.core.exceptions.DriverException: Timed out waiting for server response), /169.254.100.3:9042 (com.datastax.driver.core.TransportException: [/169.254.100.3:9042] Connection has been closed))
    at com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:84)
    at com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:289)
    at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:205)
    at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:52)
    at sun.reflect.GeneratedMethodAccessor67.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:33)
    at com.sun.proxy.$Proxy11.execute(Unknown Source)
    at com.datastax.spark.connector.rdd.CassandraRDD.com$datastax$spark$connector$rdd$CassandraRDD$$fetchTokenRange(CassandraRDD.scala:424)
    ... 14 more
Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /169.254.100.4:9042 (com.datastax.driver.core.exceptions.DriverException: Timed out waiting for server response), /169.254.100.3:9042 (com.datastax.driver.core.TransportException: [/169.254.100.3:9042] Connection has been closed))
    at com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:107)
    at com.datastax.driver.core.RequestHandler$1.run(RequestHandler.java:210)
    ... 3 more
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
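For reference, the server-side change described above corresponds to this cassandra.yaml fragment (a sketch only; 320 is the value from my test, and the appropriate number depends on the node's disk and core count, not on the Spark thread count):

```yaml
# cassandra.yaml -- per node, restart required for the change to take effect.
# concurrent_reads bounds Cassandra's read-stage thread pool; the default of 32
# was not enough once 40 Spark tasks issued token-range scans in parallel.
concurrent_reads: 320
```

An alternative to raising server-side limits each time the thread count grows would be throttling the client side, e.g. via the spark-cassandra-connector read-tuning properties (`spark.cassandra.input.split.size` and `spark.cassandra.input.page.row.size` in the 1.x line; the exact names vary by connector version, so check the docs for yours).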