New to Apache Spark, and trying to build a ScalaTest suite. Below is the error I'm
consistently seeing. Somehow Spark is trying to serialize ScalaTest's
AssertionsHelper class, which is not serializable. The test I've
specified doesn't even contain any assertions. I added the JVM flag

-Dsun.io.serialization.extendedDebugInfo=true

to get more detailed output, which is included.
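(For reference, one way to pass that flag to the test JVM in an sbt build -- I'm assuming sbt here; the key detail is that javaOptions only takes effect when tests run in a forked JVM:

// build.sbt -- forward the serialization debug flag to forked test JVMs
fork in Test := true
javaOptions in Test += "-Dsun.io.serialization.extendedDebugInfo=true"
)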

Versions: Scala 2.10.4, ScalaTest 2.2.2, Spark 1.0.2.

Here's the basic test code.


import java.sql
import java.sql.Connection

import com.marchex.msa.bigjoin.data.{Keyword, Keywords}

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.scalatest._
import org.scalatest.FunSuite

/**
 * Created by cvjones on 8/20/14.
 */
class KeywordsSpec extends FunSuite {

  implicit def connectionInfo(): Connection = {
    Class.forName("org.postgresql.Driver")
    sql.DriverManager.getConnection(
      "jdbc:postgresql://localhost/warehouse?user=dev&password=Password23")
  }

  def keywordRows(implicit sc: SparkContext, connectionFun: () => Connection) = {
    println("Getting keyword rows for context " + sc + " connection " + connectionFun)

    val ROW_START = 0
    val ROW_END = 1000
    val NUM_PARTITIONS = 10

    // Query pulls in the phone_number_id, and adgroup_id if it exists
    // (otherwise adgroup_id is NONE)
    val rdd = new org.apache.spark.rdd.JdbcRDD(
      sc,
      connectionFun,
      "SELECT k.keyword_id, k.keyword, k.match_type FROM fortknox_sandbox.v_keyword k " +
        " offset ? limit ?",
      ROW_START, ROW_END, NUM_PARTITIONS,
      row => Keyword(
        row.getLong("keyword_id"),
        row.getString("keyword"),
        row.getString("match_type")
      )
    )
    rdd
  }

  def keywordsByKeywordId(implicit sc: SparkContext, connectionFun: () => Connection) = {
    val keyword = keywordRows.map {
      keywordRecord => (keywordRecord.keyword_id, keywordRecord)
    }
    keyword
  }

  test("The keywords have more than zero rows") {
    implicit val sc = new SparkContext("local", "test")

    println("\n******Running simple test")
    val k = keywordsByKeywordId.collect
  }
}

Here's the error.

Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.scalatest.Assertions$AssertionsHelper
    - field (class "org.scalatest.FunSuite", name: "assertionsHelper", type: "class org.scalatest.Assertions$AssertionsHelper")
    - object (class "com.marchex.msa.tests.KeywordsSpec", KeywordsSpec)
    - field (class "com.marchex.msa.tests.KeywordsSpec$$anonfun$1", name: "$outer", type: "class com.marchex.msa.tests.KeywordsSpec")
    - object (class "com.marchex.msa.tests.KeywordsSpec$$anonfun$1", <function0>)
    - field (class "com.marchex.msa.tests.KeywordsSpec$$anonfun$1$$anonfun$4", name: "$outer", type: "class com.marchex.msa.tests.KeywordsSpec$$anonfun$1")
    - object (class "com.marchex.msa.tests.KeywordsSpec$$anonfun$1$$anonfun$4", <function0>)
    - field (class "org.apache.spark.rdd.JdbcRDD", name: "org$apache$spark$rdd$JdbcRDD$$getConnection", type: "interface scala.Function0")
    - object (class "org.apache.spark.rdd.JdbcRDD", JdbcRDD[0] at JdbcRDD at KeywordsSpec.scala:36)
    - field (class "org.apache.spark.Dependency", name: "rdd", type: "class org.apache.spark.rdd.RDD")
    - object (class "org.apache.spark.OneToOneDependency", org.apache.spark.OneToOneDependency@2fe36ebc)
    - custom writeObject data (class "scala.collection.immutable.$colon$colon")
    - object (class "scala.collection.immutable.$colon$colon", List(org.apache.spark.OneToOneDependency@2fe36ebc))
    - field (class "org.apache.spark.rdd.RDD", name: "org$apache$spark$rdd$RDD$$dependencies_", type: "interface scala.collection.Seq")
    - root object (class "org.apache.spark.rdd.MappedRDD", MappedRDD[1] at map at KeywordsSpec.scala:53)
org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.scalatest.Assertions$AssertionsHelper
    [... same serialization trace as above ...]
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:772)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:715)
    at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:699)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1203)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
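
From the debug trace, my reading is that both anonymous functions (the <function0> that ends up as JdbcRDD's getConnection, and the mapper built in keywordsByKeywordId) are compiled as inner classes of the suite, so their $outer fields drag the whole KeywordsSpec, assertionsHelper and all, into the serialized task. Here's a sketch of the workaround I'm considering: hoist everything into a standalone object so there's no $outer to capture (KeywordFunctions is a hypothetical name; same logic as the test above).

import java.sql.{Connection, DriverManager, ResultSet}

import org.apache.spark.SparkContext
import org.apache.spark.rdd.JdbcRDD

import com.marchex.msa.bigjoin.data.Keyword

// Top-level object: the functions below have no enclosing instance, so
// serializing them does not pull in a FunSuite (or its AssertionsHelper).
object KeywordFunctions {

  def connection(): Connection = {
    Class.forName("org.postgresql.Driver")
    DriverManager.getConnection(
      "jdbc:postgresql://localhost/warehouse?user=dev&password=Password23")
  }

  // Row mapper as a plain method; eta-expanded below into a serializable
  // function that references only this object, not a test instance.
  def toKeyword(row: ResultSet): Keyword =
    Keyword(
      row.getLong("keyword_id"),
      row.getString("keyword"),
      row.getString("match_type"))

  def keywordRows(sc: SparkContext): JdbcRDD[Keyword] =
    new JdbcRDD(
      sc,
      connection _, // Function0 with no $outer pointer
      "SELECT k.keyword_id, k.keyword, k.match_type " +
        "FROM fortknox_sandbox.v_keyword k offset ? limit ?",
      0, 1000, 10,
      toKeyword _)

  def keywordsByKeywordId(sc: SparkContext) =
    keywordRows(sc).map(k => (k.keyword_id, k))
}

The test body would then reduce to KeywordFunctions.keywordsByKeywordId(sc).collect. Is that the right diagnosis, or is something else pulling the suite into the closure?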
