The lambda expressions you apply to the RDDs in the SecondRollup class are
closing over the SparkContext, and Spark has special logic (the ClosureCleaner
in your trace) to check that every variable captured by a closure used on an
RDD is Serializable. I hate linking to Quora, but there's a good explanation
here:
http://www.quora.com/What-does-Closure-cleaner-func-mean-in-Spark
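
To make the capture concrete, here is a minimal sketch that needs no Spark at
all, only plain Java serialization. FakeContext, Rollup, ClosureDemo and
isSerializable are hypothetical names I made up for illustration; FakeContext
stands in for SparkContext. I'm assuming Scala 2.12 or later, where a lambda
that does not touch `this` does not capture it.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for SparkContext: deliberately NOT Serializable.
class FakeContext

// Same shape as SecondRollup: Serializable itself, but holding a context.
class Rollup(val ctx: FakeContext, offset: Int) extends Serializable {

  // Reads the field `offset`, so the closure captures `this`,
  // and serializing `this` drags `ctx` along with it.
  def capturesThis: Int => Int = x => x + offset

  // Copies the field into a local val first, so the closure
  // captures only that Int and never sees `this`.
  def capturesLocal: Int => Int = {
    val localOffset = offset
    x => x + localOffset
  }
}

object ClosureDemo {
  // True if Java serialization accepts the object, false if it throws
  // NotSerializableException somewhere in the object graph.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val rollup = new Rollup(new FakeContext, 1)
    println(isSerializable(rollup.capturesThis))  // closure holds `this` and ctx
    println(isSerializable(rollup.capturesLocal)) // closure holds only an Int
  }
}
```

In SecondRollup the same thing probably happens through `connector`: it is a
constructor parameter, so referring to it inside a map pulls in the whole
instance, `sc` included. Keeping `sc` marked @transient, or copying `connector`
into a local val inside `run` before using it in the lambda, should avoid
that, though I haven't run your code to confirm.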


On June 5, 2015, at 4:14 PM, Lee McFadden <splee...@gmail.com> wrote:



On Fri, Jun 5, 2015 at 12:58 PM Marcelo Vanzin <van...@cloudera.com> wrote:

You didn't show the error so the only thing we can do is speculate. You're 
probably sending the object that's holding the SparkContext reference over the 
network at some point (e.g. it's used by a task run in an executor), and that's 
why you were getting that exception.


Apologies - the full error is as follows.  All I did here was remove the 
@transient annotation from the sc variable in my class constructor.  In 
addition, the full code for the classes and launching process is included below.


Error traceback:

```
Exception in thread "pool-5-thread-1" java.lang.Error: org.apache.spark.SparkException: Task not serializable
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1148)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:1478)
        at org.apache.spark.rdd.RDD.map(RDD.scala:288)
        at io.icebrg.analytics.spark.SecondRollup.run(ConnTransforms.scala:33)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        ... 2 more
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
        ... 7 more
```


Code:

```
class SecondRollup(sc: SparkContext, connector: CassandraConnector, scanPoint: DateTime) extends Runnable with Serializable {

  def run {
    val conn = sc.cassandraTable("alpha_test", "sensor_readings")
      .select("data")
      .where("timep = ?", scanPoint)
      .where("sensorid IN ?", System.sensors)
      .map(r => Json.parse(r.getString("data")))
      .cache()

    conn.flatMap(AffectedIp.fromJson)
      .map(a => (AffectedIp.key(a), a))
      .reduceByKey(AffectedIp.reduce)
      .map(_._2)
      .map(a => AffectedIp.reduceWithCurrent(connector, a))
      .saveToCassandra("alpha_test", "affected_hosts")

    conn.flatMap(ServiceSummary.fromnJson)
      .map(s => (ServiceSummary.key(s), s))
      .reduceByKey(ServiceSummary.reduce)
      .map(_._2)
      .saveToCassandra("alpha_test", "service_summary_rollup")

  }
}

object Transforms {
  private val appNameBase = "Transforms%s"
  private val dtFormatter = DateTimeFormat.forPattern("yyyyMMddHH")

  def main(args: Array[String]) {
    if (args.size < 2) {
      println("""Usage: ConnTransforms <start> <end>
        <start>     DateTime to start processing at. Format: yyyyMMddHH
        <end>       DateTime to end processing at.  Format: yyyyMMddHH""")
      sys.exit(1)
    }

    // withZoneRetainFields gives us a UTC time as specified on the command line.
    val start = dtFormatter.parseDateTime(args(0)).withZoneRetainFields(DateTimeZone.UTC)
    val end = dtFormatter.parseDateTime(args(1)).withZoneRetainFields(DateTimeZone.UTC)

    println("Processing rollups from %s to %s".format(start, end))

    // Create the spark context.
    val conf = new SparkConf()
      .setAppName(appNameBase.format("Test"))

    val connector = CassandraConnector(conf)

    val sc = new SparkContext(conf)

    // Set up the threadpool for running Jobs.
    val pool = Executors.newFixedThreadPool(10)

    pool.execute(new SecondRollup(sc, connector, start))

    //for (dt <- new TimeRanger(start, end)) {
    //  // Always run the second rollups.
    //  pool.execute(new SecondRollup(sc, connector, dt))
    //  if (Granularity.Minute.isGranularity(dt)) pool.execute(new MinuteRollup(sc, connector, dt))
    //}

    // stop the pool from accepting new tasks
    pool.shutdown()

    // We've submitted all the tasks.
    while (!pool.isTerminated()) pool.awaitTermination(10, TimeUnit.SECONDS)
  }
}
```
