On Fri, Jun 5, 2015 at 12:58 PM Marcelo Vanzin <van...@cloudera.com> wrote:
> You didn't show the error so the only thing we can do is speculate. You're
> probably sending the object that's holding the SparkContext reference over
> the network at some point (e.g. it's used by a task run in an executor),
> and that's why you were getting that exception.

Apologies - the full error is below. All I did here was remove the @transient annotation from the sc parameter in my class constructor. The full code for the classes and the launching process is also included below.

Error traceback:

```
Exception in thread "pool-5-thread-1" java.lang.Error: org.apache.spark.SparkException: Task not serializable
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1148)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:1478)
        at org.apache.spark.rdd.RDD.map(RDD.scala:288)
        at io.icebrg.analytics.spark.SecondRollup.run(ConnTransforms.scala:33)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        ... 2 more
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
        ... 7 more
```

Code:

```
class SecondRollup(sc: SparkContext, connector: CassandraConnector, scanPoint: DateTime)
    extends Runnable with Serializable {

  def run {
    val conn = sc.cassandraTable("alpha_test", "sensor_readings")
      .select("data")
      .where("timep = ?", scanPoint)
      .where("sensorid IN ?", System.sensors)
      .map(r => Json.parse(r.getString("data")))
      .cache()

    conn.flatMap(AffectedIp.fromJson)
      .map(a => (AffectedIp.key(a), a))
      .reduceByKey(AffectedIp.reduce)
      .map(_._2)
      .map(a => AffectedIp.reduceWithCurrent(connector, a))
      .saveToCassandra("alpha_test", "affected_hosts")

    conn.flatMap(ServiceSummary.fromJson)
      .map(s => (ServiceSummary.key(s), s))
      .reduceByKey(ServiceSummary.reduce)
      .map(_._2)
      .saveToCassandra("alpha_test", "service_summary_rollup")
  }
}

object Transforms {
  private val appNameBase = "Transforms%s"
  private val dtFormatter = DateTimeFormat.forPattern("yyyyMMddHH")

  def main(args: Array[String]) {
    if (args.size < 2) {
      println("""Usage: ConnTransforms <start> <end>
        <start>  DateTime to start processing at. Format: yyyyMMddHH
        <end>    DateTime to end processing at. Format: yyyyMMddHH""")
      sys.exit(1)
    }

    // withZoneRetainFields gives us a UTC time as specified on the command line.
    val start = dtFormatter.parseDateTime(args(0)).withZoneRetainFields(DateTimeZone.UTC)
    val end = dtFormatter.parseDateTime(args(1)).withZoneRetainFields(DateTimeZone.UTC)

    println("Processing rollups from %s to %s".format(start, end))

    // Create the Spark context.
    val conf = new SparkConf()
      .setAppName(appNameBase.format("Test"))

    val connector = CassandraConnector(conf)
    val sc = new SparkContext(conf)

    // Set up the thread pool for running jobs.
    val pool = Executors.newFixedThreadPool(10)

    pool.execute(new SecondRollup(sc, connector, start))
    //for (dt <- new TimeRanger(start, end)) {
    //  // Always run the second rollups.
    //  pool.execute(new SecondRollup(sc, connector, dt))
    //  if (Granularity.Minute.isGranularity(dt)) pool.execute(new MinuteRollup(sc, connector, dt))
    //}

    // Stop the pool from accepting new tasks; we've submitted all the tasks.
    pool.shutdown()
    while (!pool.isTerminated()) pool.awaitTermination(10, TimeUnit.SECONDS)
  }
}
```
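
For completeness, here is a minimal, self-contained sketch (simplified and hypothetical, not the actual ConnTransforms code) of the pattern Marcelo describes: because the class is Serializable and the lambdas passed to map reference fields of the instance, Spark's closure cleaner tries to serialize the whole instance, which drags the SparkContext field along with it. Keeping sc marked @transient (as the original code did), or copying whatever the closure needs into local vals first, keeps the SparkContext out of the serialized closure.

```
import org.apache.spark.SparkContext

// Class and field names below (BadRollup, GoodRollup, factor) are illustrative only.
class BadRollup(sc: SparkContext, factor: Int) extends Runnable with Serializable {
  def run(): Unit = {
    val rdd = sc.parallelize(1 to 10)
    // `factor` is lifted to a field of this instance, so the lambda captures `this`;
    // serializing `this` also tries to serialize `sc`, giving
    // "Task not serializable ... NotSerializableException: org.apache.spark.SparkContext".
    rdd.map(x => x * factor).count()
  }
}

class GoodRollup(@transient sc: SparkContext, factor: Int) extends Runnable with Serializable {
  def run(): Unit = {
    val rdd = sc.parallelize(1 to 10)
    // Copy the value into a local val so the closure captures only an Int, not `this`.
    // @transient (what the original code had before the annotation was removed)
    // additionally keeps sc out of serialization if the instance is captured anyway.
    val localFactor = factor
    rdd.map(x => x * localFactor).count()
  }
}
```

Note that the Runnable itself only ever runs on the driver's thread pool; it is the closures passed to the RDD operations that get shipped to executors, so another option is to drop Serializable from the class entirely and make sure those closures only capture serializable locals.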