I'm writing a Spark test that involves Spark Streaming, Cassandra and Kafka. I have an action which takes a DStream as input, saves to Cassandra, and sometimes puts some elements in Kafka. I'm using https://github.com/holdenk/spark-testing-base, with Kafka and Cassandra running locally.
My method looks like: *def execute(dstream: DStream[MyObject]) : Unit = { //Some processing --> this works //Save to Cassandra some RDDs --> this works //Send to Kafka some record. --> this doesn't work in the test, but it works outside of the test.}* When I send data to Kafka: *//There is an error in this method* *def sendToKafka(rec: DStream[CrmTipoCliente]) = { rec.foreachRDD( r => { r.foreachPartition { val kafka = SparkKafkaSink[String,String](Config.kafkapropsProd) --> Exception here. Config.kafkapr.... returns a Properties object with the values needed to connect to Kafka partition => partition.foreach { message => { //Some logic.. kafka.send("test", null, "message") } } } })* My test looks like: *@RunWith(classOf[JUnitRunner]) class CassandraConnectionIntegrationTest extends FunSuite with BeforeAndAfter with BeforeAndAfterAll with StreamingActionBase{ var cluster: Cluster = _ implicit var session: Session = _ val keyspace: String = "iris" val table: String = keyspace + ".tipo_cliente_ref" var service: MyClass = _ override def beforeAll(): Unit = { super.beforeAll() //This line doesn't work! sc.getConf.set("spark.driver.allowMultipleContexts", "true") ...test("Insert record ") { val inputInsert = MyObject("...") val input = List(List(inputInsert)) runAction[MyObject](input, service.execute) val result = session.execute("select * from myTable WHERE...") //Some assert to Cassandra and Kafka}* This test partially works: it saves data into Cassandra, but it fails when it has to send data to Kafka.
The error I can see: 23:58:45.329 [pool-22-thread-1] INFO o.a.spark.streaming.CheckpointWriter - Saving checkpoint for time 1000 ms to file 'file:/C:/Users/A148681/AppData/Local/Temp/spark-cdf3229b-9d84-400f-b92a-5ff4086b81c3/checkpoint-1000' Exception in thread "streaming-job-executor-0" java.lang.ExceptionInInitializerError at com.example.streaming.CrmTipoClienteRunner$$anonfun$monitKafkaToCassandra$1.apply(MyClass.scala:49) at com.example.streaming.CrmTipoClienteRunner$$anonfun$monitKafkaToCassandra$1.apply(MyClass.scala:47) at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627) at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) at scala.util.Try$.apply(Try.scala:192) at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:247) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:247) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:247) at 
scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:246) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) *Caused by: org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true.* The currently running SparkContext was created at: org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2258) com.holdenkarau.spark.testing.SharedSparkContext$class.beforeAll(SharedSparkContext.scala:45)