Your lambda expressions on the RDDs in the SecondRollup class are closing over the SparkContext, and Spark has special logic (the ClosureCleaner) to ensure that every variable in a closure used on an RDD is Serializable. I hate linking to Quora, but there's a good explanation here: http://www.quora.com/What-does-Closure-cleaner-func-mean-in-Spark
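To make the capture concrete, here is a minimal sketch (hypothetical class and field names, not the code from this thread): referencing any field of the enclosing class inside an RDD lambda is really a reference through `this`, so Spark has to serialize the whole instance, SparkContext included. Copying the field into a local val first keeps the context out of the closure.

```
import org.apache.spark.SparkContext
import org.joda.time.DateTime

// Hypothetical names, not the code from the thread below.
class CaptureExample(sc: SparkContext, scanPoint: DateTime) extends Serializable {

  def bad(): Array[(DateTime, Int)] =
    // `scanPoint` compiles to `this.scanPoint`, so serializing this closure
    // drags in the whole CaptureExample instance -- including the
    // non-serializable SparkContext -> "Task not serializable".
    sc.parallelize(1 to 10).map(i => (scanPoint, i)).collect()

  def ok(): Array[(DateTime, Int)] = {
    // Copy the field into a local val; the closure now captures only this
    // (serializable) DateTime value and never references `this`.
    val point = scanPoint
    sc.parallelize(1 to 10).map(i => (point, i)).collect()
  }
}
```

That is also why removing @transient made a difference: the SecondRollup instance was being serialized all along, but with @transient the sc field was simply skipped.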
On June 5, 2015, at 4:14 PM, Lee McFadden <[email protected]> wrote:

On Fri, Jun 5, 2015 at 12:58 PM Marcelo Vanzin <[email protected]> wrote:
> You didn't show the error, so the only thing we can do is speculate. You're probably sending the object that holds the SparkContext reference over the network at some point (e.g. it's used by a task run in an executor), and that's why you were getting that exception.

Apologies - the full error is as follows. All I did here was remove the @transient annotation from the sc variable in my class constructor. The full code for the classes and the launching process is included below.

Error traceback:

```
Exception in thread "pool-5-thread-1" java.lang.Error: org.apache.spark.SparkException: Task not serializable
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1148)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:1478)
        at org.apache.spark.rdd.RDD.map(RDD.scala:288)
        at io.icebrg.analytics.spark.SecondRollup.run(ConnTransforms.scala:33)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        ... 2 more
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
        ... 7 more
```

Code:

```
class SecondRollup(sc: SparkContext, connector: CassandraConnector, scanPoint: DateTime) extends Runnable with Serializable {

  def run {
    val conn = sc.cassandraTable("alpha_test", "sensor_readings")
      .select("data")
      .where("timep = ?", scanPoint)
      .where("sensorid IN ?", System.sensors)
      .map(r => Json.parse(r.getString("data")))
      .cache()

    conn.flatMap(AffectedIp.fromJson)
      .map(a => (AffectedIp.key(a), a))
      .reduceByKey(AffectedIp.reduce)
      .map(_._2)
      .map(a => AffectedIp.reduceWithCurrent(connector, a))
      .saveToCassandra("alpha_test", "affected_hosts")

    conn.flatMap(ServiceSummary.fromJson)
      .map(s => (ServiceSummary.key(s), s))
      .reduceByKey(ServiceSummary.reduce)
      .map(_._2)
      .saveToCassandra("alpha_test", "service_summary_rollup")
  }
}

object Transforms {
  private val appNameBase = "Transforms%s"
  private val dtFormatter = DateTimeFormat.forPattern("yyyyMMddHH")

  def main(args: Array[String]) {
    if (args.size < 2) {
      println("""Usage: ConnTransforms <start> <end>
    <start> DateTime to start processing at. Format: yyyyMMddHH
    <end>   DateTime to end processing at. Format: yyyyMMddHH""")
      sys.exit(1)
    }

    // withZoneRetainFields gives us a UTC time as specified on the command line.
    val start = dtFormatter.parseDateTime(args(0)).withZoneRetainFields(DateTimeZone.UTC)
    val end = dtFormatter.parseDateTime(args(1)).withZoneRetainFields(DateTimeZone.UTC)

    println("Processing rollups from %s to %s".format(start, end))

    // Create the spark context.
    val conf = new SparkConf()
      .setAppName(appNameBase.format("Test"))

    val connector = CassandraConnector(conf)
    val sc = new SparkContext(conf)

    // Set up the threadpool for running Jobs.
    val pool = Executors.newFixedThreadPool(10)

    pool.execute(new SecondRollup(sc, connector, start))

    //for (dt <- new TimeRanger(start, end)) {
    //  // Always run the second rollups.
    //  pool.execute(new SecondRollup(sc, connector, dt))
    //  if (Granularity.Minute.isGranularity(dt)) pool.execute(new MinuteRollup(sc, connector, dt))
    //}

    // stop the pool from accepting new tasks
    pool.shutdown()

    // We've submitted all the tasks.
    while (!pool.isTerminated()) pool.awaitTermination(10, TimeUnit.SECONDS)
  }
}
```
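One common way to restructure this is sketched below. This is an assumption on my part rather than a tested fix for this exact code: keep sc @transient so it is skipped if the instance is ever serialized, and copy the connector (the field referenced inside the map for AffectedIp.reduceWithCurrent) into a local val so the closures capture only serializable values instead of `this`. The keyspace/table names are taken from the code above; the per-row body is a placeholder.

```
import org.apache.spark.SparkContext
import org.joda.time.DateTime
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

class SecondRollupSketch(@transient val sc: SparkContext,
                         connector: CassandraConnector,
                         scanPoint: DateTime)
    extends Runnable with Serializable {

  def run(): Unit = {
    // Copy the field the lambdas need into a local val. The closure below
    // then captures only this value (CassandraConnector is serializable),
    // never `this`, so the transient SparkContext stays on the driver.
    val conn = connector

    val data = sc.cassandraTable("alpha_test", "sensor_readings")
      .select("data")
      .where("timep = ?", scanPoint) // evaluated on the driver, not in a closure
      .map { r =>
        // Placeholder for the real per-row work; anything that needs the
        // connector (e.g. AffectedIp.reduceWithCurrent) should use the
        // local `conn`, not the `connector` field.
        conn.withSessionDo(_ => ())
        r.getString("data")
      }

    println("Rows processed for %s: %d".format(scanPoint, data.count()))
  }
}
```

As far as I know, SparkContext is thread-safe for submitting jobs, so the Executors pool in main() should be fine once the closures stop pulling the context in.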
