Testing spark-testing-base. Error multiple SparkContext
I'm doing a spark test with spark streaming, cassandra and kafka. I have an action which has an DStream as input and save to Cassandra and sometimes put some elements in Kafka. I'm using https://github.com/holdenk/spark-testing-base and kafka y cassandra in local. My method looks like: *def execute(dstream: DStream[MyObject]) : Unit = {//Some proccesing --> this works//Save to Cassandra some RDDs --> this works//Send to Kafka some record. --> this doesn't work in test, it works outside of the test.}* When I send data to Kafka: *//There is an error in this method* *def sendToKafka(rec: DStream[CrmTipoCliente]) = { rec.foreachRDD( r => {r.foreachPartition { val kafka = SparkKafkaSink[String,String](Config.kafkapropsProd) --> Exception here. Config.kafkapr returns a properties with the values to connect to Kafka partition =>partition.foreach { message => {//Some logic..kafka.send("test", null, "message") }}} })* My test looks like: *@RunWith(classOf[JUnitRunner])class CassandraConnectionIntegrationTest extends FunSuite with BeforeAndAfter with BeforeAndAfterAll with StreamingActionBase{var cluster: Cluster = _implicit var session: Session = _val keyspace: String = "iris"val table: String = keyspace + ".tipo_cliente_ref"var service: MyClass = _override def beforeAll(): Unit = {super.beforeAll()//This line doesn't work! sc.getConf.set("spark.driver.allowMultipleContexts", "true") ...test("Insert record ") {val inputInsert = MyObject("...")val input = List(List(inputInsert))runAction[MyObject](input, service.execute)val result = session.execute("select * from myTable WHERE...")//Some assert to Cassandra and Kafka}* This test partial works, it saves data into Cassandra but it doesn't work when it has to send data to Kafka. The error I can see: 23:58:45.329 [pool-22-thread-1] INFO o.a.spark.streaming.CheckpointWriter - Saving checkpoint for time 1000 ms to file 'file:/C:/Users/A148681/AppData/Local/Temp/spark-cdf3229b-9d84 -400f-b92a-5ff4086b81c3/checkpoint-1000' Exception in thread "streaming-job-executor-0" java.lang.ExceptionInInitializerError at com.example.streaming.CrmTipoClienteRunner$$anonfun$monitKafkaToCassandra$1.apply(MyClass.scala:49) at com.example.streaming.CrmTipoClienteRunner$$anonfun$monitKafkaToCassandra$1.apply(MyClass .scala:47) at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627) at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) at scala.util.Try$.apply(Try.scala:192) at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:247) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:247) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:247) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:246) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) *Caused by: org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. Th* e currently running SparkContext was created at: org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2258) com.holdenkarau.spark.testing.SharedSparkContext$class.beforeAll(SharedSparkContext.scala:45)
Re: Base ERROR
I believe this is hbase issue, you'd better to ask on hbase mail list. On Fri, Dec 18, 2015 at 9:57 AM, censj wrote: > hi,all: > I wirte data to hbase,but Hbase arise this ERROR,Could you help me? > > > r.KeeperException$SessionExpiredException: KeeperErrorCode = Session > expired for /hbase-unsecure/rs/byd0157,16020,1449106975377 > 2015-12-17 21:24:29,854 WARN [regionserver/byd0157/192.168.0.157:16020] > zookeeper.RecoverableZooKeeper: Possibly transient ZooKeeper, > quorum=byd0151:2181,byd0150:2181,byd0152:2181, > exception=org.apache.zookeeper.KeeperException$SessionExpiredException: > KeeperErrorCode = Session expired for > /hbase-unsecure/rs/byd0157,16020,1449106975377 > 2015-12-17 21:24:29,854 ERROR [regionserver/byd0157/192.168.0.157:16020] > zookeeper.RecoverableZooKeeper: ZooKeeper delete failed after 4 attempts > 2015-12-17 21:24:29,854 WARN [regionserver/byd0157/192.168.0.157:16020] > regionserver.HRegionServer: Failed deleting my ephemeral node > org.apache.zookeeper.KeeperException$SessionExpiredException: > KeeperErrorCode = Session expired for > /hbase-unsecure/rs/byd0157,16020,1449106975377 >at > org.apache.zookeeper.KeeperException.create(KeeperException.java:127) >at > org.apache.zookeeper.KeeperException.create(KeeperException.java:51) >at org.apache.zookeeper.ZooKeeper.delete(ZooKeeper.java:873) >at > org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper.delete(RecoverableZooKeeper.java:179) >at > org.apache.hadoop.hbase.zookeeper.ZKUtil.deleteNode(ZKUtil.java:1345) >at > org.apache.hadoop.hbase.zookeeper.ZKUtil.deleteNode(ZKUtil.java:1334) >at > org.apache.hadoop.hbase.regionserver.HRegionServer.deleteMyEphemeralNode(HRegionServer.java:1393) >at > org.apache.hadoop.hbase.regionserver.HRegionServer.run(HRegionServer.java:1076) >at java.lang.Thread.run(Thread.java:745) > 2015-12-17 21:24:29,855 INFO [regionserver/byd0157/192.168.0.157:16020] > regionserver.HRegionServer: stopping server byd0157,16020,1449106975377; > zookeeper connection closed. > 2015-12-17 21:24:29,855 INFO [regionserver/byd0157/192.168.0.157:16020] > regionserver.HRegionServer: regionserver/byd0157/192.168.0.157:16020 > exiting > 2015-12-17 21:24:29,858 ERROR [main] > regionserver.HRegionServerCommandLine: Region server exiting > java.lang.RuntimeException: HRegionServer Aborted >at > org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine.start(HRegionServerCommandLine.java:68) >at > org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine.run(HRegionServerCommandLine.java:87) >at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70) >at > org.apache.hadoop.hbase.util.ServerCommandLine.doMain(ServerCommandLine.java:126) >at > org.apache.hadoop.hbase.regionserver.HRegionServer.main(HRegionServer.java:2641) > 2015-12-17 21:24:29,940 INFO [Thread-6] regionserver.ShutdownHook: > Shutdown hook starting; hbase.shutdown.hook=true; > fsShutdownHook=org.apache.hadoop.fs.FileSystem$Cache$ClientFinalizer@6de54b40 > 2015-12-17 21:24:29,942 INFO [Thread-6] regionserver.ShutdownHook: > Starting fs shutdown hook thread. > 2015-12-17 21:24:29,953 INFO [Thread-6] regionserver.ShutdownHook: > Shutdown hook finished. > > > -- Best Regards Jeff Zhang
Base ERROR
hi,all: I wirte data to hbase,but Hbase arise this ERROR,Could you help me? > > r.KeeperException$SessionExpiredException: KeeperErrorCode = Session expired > for /hbase-unsecure/rs/byd0157,16020,1449106975377 > 2015-12-17 21:24:29,854 WARN [regionserver/byd0157/192.168.0.157:16020] > zookeeper.RecoverableZooKeeper: Possibly transient ZooKeeper, > quorum=byd0151:2181,byd0150:2181,byd0152:2181, > exception=org.apache.zookeeper.KeeperException$SessionExpiredException: > KeeperErrorCode = Session expired for > /hbase-unsecure/rs/byd0157,16020,1449106975377 > 2015-12-17 21:24:29,854 ERROR [regionserver/byd0157/192.168.0.157:16020] > zookeeper.RecoverableZooKeeper: ZooKeeper delete failed after 4 attempts > 2015-12-17 21:24:29,854 WARN [regionserver/byd0157/192.168.0.157:16020] > regionserver.HRegionServer: Failed deleting my ephemeral node > org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode > = Session expired for /hbase-unsecure/rs/byd0157,16020,1449106975377 >at > org.apache.zookeeper.KeeperException.create(KeeperException.java:127) >at org.apache.zookeeper.KeeperException.create(KeeperException.java:51) >at org.apache.zookeeper.ZooKeeper.delete(ZooKeeper.java:873) >at > org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper.delete(RecoverableZooKeeper.java:179) >at > org.apache.hadoop.hbase.zookeeper.ZKUtil.deleteNode(ZKUtil.java:1345) >at > org.apache.hadoop.hbase.zookeeper.ZKUtil.deleteNode(ZKUtil.java:1334) >at > org.apache.hadoop.hbase.regionserver.HRegionServer.deleteMyEphemeralNode(HRegionServer.java:1393) >at > org.apache.hadoop.hbase.regionserver.HRegionServer.run(HRegionServer.java:1076) >at java.lang.Thread.run(Thread.java:745) > 2015-12-17 21:24:29,855 INFO [regionserver/byd0157/192.168.0.157:16020] > regionserver.HRegionServer: stopping server byd0157,16020,1449106975377; > zookeeper connection closed. > 2015-12-17 21:24:29,855 INFO [regionserver/byd0157/192.168.0.157:16020] > regionserver.HRegionServer: regionserver/byd0157/192.168.0.157:16020 exiting > 2015-12-17 21:24:29,858 ERROR [main] regionserver.HRegionServerCommandLine: > Region server exiting > java.lang.RuntimeException: HRegionServer Aborted >at > org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine.start(HRegionServerCommandLine.java:68) >at > org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine.run(HRegionServerCommandLine.java:87) >at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70) >at > org.apache.hadoop.hbase.util.ServerCommandLine.doMain(ServerCommandLine.java:126) >at > org.apache.hadoop.hbase.regionserver.HRegionServer.main(HRegionServer.java:2641) > 2015-12-17 21:24:29,940 INFO [Thread-6] regionserver.ShutdownHook: Shutdown > hook starting; hbase.shutdown.hook=true; > fsShutdownHook=org.apache.hadoop.fs.FileSystem$Cache$ClientFinalizer@6de54b40 > 2015-12-17 21:24:29,942 INFO [Thread-6] regionserver.ShutdownHook: Starting > fs shutdown hook thread. > 2015-12-17 21:24:29,953 INFO [Thread-6] regionserver.ShutdownHook: Shutdown > hook finished.