Spark Streaming into HBase

2014-09-03 Thread kpeng1
I have been trying to understand how Spark Streaming and HBase connect, but
have not been successful. What I am trying to do is: given a Spark stream,
process that stream and store the results in an HBase table. So far this is
what I have:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevel
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HBaseAdmin,HTable,Put,Get}
import org.apache.hadoop.hbase.util.Bytes

def blah(row: Array[String]) {
  val hConf = new HBaseConfiguration()
  val hTable = new HTable(hConf, "table")
  val thePut = new Put(Bytes.toBytes(row(0)))
  thePut.add(Bytes.toBytes("cf"), Bytes.toBytes(row(0)), Bytes.toBytes(row(0)))
  hTable.put(thePut)
}

val ssc = new StreamingContext(sc, Seconds(1))
// the port number was lost in the archive; 9999 is only a placeholder
val lines = ssc.socketTextStream("localhost", 9999,
  StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.map(_.split(","))
val store = words.foreachRDD(rdd => rdd.foreach(blah))
ssc.start()

I am currently running the above code in spark-shell. I am not sure what I
am doing wrong.






Re: Spark Streaming into HBase

2014-09-03 Thread Ted Yu
Adding back user@

I am not familiar with the NotSerializableException. Can you show the full
stack trace?

See SPARK-1297 for the changes you need to make so that Spark works with
HBase 0.98.

Cheers


On Wed, Sep 3, 2014 at 2:33 PM, Kevin Peng kpe...@gmail.com wrote:

 Ted,

 The hbase-site.xml is in the classpath (had worse issues before... until I
 figured that it wasn't in the path).

 I get the following error in the spark-shell:
 org.apache.spark.SparkException: Job aborted due to stage failure: Task
 not serializable: java.io.NotSerializableException:
 org.apache.spark.streaming.StreamingContext
 at org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.sc
 ...

 I also double checked the hbase table, just in case, and nothing new is
 written in there.

 I am using HBase version 0.98.1-cdh5.1.0, the default one with the
 CDH 5.1.0 distro.

 Thank you for the help.


 On Wed, Sep 3, 2014 at 2:09 PM, Ted Yu yuzhih...@gmail.com wrote:

 Is hbase-site.xml in the classpath?
 Do you observe any exception from the code below or in the region server log?

 Which HBase release are you using?




Re: Spark Streaming into HBase

2014-09-03 Thread Sean Owen
This doesn't seem to have to do with HBase per se. Some function is
getting the StreamingContext into the closure, and that won't work. Is
this exactly the code? It doesn't reference a StreamingContext, but is
there maybe a different version in reality that tries to use the
StreamingContext inside a function?
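
In spark-shell, for instance, a top-level def is compiled as a method on the
REPL's wrapper object, so a closure that calls it captures that object and
everything defined alongside it, including ssc. A minimal sketch of the same
failure mode outside the shell (class and method names hypothetical):

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

// Stand-in for the shell's wrapper: it holds the StreamingContext
// and defines the row-saving function as a method.
class Pipeline(ssc: StreamingContext) extends Serializable {

  def blah(row: Array[String]): Unit = { /* HBase put */ }

  def run(lines: DStream[String]): Unit = {
    // `blah` here means `this.blah`, so the closure captures `this` and,
    // with it, the ssc field; task serialization then fails with
    // java.io.NotSerializableException: ...StreamingContext
    lines.map(_.split(",")).foreachRDD(rdd => rdd.foreach(blah))
  }
}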




Re: Spark Streaming into HBase

2014-09-03 Thread kpeng1
Sean,

I create the streaming context (ssc) near the bottom of the code and apply
foreachRDD on the resulting DStream so that I can get access to the
underlying RDDs, on which I in turn call foreach, passing in my function
that applies the storing logic.

Is there a different approach I should be using?

Thanks for the help.



Re: Spark Streaming into HBase

2014-09-03 Thread Kevin Peng
Ted,

Here is the full stack trace coming from spark-shell:

14/09/03 16:21:03 ERROR scheduler.JobScheduler: Error running job streaming job 1409786463000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)
  at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)
  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
  at akka.actor.ActorCell.invoke(ActorCell.scala:456)
  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
  at akka.dispatch.Mailbox.run(Mailbox.scala:219)
  at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


Basically, in the terminal where I run nc -lk, I type in words separated by
commas and hit enter, e.g. bill,ted.


Re: Spark Streaming into HBase

2014-09-03 Thread Tathagata Das
This is an issue with how Scala computes closures. Here, because of the
function blah, it is trying to serialize the whole enclosing object that
this code is part of. Can you define the function blah outside the main
function? In fact, try putting the function in a serializable object:

object BlahFunction extends Serializable {
  // same signature as the blah in the original post
  def blah(row: Array[String]) { /* HBase put goes here */ }
}
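
Wired into the code from the original post, the call would then look like
this (a one-line sketch reusing the words DStream):

words.foreachRDD(rdd => rdd.foreach(BlahFunction.blah))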

On a related note, opening a connection for every record in the RDD is
pretty inefficient. Use rdd.foreachPartition instead: open the connection
once, write the whole partition, and then close the connection.
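
A minimal sketch of that pattern, reusing the names from the code in the
original post (the "table" and "cf" strings are assumed, as before):

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.util.Bytes

words.foreachRDD { rdd =>
  rdd.foreachPartition { rows =>
    // one connection/table handle per partition, created on the executor
    val hConf = HBaseConfiguration.create()
    val hTable = new HTable(hConf, "table")
    rows.foreach { row =>
      val thePut = new Put(Bytes.toBytes(row(0)))
      thePut.add(Bytes.toBytes("cf"), Bytes.toBytes(row(0)), Bytes.toBytes(row(0)))
      hTable.put(thePut)
    }
    hTable.close()
  }
}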

TD



Re: Spark Streaming with HBase

2014-06-30 Thread Akhil Das
Something like this???

import java.util.List;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.rdd.NewHadoopRDD;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import com.google.common.collect.Lists;

public class SparkHBaseMain {

  public static void main(String[] arg) {
    try {
      List<String> jars = Lists.newArrayList(
          "/home/akhld/mobi/localcluster/x/spark-0.9.1-bin-hadoop2/assembly/target/scala-2.10/spark-assembly-0.9.1-hadoop2.2.0.jar",
          "/home/akhld/Downloads/hbase-server-0.96.0-hadoop2.jar",
          "/home/akhld/Downloads/hbase-protocol-0.96.0-hadoop2.jar",
          "/home/akhld/Downloads/hbase-hadoop2-compat-0.96.0-hadoop2.jar",
          "/home/akhld/Downloads/hbase-common-0.96.0-hadoop2.jar",
          "/home/akhld/Downloads/hbase-client-0.96.0-hadoop2.jar",
          "/home/akhld/Downloads/htrace-core-2.02.jar");

      SparkConf spconf = new SparkConf();
      spconf.setMaster("local");
      spconf.setAppName("HBaser");
      spconf.setSparkHome("/home/akhld/mobi/localcluster/x/spark-0.9.1-bin-hadoop2");
      spconf.setJars(jars.toArray(new String[jars.size()]));
      spconf.set("spark.executor.memory", "1g");

      JavaStreamingContext jsc = new JavaStreamingContext(spconf, new Duration(1));

      org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
      conf.addResource("/home/akhld/mobi/temp/sung/hbase-site.xml");
      conf.set(TableInputFormat.INPUT_TABLE, "blogposts");

      // Read the HBase table as an RDD of (row key, result) pairs
      NewHadoopRDD<ImmutableBytesWritable, Result> rdd2 =
          new NewHadoopRDD<ImmutableBytesWritable, Result>(
              jsc.ssc().sc(), TableInputFormat.class,
              ImmutableBytesWritable.class, Result.class, conf);

      System.out.println(rdd2.count());

      jsc.start();

    } catch (Exception e) {
      e.printStackTrace();
      System.out.println("Crashed: " + e);
    }
  }
}
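
For comparison, a minimal sketch of the same read in Scala (the "blogposts"
table name is assumed as above, with hbase-site.xml on the classpath):

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.SparkContext

val sc = new SparkContext("local", "HBaser")
val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, "blogposts")

// Bulk-read the table as an RDD of (row key, result) pairs
val rdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[ImmutableBytesWritable], classOf[Result])
println(rdd.count())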


Thanks
Best Regards





Spark Streaming with HBase

2014-06-29 Thread N . Venkata Naga Ravi
Hi,

Is there any example of Spark Streaming with input provided from HBase
table content?

Thanks,
Ravi