Hi Chris,

We have a knowledge base article to explain what's happening here:

https://github.com/databricks/spark-knowledgebase/blob/master/troubleshooting/javaionotserializableexception.md

Let me know if the article is not clear enough - I would be happy to edit
and improve it.

-Vida


On Wed, Aug 20, 2014 at 5:09 PM, Marcelo Vanzin <van...@cloudera.com> wrote:

> My guess is that your test is trying to serialize a closure
> referencing "connectionInfo"; that closure will have a reference to
> the test instance, since the instance is needed to execute that
> method.
>
> Try to make the "connectionInfo" method local to the method where it's
> needed, or declare it in an object, to avoid that reference.
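>
> For example, a minimal sketch of the second option (the object name
> here is just a placeholder; the JDBC URL is the one from your test):
>
>   import java.sql.{Connection, DriverManager}
>
>   // A top-level object has no enclosing instance, so a function value
>   // built from this method captures nothing non-serializable.
>   object Connections {
>     def connectionInfo(): Connection = {
>       Class.forName("org.postgresql.Driver")
>       DriverManager.getConnection(
>         "jdbc:postgresql://localhost/warehouse?user=dev&password=Password23")
>     }
>   }
>
> Passing Connections.connectionInfo _ to JdbcRDD then keeps the FunSuite
> instance (and its non-serializable AssertionsHelper) out of the
> serialized task.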
>
> On Wed, Aug 20, 2014 at 4:21 PM, Chris Jones
> <cvjone...@yahoo.com.invalid> wrote:
> > New to Apache Spark, trying to build a ScalaTest suite. Below is the error
> > I'm consistently seeing. Somehow Spark is trying to serialize a ScalaTest
> > AssertionsHelper class, which is not serializable. The test I have written
> > doesn't even have any assertions in it. I added the JVM flag
> >
> > -Dsun.io.serialization.extendedDebugInfo=true
> >
> > to get more detailed output, which is included.
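> >
> > (For anyone reproducing this: the flag only takes effect in a forked
> > JVM. With an sbt build, for example, it can be set in build.sbt:
> >
> >   // Fork the test JVM so javaOptions actually reach the tests.
> >   fork in Test := true
> >   javaOptions in Test += "-Dsun.io.serialization.extendedDebugInfo=true"
> >
> > Other build tools have equivalent settings.)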
> >
> > Versions: Scala 2.10.4, ScalaTest 2.2.2, Spark 1.0.2.
> >
> > Here's the basic test code.
> >
> >
> > import java.sql
> > import java.sql.Connection
> >
> > import com.marchex.msa.bigjoin.data.{Keyword, Keywords}
> >
> > import org.apache.spark.{SparkConf, SparkContext}
> > import org.apache.spark.SparkContext._
> > import org.scalatest._
> > import org.scalatest.FunSuite
> >
> > /**
> >  * Created by cvjones on 8/20/14.
> >  */
> > class KeywordsSpec extends FunSuite {
> >
> >   implicit def connectionInfo(): Connection = {
> >     Class.forName("org.postgresql.Driver")
> >     sql.DriverManager.getConnection("jdbc:postgresql://localhost/warehouse?user=dev&password=Password23")
> >   }
> >
> >   def keywordRows(implicit sc: SparkContext, connectionFun: () => Connection) = {
> >     println("Getting keyword rows for context " + sc + " connection " + connectionFun)
> >
> >     val ROW_START = 0
> >     val ROW_END = 1000
> >     val NUM_PARTITIONS = 10
> >
> >     // Query pulls the keyword_id, keyword, and match_type for each keyword row
> >     val rdd = new org.apache.spark.rdd.JdbcRDD(
> >       sc,
> >       connectionFun,
> >       "SELECT k.keyword_id, k.keyword, k.match_type FROM fortknox_sandbox.v_keyword k offset ? limit ?",
> >       ROW_START, ROW_END, NUM_PARTITIONS,
> >       row => Keyword(
> >         row.getLong("keyword_id"),
> >         row.getString("keyword"),
> >         row.getString("match_type")
> >       )
> >     )
> >     rdd
> >   }
> >
> >   def keywordsByKeywordId(implicit sc: SparkContext, connectionFun: () => Connection) = {
> >     val keyword = keywordRows.map {
> >       keywordRecord => (keywordRecord.keyword_id, keywordRecord)
> >     }
> >     keyword
> >   }
> >
> >   test("The keywords have more than zero rows") {
> >     implicit val sc = new SparkContext("local", "test")
> >
> >     println("\n******Running simple test")
> >     val k = keywordsByKeywordId.collect
> >   }
> > }
> >
> > Here's the error.
> >
> > org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.scalatest.Assertions$AssertionsHelper
> >     - field (class "org.scalatest.FunSuite", name: "assertionsHelper", type: "class org.scalatest.Assertions$AssertionsHelper")
> >     - object (class "com.marchex.msa.tests.KeywordsSpec", KeywordsSpec)
> >     - field (class "com.marchex.msa.tests.KeywordsSpec$$anonfun$1", name: "$outer", type: "class com.marchex.msa.tests.KeywordsSpec")
> >     - object (class "com.marchex.msa.tests.KeywordsSpec$$anonfun$1", <function0>)
> >     - field (class "com.marchex.msa.tests.KeywordsSpec$$anonfun$1$$anonfun$4", name: "$outer", type: "class com.marchex.msa.tests.KeywordsSpec$$anonfun$1")
> >     - object (class "com.marchex.msa.tests.KeywordsSpec$$anonfun$1$$anonfun$4", <function0>)
> >     - field (class "org.apache.spark.rdd.JdbcRDD", name: "org$apache$spark$rdd$JdbcRDD$$getConnection", type: "interface scala.Function0")
> >     - object (class "org.apache.spark.rdd.JdbcRDD", JdbcRDD[0] at JdbcRDD at KeywordsSpec.scala:36)
> >     - field (class "org.apache.spark.Dependency", name: "rdd", type: "class org.apache.spark.rdd.RDD")
> >     - object (class "org.apache.spark.OneToOneDependency", org.apache.spark.OneToOneDependency@2fe36ebc)
> >     - custom writeObject data (class "scala.collection.immutable.$colon$colon")
> >     - object (class "scala.collection.immutable.$colon$colon", List(org.apache.spark.OneToOneDependency@2fe36ebc))
> >     - field (class "org.apache.spark.rdd.RDD", name: "org$apache$spark$rdd$RDD$$dependencies_", type: "interface scala.collection.Seq")
> >     - root object (class "org.apache.spark.rdd.MappedRDD", MappedRDD[1] at map at KeywordsSpec.scala:53)
> >     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
> >     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
> >     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
> >     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> >     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> >     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
> >     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:772)
> >     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:715)
> >     at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:699)
> >     at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1203)
> >     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> >     at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> >     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> >     at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> >     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> >     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> >     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> >
>
>
>
> --
> Marcelo
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
