Testing spark-testing-base. Error multiple SparkContext

2018-04-03 Thread Guillermo Ortiz
I'm writing a Spark test with Spark Streaming, Cassandra, and Kafka.
I have an action that takes a DStream as input, saves some RDDs to Cassandra,
and sometimes puts some elements into Kafka.
I'm using https://github.com/holdenk/spark-testing-base, with Kafka and
Cassandra running locally.


My method looks like:

def execute(dstream: DStream[MyObject]): Unit = {
  // Some processing             --> this works
  // Save some RDDs to Cassandra --> this works
  // Send some records to Kafka  --> this doesn't work in the test,
  //                                 though it works outside the test
}
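
For reference, a rough sketch of what that body might look like with the
DataStax spark-cassandra-connector (an assumption; the keyspace and table
names are taken from the test below, everything else is illustrative, and
MyObject is assumed to be a case class whose fields match the table columns):

import com.datastax.spark.connector._
import org.apache.spark.streaming.dstream.DStream

def execute(dstream: DStream[MyObject]): Unit = {
  dstream.foreachRDD { rdd =>
    // The Cassandra write: this is the part that works in the test.
    rdd.saveToCassandra("iris", "tipo_cliente_ref")
  }
  // The Kafka write: this is the part that fails under the test.
  sendToKafka(dstream)
}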


When I send data to Kafka:

// There is an error in this method
def sendToKafka(rec: DStream[CrmTipoCliente]) = {
  rec.foreachRDD( r => {
    r.foreachPartition {
      // Exception here. Config.kafkapropsProd returns a Properties with the
      // values needed to connect to Kafka.
      val kafka = SparkKafkaSink[String, String](Config.kafkapropsProd)
      partition => partition.foreach { message =>
        // Some logic...
        kafka.send("test", null, "message")
      }
    }
  })
}

My test looks like:

@RunWith(classOf[JUnitRunner])
class CassandraConnectionIntegrationTest extends FunSuite with BeforeAndAfter
    with BeforeAndAfterAll with StreamingActionBase {

  var cluster: Cluster = _
  implicit var session: Session = _
  val keyspace: String = "iris"
  val table: String = keyspace + ".tipo_cliente_ref"
  var service: MyClass = _

  override def beforeAll(): Unit = {
    super.beforeAll()
    // This line doesn't work!
    sc.getConf.set("spark.driver.allowMultipleContexts", "true")
    ...
  }

  test("Insert record") {
    val inputInsert = MyObject("...")
    val input = List(List(inputInsert))
    runAction[MyObject](input, service.execute)
    val result = session.execute("select * from myTable WHERE...")
    // Some asserts on Cassandra and Kafka
  }
}
This test partially works: it saves the data into Cassandra, but it fails
when it has to send data to Kafka.

The error I see:
23:58:45.329 [pool-22-thread-1] INFO  o.a.spark.streaming.CheckpointWriter - Saving checkpoint for time 1000 ms to file 'file:/C:/Users/A148681/AppData/Local/Temp/spark-cdf3229b-9d84-400f-b92a-5ff4086b81c3/checkpoint-1000'
Exception in thread "streaming-job-executor-0" java.lang.ExceptionInInitializerError
    at com.example.streaming.CrmTipoClienteRunner$$anonfun$monitKafkaToCassandra$1.apply(MyClass.scala:49)
    at com.example.streaming.CrmTipoClienteRunner$$anonfun$monitKafkaToCassandra$1.apply(MyClass.scala:47)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:247)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:247)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:247)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:246)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at:
org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2258)
com.holdenkarau.spark.testing.SharedSparkContext$class.beforeAll(SharedSparkContext.scala:45)
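
Two things stand out in this trace. First, SparkContext.getConf returns a
clone of the context's configuration, so the sc.getConf.set(...) call in
beforeAll cannot work: spark.driver.allowMultipleContexts is only honored
when it is on the SparkConf before a SparkContext is constructed. A minimal
sketch of the distinction (plain Spark, not spark-testing-base specific):

import org.apache.spark.{SparkConf, SparkContext}

// The flag has to be on the SparkConf before the context is built ...
val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("test")
  .set("spark.driver.allowMultipleContexts", "true")
val sc = new SparkContext(conf)

// ... because getConf hands back a copy; mutating it is a no-op
// for the already-running context.
sc.getConf.set("spark.driver.allowMultipleContexts", "true") // no effect

Second, the ExceptionInInitializerError at MyClass.scala:49 suggests the
second SparkContext is being created inside a static (object) initializer
reached from sendToKafka, likely Config or SparkKafkaSink; if so, the cleaner
fix is to make that initializer reuse the test's context rather than to allow
two contexts.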


Re: Base ERROR

2015-12-17 Thread Jeff Zhang
I believe this is an HBase issue; you'd better ask on the HBase mailing list.



On Fri, Dec 18, 2015 at 9:57 AM, censj wrote:

> hi, all:
> I write data to HBase, but HBase raises this ERROR. Could you help me?


-- 
Best Regards

Jeff Zhang


Base ERROR

2015-12-17 Thread censj
hi, all:
I write data to HBase, but HBase raises this ERROR. Could you help me?
>
> r.KeeperException$SessionExpiredException: KeeperErrorCode = Session expired for /hbase-unsecure/rs/byd0157,16020,1449106975377
> 2015-12-17 21:24:29,854 WARN  [regionserver/byd0157/192.168.0.157:16020] zookeeper.RecoverableZooKeeper: Possibly transient ZooKeeper, quorum=byd0151:2181,byd0150:2181,byd0152:2181, exception=org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session expired for /hbase-unsecure/rs/byd0157,16020,1449106975377
> 2015-12-17 21:24:29,854 ERROR [regionserver/byd0157/192.168.0.157:16020] zookeeper.RecoverableZooKeeper: ZooKeeper delete failed after 4 attempts
> 2015-12-17 21:24:29,854 WARN  [regionserver/byd0157/192.168.0.157:16020] regionserver.HRegionServer: Failed deleting my ephemeral node
> org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session expired for /hbase-unsecure/rs/byd0157,16020,1449106975377
>     at org.apache.zookeeper.KeeperException.create(KeeperException.java:127)
>     at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
>     at org.apache.zookeeper.ZooKeeper.delete(ZooKeeper.java:873)
>     at org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper.delete(RecoverableZooKeeper.java:179)
>     at org.apache.hadoop.hbase.zookeeper.ZKUtil.deleteNode(ZKUtil.java:1345)
>     at org.apache.hadoop.hbase.zookeeper.ZKUtil.deleteNode(ZKUtil.java:1334)
>     at org.apache.hadoop.hbase.regionserver.HRegionServer.deleteMyEphemeralNode(HRegionServer.java:1393)
>     at org.apache.hadoop.hbase.regionserver.HRegionServer.run(HRegionServer.java:1076)
>     at java.lang.Thread.run(Thread.java:745)
> 2015-12-17 21:24:29,855 INFO  [regionserver/byd0157/192.168.0.157:16020] regionserver.HRegionServer: stopping server byd0157,16020,1449106975377; zookeeper connection closed.
> 2015-12-17 21:24:29,855 INFO  [regionserver/byd0157/192.168.0.157:16020] regionserver.HRegionServer: regionserver/byd0157/192.168.0.157:16020 exiting
> 2015-12-17 21:24:29,858 ERROR [main] regionserver.HRegionServerCommandLine: Region server exiting
> java.lang.RuntimeException: HRegionServer Aborted
>     at org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine.start(HRegionServerCommandLine.java:68)
>     at org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine.run(HRegionServerCommandLine.java:87)
>     at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
>     at org.apache.hadoop.hbase.util.ServerCommandLine.doMain(ServerCommandLine.java:126)
>     at org.apache.hadoop.hbase.regionserver.HRegionServer.main(HRegionServer.java:2641)
> 2015-12-17 21:24:29,940 INFO  [Thread-6] regionserver.ShutdownHook: Shutdown hook starting; hbase.shutdown.hook=true; fsShutdownHook=org.apache.hadoop.fs.FileSystem$Cache$ClientFinalizer@6de54b40
> 2015-12-17 21:24:29,942 INFO  [Thread-6] regionserver.ShutdownHook: Starting fs shutdown hook thread.
> 2015-12-17 21:24:29,953 INFO  [Thread-6] regionserver.ShutdownHook: Shutdown hook finished.