This is an issue with how Scala computes closures. Because of the function
blah, it is trying to serialize the whole function that this code is part
of, which is why the StreamingContext shows up in the
NotSerializableException. Can you define the function blah outside the main
function? In fact, try putting the function in a serializable object.

object BlahFunction extends Serializable {

  // Same signature as the original blah, which takes Array[String]
  def blah(row: Array[String]) { .... }
}

On a related note, opening a connection for every record in the RDD is
pretty inefficient. Use rdd.foreachPartition instead: open the connection,
write the whole partition, and then close the connection.
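
A rough sketch of the per-partition pattern, reusing the table name, column
family and Put logic from the original post. The object name HBaseWriter and
the methods writePartition and save are made up for illustration, and
HBaseConfiguration.create() is swapped in for the constructor used in the
original. Treat this as a starting point under those assumptions, not a tested
implementation:

```scala
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper object; it holds no reference to the StreamingContext,
// so the closure shipped to the executors does not drag the context along.
object HBaseWriter extends Serializable {

  // Runs on the executor: one HTable per partition instead of one per record.
  def writePartition(rows: Iterator[Array[String]]): Unit = {
    val hConf = HBaseConfiguration.create()
    val hTable = new HTable(hConf, "table") // table name from the original post
    try {
      rows.foreach { row =>
        val thePut = new Put(Bytes.toBytes(row(0)))
        thePut.add(Bytes.toBytes("cf"), Bytes.toBytes(row(0)), Bytes.toBytes(row(0)))
        hTable.put(thePut)
      }
    } finally {
      hTable.close() // always release the connection, even if a put fails
    }
  }

  // Wires the writer into the stream; `words` is the DStream from the original post.
  def save(words: DStream[Array[String]]): Unit =
    words.foreachRDD(rdd => rdd.foreachPartition(writePartition _))
}
```

With this in place, the original `words.foreachRDD(rdd => rdd.foreach(blah))`
line would become `HBaseWriter.save(words)`, called before `ssc.start()`.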

TD


On Wed, Sep 3, 2014 at 4:24 PM, Kevin Peng <kpe...@gmail.com> wrote:

> Ted,
>
> Here is the full stack trace coming from spark-shell:
>
> 14/09/03 16:21:03 ERROR scheduler.JobScheduler: Error running job
> streaming job 1409786463000 ms.0
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task
> not serializable: java.io.NotSerializableException:
> org.apache.spark.streaming.StreamingContext
>
> at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
>
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
>
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
>
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
>
> at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)
>
> at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)
>
> at
> org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)
>
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)
>
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>
> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>
> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
> Basically, what I am doing on the terminal where I run nc -lk, I type in
> words separated by commas and hit enter i.e. "bill,ted".
>
>
> On Wed, Sep 3, 2014 at 2:36 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Adding back user@
>>
>> I am not familiar with the NotSerializableException. Can you show the
>> full stack trace ?
>>
>> See SPARK-1297 for changes you need to make so that Spark works with
>> hbase 0.98
>>
>> Cheers
>>
>>
>> On Wed, Sep 3, 2014 at 2:33 PM, Kevin Peng <kpe...@gmail.com> wrote:
>>
>>> Ted,
>>>
>>> The hbase-site.xml is in the classpath (had worse issues before... until
>>> I figured that it wasn't in the path).
>>>
>>> I get the following error in the spark-shell:
>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>>> not serializable: java.io.NotSerializableException:
>>> org.apache.spark.streaming.StreamingContext
>>>         at org.apache.spark.scheduler.DAGScheduler.org
>>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.sc
>>> ...
>>>
>>> I also double checked the hbase table, just in case, and nothing new is
>>> written in there.
>>>
>>> I am using hbase version: 0.98.1-cdh5.1.0 the default one with the
>>> CDH5.1.0 distro.
>>>
>>> Thank you for the help.
>>>
>>>
>>> On Wed, Sep 3, 2014 at 2:09 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>
>>>> Is hbase-site.xml in the classpath ?
>>>> Do you observe any exception from the code below or in region server
>>>> log ?
>>>>
>>>> Which hbase release are you using ?
>>>>
>>>>
>>>> On Wed, Sep 3, 2014 at 2:05 PM, kpeng1 <kpe...@gmail.com> wrote:
>>>>
>>>>> I have been trying to understand how spark streaming and hbase
>>>>> connect, but
>>>>> have not been successful. What I am trying to do is given a spark
>>>>> stream,
>>>>> process that stream and store the results in an hbase table. So far
>>>>> this is
>>>>> what I have:
>>>>>
>>>>> import org.apache.spark.SparkConf
>>>>> import org.apache.spark.streaming.{Seconds, StreamingContext}
>>>>> import org.apache.spark.streaming.StreamingContext._
>>>>> import org.apache.spark.storage.StorageLevel
>>>>> import org.apache.hadoop.hbase.HBaseConfiguration
>>>>> import org.apache.hadoop.hbase.client.{HBaseAdmin,HTable,Put,Get}
>>>>> import org.apache.hadoop.hbase.util.Bytes
>>>>>
>>>>> def blah(row: Array[String]) {
>>>>>   val hConf = new HBaseConfiguration()
>>>>>   val hTable = new HTable(hConf, "table")
>>>>>   val thePut = new Put(Bytes.toBytes(row(0)))
>>>>>   thePut.add(Bytes.toBytes("cf"), Bytes.toBytes(row(0)),
>>>>> Bytes.toBytes(row(0)))
>>>>>   hTable.put(thePut)
>>>>> }
>>>>>
>>>>> val ssc = new StreamingContext(sc, Seconds(1))
>>>>> val lines = ssc.socketTextStream("localhost", 9999,
>>>>> StorageLevel.MEMORY_AND_DISK_SER)
>>>>> val words = lines.map(_.split(","))
>>>>> val store = words.foreachRDD(rdd => rdd.foreach(blah))
>>>>> ssc.start()
>>>>>
>>>>> I am currently running the above code in spark-shell. I am not sure
>>>>> what I
>>>>> am doing wrong.
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> View this message in context:
>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-into-HBase-tp13378.html
>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>> Nabble.com.
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
