Spark Streaming into HBase
I have been trying to understand how Spark Streaming and HBase connect, but have not been successful. What I am trying to do is: given a Spark stream, process that stream and store the results in an HBase table. So far this is what I have:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevel
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HBaseAdmin, HTable, Put, Get}
import org.apache.hadoop.hbase.util.Bytes

def blah(row: Array[String]) {
  val hConf = new HBaseConfiguration()
  val hTable = new HTable(hConf, "table")
  val thePut = new Put(Bytes.toBytes(row(0)))
  thePut.add(Bytes.toBytes("cf"), Bytes.toBytes(row(0)), Bytes.toBytes(row(0)))
  hTable.put(thePut)
}

val ssc = new StreamingContext(sc, Seconds(1))
val lines = ssc.socketTextStream("localhost", , StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.map(_.split(","))
val store = words.foreachRDD(rdd => rdd.foreach(blah))
ssc.start()

I am currently running the above code in spark-shell. I am not sure what I am doing wrong.
Re: Spark Streaming into HBase
Adding back user@.

I am not familiar with the NotSerializableException. Can you show the full stack trace? See SPARK-1297 for the changes you need to make so that Spark works with HBase 0.98.

Cheers

On Wed, Sep 3, 2014 at 2:33 PM, Kevin Peng kpe...@gmail.com wrote:

Ted,

The hbase-site.xml is in the classpath (I had worse issues before, until I figured out that it wasn't in the path). I get the following error in the spark-shell:

org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.sc ...

I also double-checked the HBase table, just in case, and nothing new is written in there. I am using HBase version 0.98.1-cdh5.1.0, the default one with the CDH 5.1.0 distro. Thank you for the help.

On Wed, Sep 3, 2014 at 2:09 PM, Ted Yu yuzhih...@gmail.com wrote:

Is hbase-site.xml in the classpath? Do you observe any exception from the code below or in the region server log? Which HBase release are you using?

On Wed, Sep 3, 2014 at 2:05 PM, kpeng1 kpe...@gmail.com wrote: ...
Re: Spark Streaming into HBase
This doesn't seem to have to do with HBase per se. Some function is getting the StreamingContext into the closure, and that won't work. Is this exactly the code? The function itself doesn't reference a StreamingContext, but is there maybe a different version in reality that tries to use the StreamingContext inside a function?

On Wed, Sep 3, 2014 at 10:36 PM, Ted Yu yuzhih...@gmail.com wrote: ...
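A minimal sketch (with hypothetical names) of the failure mode Sean describes: if the storing function is a method on something that also holds the StreamingContext, passing it to rdd.foreach captures the whole enclosing instance, and Spark then has to serialize the StreamingContext along with the task.

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

class Pipeline(ssc: StreamingContext) {
  // Runs on the executors for each record.
  def save(row: Array[String]): Unit = { /* write one row to HBase */ }

  def wire(words: DStream[Array[String]]): Unit = {
    // `save` is a method of Pipeline, so this closure captures `this`,
    // including the non-serializable `ssc`, and the job aborts with
    // java.io.NotSerializableException: StreamingContext.
    words.foreachRDD(rdd => rdd.foreach(save))
  }
}

In spark-shell the same thing can happen invisibly: top-level vals and defs live inside a REPL wrapper object, so a def that looks standalone may still drag that wrapper (and ssc with it) into the closure.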
Re: Spark Streaming into HBase
Sean,

I create a streaming context near the bottom of the code (ssc) and basically apply a foreachRDD on the resulting DStream so that I can get access to the underlying RDD, which in turn I apply a foreach on, passing in my function that applies the storing logic. Is there a different approach I should be using?

Thanks for the help.

On Wed, Sep 3, 2014 at 2:43 PM, Sean Owen [via Apache Spark User List] wrote: ...
Re: Spark Streaming into HBase
Ted,

Here is the full stack trace coming from spark-shell:

14/09/03 16:21:03 ERROR scheduler.JobScheduler: Error running job streaming job 1409786463000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)
  at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)
  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
  at akka.actor.ActorCell.invoke(ActorCell.scala:456)
  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
  at akka.dispatch.Mailbox.run(Mailbox.scala:219)
  at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Basically, on the terminal where I run nc -lk, I type in words separated by commas and hit enter, e.g. bill,ted.

On Wed, Sep 3, 2014 at 2:36 PM, Ted Yu yuzhih...@gmail.com wrote: ...
Re: Spark Streaming into HBase
This is some issue with how Scala computes closures. Here, because of the function blah, it is trying to serialize the whole enclosing object that this code is part of. Can you define the function blah outside the main function? In fact, you can try putting the function in a serializable object:

object BlahFunction extends Serializable {
  def blah(row: Array[String]) {
    // ...
  }
}

On a related note, opening a connection for every record in the RDD is pretty inefficient. Use rdd.foreachPartition instead: open the connection, write the whole partition, and then close the connection.

TD

On Wed, Sep 3, 2014 at 4:24 PM, Kevin Peng kpe...@gmail.com wrote: ...
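Putting TD's two suggestions together, here is a sketch rather than verified code from the thread: the write function lives in a serializable object that holds no StreamingContext, and each partition opens a single HTable. The table name "mytable", column family "cf", and port 9999 are placeholders, since the original literals were lost from the archived messages.

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD

object HBaseSink extends Serializable {
  def save(rdd: RDD[Array[String]]): Unit = {
    rdd.foreachPartition { rows =>
      // One connection per partition, not per record.
      val hConf = HBaseConfiguration.create() // picks up hbase-site.xml from the classpath
      val hTable = new HTable(hConf, "mytable") // placeholder table name
      try {
        rows.foreach { row =>
          val thePut = new Put(Bytes.toBytes(row(0)))
          thePut.add(Bytes.toBytes("cf"), Bytes.toBytes(row(0)), Bytes.toBytes(row(0)))
          hTable.put(thePut)
        }
      } finally {
        hTable.close()
      }
    }
  }
}

Wired into the original job, the foreachRDD closure then references only the serializable singleton and captures nothing from the shell's scope:

val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
lines.map(_.split(",")).foreachRDD(rdd => HBaseSink.save(rdd))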
Re: Spark Streaming with HBase
Something like this?

import java.util.List;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.rdd.NewHadoopRDD;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import com.google.common.collect.Lists;

public class SparkHBaseMain {

  public static void main(String[] arg) {
    try {
      List<String> jars = Lists.newArrayList(
          "/home/akhld/mobi/localcluster/x/spark-0.9.1-bin-hadoop2/assembly/target/scala-2.10/spark-assembly-0.9.1-hadoop2.2.0.jar",
          "/home/akhld/Downloads/hbase-server-0.96.0-hadoop2.jar",
          "/home/akhld/Downloads/hbase-protocol-0.96.0-hadoop2.jar",
          "/home/akhld/Downloads/hbase-hadoop2-compat-0.96.0-hadoop2.jar",
          "/home/akhld/Downloads/hbase-common-0.96.0-hadoop2.jar",
          "/home/akhld/Downloads/hbase-client-0.96.0-hadoop2.jar",
          "/home/akhld/Downloads/htrace-core-2.02.jar");

      SparkConf spconf = new SparkConf();
      spconf.setMaster("local");
      spconf.setAppName("HBaser");
      spconf.setSparkHome("/home/akhld/mobi/localcluster/x/spark-0.9.1-bin-hadoop2");
      spconf.setJars(jars.toArray(new String[jars.size()]));
      spconf.set("spark.executor.memory", "1g");

      JavaStreamingContext jsc = new JavaStreamingContext(spconf, new Duration(1));

      org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
      conf.addResource("/home/akhld/mobi/temp/sung/hbase-site.xml");
      conf.set(TableInputFormat.INPUT_TABLE, "blogposts");

      NewHadoopRDD<ImmutableBytesWritable, Result> rdd2 =
          new NewHadoopRDD<ImmutableBytesWritable, Result>(
              jsc.ssc().sc(), TableInputFormat.class,
              ImmutableBytesWritable.class, Result.class, conf);

      System.out.println(rdd2.count());

      jsc.start();
    } catch (Exception e) {
      e.printStackTrace();
      System.out.println("Crashed: " + e);
    }
  }
}

Thanks
Best Regards

On Sun, Jun 29, 2014 at 10:16 PM, N.Venkata Naga Ravi nvn_r...@hotmail.com wrote:

Hi,

Is there any example provided for Spark Streaming with input provided from HBase table content?

Thanks,
Ravi
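For comparison, a sketch of the same read path in Scala via SparkContext.newAPIHadoopRDD; it borrows the "blogposts" table name from the Java code above and assumes hbase-site.xml is on the classpath:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

// `sc` is the SparkContext that spark-shell provides.
val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, "blogposts")

// One pair per HBase row: (row key, Result).
val hbaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[ImmutableBytesWritable], classOf[Result])

println(hbaseRDD.count())

Note that, as in the Java example, this produces a one-shot RDD rather than a DStream: the table is read once when the job runs, so it serves as batch input (or a side table to join against a stream), not as a streaming source in itself.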
Spark Streaming with HBase
Hi,

Is there any example provided for Spark Streaming with input provided from HBase table content?

Thanks,
Ravi