Hi,
I have been stuck on this for 3 days now. I am using the spark-cassandra-connector
with Spark, and I am able to create RDDs with the sc.cassandraTable function, which
means Spark is able to communicate with Cassandra properly.
But somehow saveToCassandra is not working. Below are the steps I am following.
Does it have something to do with my spark-env or spark-defaults? Am I missing
something critical?
scala> import com.datastax.spark.connector._
scala> sc.addJar("/home/analytics/Installers/spark-1.1.1/spark-cassandra-connector/spark-cassandra-connector/target/scala-2.10/spark-cassandra-connector-assembly-1.2.0-SNAPSHOT.jar")
scala> val myTable = sc.cassandraTable("test2", "words")
scala> myTable.collect()
--- this works perfectly fine.
scala> val data = sc.parallelize(Seq((81, "XXX"), (82, "YYYY")))
scala> data.saveToCassandra("test2", "words", SomeColumns("word", "count"))
--- this fails
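Separately from the error in the log that follows, it is worth checking how the tuples line up with SomeColumns. For tuple RDDs the connector matches tuple fields to the listed columns by position, so (81, "XXX") would put 81 into word and "XXX" into count. The table schema is not shown here; if count is an int column, as in the saving example in the connector documentation, that row would likely fail type conversion once the current error is out of the way. The plain-Scala sketch below only illustrates that positional pairing; PositionalMapping and pairByPosition are hypothetical names, not connector APIs.

```scala
// Hypothetical helper, not a connector API: pairs each listed column name with the
// tuple field in the same position, mirroring how a tuple row is matched to SomeColumns.
object PositionalMapping {
  def pairByPosition(columns: Seq[String], row: Product): List[(String, Any)] = {
    require(columns.size == row.productArity,
      s"expected ${columns.size} fields but the row has ${row.productArity}")
    columns.toList.zip(row.productIterator.toList)
  }

  def main(args: Array[String]): Unit = {
    val columns = Seq("word", "count")
    // First row from the transcript: the Int lands in `word`, the String in `count`.
    println(pairByPosition(columns, (81, "XXX")))  // List((word,81), (count,XXX))
    // Swapped order: the String lands in `word`, the Int in `count`.
    println(pairByPosition(columns, ("XXX", 81)))  // List((word,XXX), (count,81))
  }
}
```

If word is text and count is int, the corresponding shell change would be sc.parallelize(Seq(("XXX", 81), ("YYYY", 82))) saved with the same SomeColumns("word", "count").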
15/03/11 15:16:45 INFO Cluster: New Cassandra host /10.131.141.192:9042 added
15/03/11 15:16:45 INFO LocalNodeFirstLoadBalancingPolicy: Added host 10.131.141.192 (datacenter1)
15/03/11 15:16:45 INFO Cluster: New Cassandra host /10.131.141.193:9042 added
15/03/11 15:16:45 INFO LocalNodeFirstLoadBalancingPolicy: Added host 10.131.141.193 (datacenter1)
15/03/11 15:16:45 INFO Cluster: New Cassandra host /10.131.141.200:9042 added
15/03/11 15:16:45 INFO CassandraConnector: Connected to Cassandra cluster: wfan_cluster_DB
15/03/11 15:16:45 INFO SparkContext: Starting job: runJob at RDDFunctions.scala:29
15/03/11 15:16:45 INFO DAGScheduler: Got job 1 (runJob at RDDFunctions.scala:29) with 2 output partitions (allowLocal=false)
15/03/11 15:16:45 INFO DAGScheduler: Final stage: Stage 1(runJob at RDDFunctions.scala:29)
15/03/11 15:16:45 INFO DAGScheduler: Parents of final stage: List()
15/03/11 15:16:45 INFO DAGScheduler: Missing parents: List()
15/03/11 15:16:45 INFO DAGScheduler: Submitting Stage 1 (ParallelCollectionRDD[1] at parallelize at <console>:20), which has no missing parents
15/03/11 15:16:45 INFO MemoryStore: ensureFreeSpace(7400) called with curMem=1792, maxMem=2778778828
15/03/11 15:16:45 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 7.2 KB, free 2.6 GB)
15/03/11 15:16:45 INFO MemoryStore: ensureFreeSpace(3602) called with curMem=9192, maxMem=2778778828
15/03/11 15:16:45 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 3.5 KB, free 2.6 GB)
15/03/11 15:16:45 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.131.141.200:56502 (size: 3.5 KB, free: 2.6 GB)
15/03/11 15:16:45 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0
15/03/11 15:16:45 INFO DAGScheduler: Submitting 2 missing tasks from Stage 1 (ParallelCollectionRDD[1] at parallelize at <console>:20)
15/03/11 15:16:45 INFO TaskSchedulerImpl: Adding task set 1.0 with 2 tasks
15/03/11 15:16:45 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, 10.131.141.192, PROCESS_LOCAL, 1216 bytes)
15/03/11 15:16:45 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, 10.131.141.193, PROCESS_LOCAL, 1217 bytes)
15/03/11 15:16:45 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.131.141.193:51660 (size: 3.5 KB, free: 267.3 MB)
15/03/11 15:16:45 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.131.141.192:32875 (size: 3.5 KB, free: 267.3 MB)
15/03/11 15:16:45 INFO CassandraConnector: Disconnected from Cassandra cluster: wfan_cluster_DB
15/03/11 15:16:46 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 2, 10.131.141.192): java.lang.NoSuchMethodError: org.apache.spark.executor.TaskMetrics.outputMetrics()Lscala/Option;
com.datastax.spark.connector.metrics.OutputMetricsUpdater$.apply(OutputMetricsUpdater.scala:70)
com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:119)
com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:29)
com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:29)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
15/03/11 15:16:46 INFO TaskSetManager: Starting task 0.1 in stage 1.0 (TID 4, 10.131.141.192, PROCESS_LOCAL, 1216 bytes)
15/03/11 15:16:46 INFO ConnectionManager: Key not valid ? sun.nio.ch.SelectionKeyImpl@29ffe58e
15/03/11 15:16:46 INFO ConnectionManager: key already cancelled ? sun.nio.ch.SelectionKeyImpl@29ffe58e
java.nio.channels.CancelledKeyException
	at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:392)
	at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:145)
15/03/11 15:16:46 INFO SparkDeploySchedulerBackend: Executor 1 disconnected, so removing it
15/03/11 15:16:46 INFO ConnectionManager: Handling connection error on connection to ConnectionManagerId(kvs-in-wfanub03.int.kronos.com,32875)
15/03/11 15:16:46 ERROR TaskSchedulerImpl: Lost executor 1 on 10.131.141.192: remote Akka client disassociated
15/03/11 15:16:46 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(kvs-in-wfanub03.int.kronos.com,32875)
15/03/11 15:16:46 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(kvs-in-wfanub03.int.kronos.com,32875)
15/03/11 15:16:46 INFO TaskSetManager: Re-queueing tasks for 1 from TaskSet 1.0
15/03/11 15:16:46 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(kvs-in-wfanub03.int.kronos.com,32875)
15/03/11 15:16:46 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(kvs-in-wfanub03.int.kronos.com,32875)
15/03/11 15:16:46 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(kvs-in-wfanub03.int.kronos.com,32875)
15/03/11 15:16:46 INFO AppClient$ClientActor: Executor updated: app-20150311151059-0008/1 is now EXITED (Command exited with code 50)
15/03/11 15:16:46 INFO SparkDeploySchedulerBackend: Executor app-20150311151059-0008/1 removed: Command exited with code 50
15/03/11 15:16:46 WARN TaskSetManager: Lost task 0.1 in stage 1.0 (TID 4, 10.131.141.192): ExecutorLostFailure (executor lost)
15/03/11 15:16:46 INFO DAGScheduler: Executor lost: 1 (epoch 0)
15/03/11 15:16:46 INFO AppClient$ClientActor: Executor added: app-20150311151059-0008/2 on worker-20150310181709-10.131.141.192-58499 (10.131.141.192:58499) with 1 cores
15/03/11 15:16:46 INFO BlockManagerMasterActor: Trying to remove executor 1 from BlockManagerMaster.
15/03/11 15:16:46 INFO BlockManagerMasterActor: Removing block manager BlockManagerId(1, 10.131.141.192, 32875, 0)
15/03/11 15:16:46 INFO BlockManagerMaster: Removed 1 successfully in removeExecutor
15/03/11 15:16:46 INFO SparkDeploySchedulerBackend: Granted executor ID app-20150311151059-0008/2 on hostPort 10.131.141.192:58499 with 1 cores, 512.0 MB RAM
15/03/11 15:16:46 INFO AppClient$ClientActor: Executor updated: app-20150311151059-0008/2 is now LOADING
15/03/11 15:16:46 INFO TaskSetManager: Starting task 0.2 in stage 1.0 (TID 5, 10.131.141.193, PROCESS_LOCAL, 1216 bytes)
15/03/11 15:16:46 INFO TaskSetManager: Lost task 1.0 in stage 1.0 (TID 3) on executor 10.131.141.193: java.lang.NoSuchMethodError (org.apache.spark.executor.TaskMetrics.outputMetrics()Lscala/Option;) [duplicate 1]
15/03/11 15:16:47 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(kvs-in-wfanub02.int.kronos.com,51660)
15/03/11 15:16:47 INFO SparkDeploySchedulerBackend: Executor 0 disconnected, so removing it
15/03/11 15:16:47 ERROR TaskSchedulerImpl: Lost executor 0 on 10.131.141.193: remote Akka client disassociated
15/03/11 15:16:47 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(kvs-in-wfanub02.int.kronos.com,51660)
15/03/11 15:16:47 INFO TaskSetManager: Re-queueing tasks for 0 from TaskSet 1.0
15/03/11 15:16:47 WARN TaskSetManager: Lost task 0.2 in stage 1.0 (TID 5, 10.131.141.193): ExecutorLostFailure (executor lost)
15/03/11 15:16:47 ERROR ConnectionManager: Corresponding SendingConnection to ConnectionManagerId(kvs-in-wfanub02.int.kronos.com,51660) not found
15/03/11 15:16:47 INFO DAGScheduler: Executor lost: 0 (epoch 1)
15/03/11 15:16:47 INFO BlockManagerMasterActor: Trying to remove executor 0 from BlockManagerMaster.
15/03/11 15:16:47 INFO BlockManagerMasterActor: Removing block manager BlockManagerId(0, 10.131.141.193, 51660, 0)
15/03/11 15:16:47 INFO BlockManagerMaster: Removed 0 successfully in removeExecutor
15/03/11 15:16:47 INFO AppClient$ClientActor: Executor updated: app-20150311151059-0008/0 is now EXITED (Command exited with code 50)
15/03/11 15:16:47 INFO SparkDeploySchedulerBackend: Executor app-20150311151059-0008/0 removed: Command exited with code 50
15/03/11 15:16:47 INFO AppClient$ClientActor: Executor added: app-20150311151059-0008/3 on worker-20150310181710-10.131.141.193-35791 (10.131.141.193:35791) with 1 cores
15/03/11 15:16:47 INFO SparkDeploySchedulerBackend: Granted executor ID app-20150311151059-0008/3 on hostPort 10.131.141.193:35791 with 1 cores, 512.0 MB RAM
15/03/11 15:16:47 INFO AppClient$ClientActor: Executor updated: app-20150311151059-0008/3 is now LOADING
15/03/11 15:16:47 INFO AppClient$ClientActor: Executor updated: app-20150311151059-0008/2 is now RUNNING
15/03/11 15:16:49 INFO AppClient$ClientActor: Executor updated: app-20150311151059-0008/3 is now RUNNING
15/03/11 15:16:51 INFO SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://[email protected]:37230/user/Executor#544826218] with ID 2
15/03/11 15:16:51 INFO TaskSetManager: Starting task 0.3 in stage 1.0 (TID 6, 10.131.141.192, PROCESS_LOCAL, 1216 bytes)
15/03/11 15:16:52 INFO BlockManagerMasterActor: Registering block manager 10.131.141.192:40195 with 267.3 MB RAM, BlockManagerId(2, 10.131.141.192, 40195, 0)
15/03/11 15:16:52 INFO ConnectionManager: Accepted connection from [kvs-in-wfanub03.int.kronos.com/10.131.141.192:59385]
15/03/11 15:16:52 INFO SendingConnection: Initiating connection to [kvs-in-wfanub03.int.kronos.com/10.131.141.192:40195]
15/03/11 15:16:52 INFO SendingConnection: Connected to [kvs-in-wfanub03.int.kronos.com/10.131.141.192:40195], 1 messages pending
15/03/11 15:16:52 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.131.141.192:40195 (size: 3.5 KB, free: 267.3 MB)
15/03/11 15:16:53 INFO TaskSetManager: Starting task 1.1 in stage 1.0 (TID 7, 10.131.141.192, PROCESS_LOCAL, 1217 bytes)
15/03/11 15:16:53 INFO TaskSetManager: Lost task 0.3 in stage 1.0 (TID 6) on executor 10.131.141.192: java.lang.NoSuchMethodError (org.apache.spark.executor.TaskMetrics.outputMetrics()Lscala/Option;) [duplicate 2]
15/03/11 15:16:53 ERROR TaskSetManager: Task 0 in stage 1.0 failed 4 times; aborting job
15/03/11 15:16:53 INFO TaskSchedulerImpl: Cancelling stage 1
15/03/11 15:16:53 INFO TaskSchedulerImpl: Stage 1 was cancelled
15/03/11 15:16:53 INFO DAGScheduler: Failed to run runJob at RDDFunctions.scala:29
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 6, 10.131.141.192): java.lang.NoSuchMethodError: org.apache.spark.executor.TaskMetrics.outputMetrics()Lscala/Option;
com.datastax.spark.connector.metrics.OutputMetricsUpdater$.apply(OutputMetricsUpdater.scala:70)
com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:119)
com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:29)
com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:29)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
	at akka.actor.ActorCell.invoke(ActorCell.scala:456)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
	at akka.dispatch.Mailbox.run(Mailbox.scala:219)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
scala> 15/03/11 15:16:53 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(kvs-in-wfanub03.int.kronos.com,40195)
15/03/11 15:16:53 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(kvs-in-wfanub03.int.kronos.com,40195)
15/03/11 15:16:53 ERROR ConnectionManager: Corresponding SendingConnection to ConnectionManagerId(kvs-in-wfanub03.int.kronos.com,40195) not found
15/03/11 15:16:53 INFO SparkDeploySchedulerBackend: Executor 2 disconnected, so removing it
15/03/11 15:16:53 ERROR TaskSchedulerImpl: Lost executor 2 on 10.131.141.192: remote Akka client disassociated
15/03/11 15:16:53 INFO TaskSetManager: Re-queueing tasks for 2 from TaskSet 1.0
15/03/11 15:16:53 WARN TaskSetManager: Lost task 1.1 in stage 1.0 (TID 7, 10.131.141.192): ExecutorLostFailure (executor lost)
15/03/11 15:16:53 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
15/03/11 15:16:53 INFO DAGScheduler: Executor lost: 2 (epoch 2)
15/03/11 15:16:53 INFO BlockManagerMasterActor: Trying to remove executor 2 from BlockManagerMaster.
15/03/11 15:16:53 INFO BlockManagerMasterActor: Removing block manager BlockManagerId(2, 10.131.141.192, 40195, 0)
15/03/11 15:16:53 INFO BlockManagerMaster: Removed 2 successfully in removeExecutor
15/03/11 15:16:53 INFO AppClient$ClientActor: Executor updated: app-20150311151059-0008/2 is now EXITED (Command exited with code 50)
15/03/11 15:16:53 INFO SparkDeploySchedulerBackend: Executor app-20150311151059-0008/2 removed: Command exited with code 50
15/03/11 15:16:53 INFO AppClient$ClientActor: Executor added: app-20150311151059-0008/4 on worker-20150310181709-10.131.141.192-58499 (10.131.141.192:58499) with 1 cores
15/03/11 15:16:53 INFO SparkDeploySchedulerBackend: Granted executor ID app-20150311151059-0008/4 on hostPort 10.131.141.192:58499 with 1 cores, 512.0 MB RAM
15/03/11 15:16:53 INFO AppClient$ClientActor: Executor updated: app-20150311151059-0008/4 is now LOADING
15/03/11 15:16:54 INFO SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://[email protected]:55631/user/Executor#-183896177] with ID 3
15/03/11 15:16:54 INFO BlockManagerMasterActor: Registering block manager 10.131.141.193:56916 with 267.3 MB RAM, BlockManagerId(3, 10.131.141.193, 56916, 0)
15/03/11 15:16:54 INFO AppClient$ClientActor: Executor updated: app-20150311151059-0008/4 is now RUNNING
15/03/11 15:16:58 INFO SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://[email protected]:37182/user/Executor#577095849] with ID 4
15/03/11 15:16:58 INFO BlockManagerMasterActor: Registering block manager 10.131.141.192:34398 with 267.3 MB RAM, BlockManagerId(4, 10.131.141.192, 34398, 0)
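For what it's worth, a java.lang.NoSuchMethodError usually means code compiled against one version of a class is running against a version that lacks that method. The connector assembly here is 1.2.0-SNAPSHOT, while the install path points at Spark 1.1.1. As far as I can tell, TaskMetrics.outputMetrics() only appeared in Spark 1.2, so a connector/Spark version mismatch looks like the first thing to rule out. The sketch below uses plain Java reflection to check whether a class on the current classpath exposes a given zero-argument method. MethodProbe and hasZeroArgMethod are hypothetical names, not part of Spark or the connector.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical diagnostic helper: checks, via plain Java reflection, whether a class is
// loadable in this JVM and exposes a public zero-argument method with the given name.
object MethodProbe {
  def hasZeroArgMethod(className: String, methodName: String): Either[String, Boolean] =
    Try(Class.forName(className)) match {
      case Failure(e) => Left(s"cannot load $className: $e")
      case Success(cls) =>
        // getMethods returns all public methods, including inherited ones.
        Right(cls.getMethods.exists(m => m.getName == methodName && m.getParameterTypes.isEmpty))
    }

  def main(args: Array[String]): Unit =
    // The method named in the NoSuchMethodError from the executor log.
    println(hasZeroArgMethod("org.apache.spark.executor.TaskMetrics", "outputMetrics"))
}
```

In spark-shell, pasting the object and then evaluating sc.version together with MethodProbe.hasZeroArgMethod("org.apache.spark.executor.TaskMetrics", "outputMetrics") shows what the driver sees. Executors load Spark from their own worker installation, so running the same call inside a task, for example through sc.parallelize(1 to 2, 2).map(...).collect(), should show what they see. A Right(false) there would be consistent with the error above.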
Regards,
Tarun Tiwari | Workforce Analytics-ETL | Kronos India