On Fri, Jun 5, 2015 at 12:58 PM Marcelo Vanzin <[email protected]> wrote:
> You didn't show the error, so the only thing we can do is speculate. You're
> probably sending the object that's holding the SparkContext reference over
> the network at some point (e.g. it's used by a task run in an executor),
> and that's why you were getting that exception.
>
Apologies - the full error is as follows. All I did here was remove the
`@transient` annotation from the `sc` variable in my class constructor. The
full code for the classes and the launching process is also included below.
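(For reference, the declaration before this change looked roughly like the sketch below; it is a reconstruction, not a paste from the repo. With the annotation in place, Java serialization skips the `sc` field whenever a task closure ends up capturing the enclosing instance.)
```
// Reconstruction of the earlier declaration (not verbatim): @transient tells Java
// serialization to skip sc when the enclosing SecondRollup instance is serialized.
class SecondRollup(@transient sc: SparkContext, connector: CassandraConnector,
                   scanPoint: DateTime) extends Runnable with Serializable {
  // ... run() unchanged from the version below ...
}
```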
Error traceback:
```
Exception in thread "pool-5-thread-1" java.lang.Error: org.apache.spark.SparkException: Task not serializable
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1148)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1478)
    at org.apache.spark.rdd.RDD.map(RDD.scala:288)
    at io.icebrg.analytics.spark.SecondRollup.run(ConnTransforms.scala:33)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    ... 2 more
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    ... 7 more
```
Code:
```
import java.util.concurrent.{Executors, TimeUnit}

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.{SparkConf, SparkContext}
import org.joda.time.{DateTime, DateTimeZone}
import org.joda.time.format.DateTimeFormat

// Json and the project-specific helpers (AffectedIp, ServiceSummary, System.sensors,
// TimeRanger, Granularity, MinuteRollup) come from elsewhere in the codebase.

class SecondRollup(sc: SparkContext, connector: CassandraConnector,
                   scanPoint: DateTime) extends Runnable with Serializable {

  def run(): Unit = {
    // Read the raw sensor readings for this scan point and parse the JSON payloads once.
    val conn = sc.cassandraTable("alpha_test", "sensor_readings")
      .select("data")
      .where("timep = ?", scanPoint)
      .where("sensorid IN ?", System.sensors)
      .map(r => Json.parse(r.getString("data")))
      .cache()

    // Roll up affected IPs and merge with the current state in Cassandra.
    conn.flatMap(AffectedIp.fromJson)
      .map(a => (AffectedIp.key(a), a))
      .reduceByKey(AffectedIp.reduce)
      .map(_._2)
      .map(a => AffectedIp.reduceWithCurrent(connector, a))
      .saveToCassandra("alpha_test", "affected_hosts")

    // Roll up service summaries.
    conn.flatMap(ServiceSummary.fromJson)
      .map(s => (ServiceSummary.key(s), s))
      .reduceByKey(ServiceSummary.reduce)
      .map(_._2)
      .saveToCassandra("alpha_test", "service_summary_rollup")
  }
}

object Transforms {
  private val appNameBase = "Transforms%s"
  private val dtFormatter = DateTimeFormat.forPattern("yyyyMMddHH")

  def main(args: Array[String]): Unit = {
    if (args.size < 2) {
      println("""Usage: ConnTransforms <start> <end>
        <start> DateTime to start processing at. Format: yyyyMMddHH
        <end>   DateTime to end processing at. Format: yyyyMMddHH""")
      sys.exit(1)
    }

    // withZoneRetainFields gives us a UTC time as specified on the command line.
    val start = dtFormatter.parseDateTime(args(0)).withZoneRetainFields(DateTimeZone.UTC)
    val end = dtFormatter.parseDateTime(args(1)).withZoneRetainFields(DateTimeZone.UTC)
    println("Processing rollups from %s to %s".format(start, end))

    // Create the Spark context.
    val conf = new SparkConf()
      .setAppName(appNameBase.format("Test"))
    val connector = CassandraConnector(conf)
    val sc = new SparkContext(conf)

    // Set up the thread pool for running jobs.
    val pool = Executors.newFixedThreadPool(10)
    pool.execute(new SecondRollup(sc, connector, start))

    //for (dt <- new TimeRanger(start, end)) {
    //  // Always run the second rollups.
    //  pool.execute(new SecondRollup(sc, connector, dt))
    //  if (Granularity.Minute.isGranularity(dt)) pool.execute(new MinuteRollup(sc, connector, dt))
    //}

    // Stop the pool from accepting new tasks; we've submitted all of them.
    pool.shutdown()
    while (!pool.isTerminated()) pool.awaitTermination(10, TimeUnit.SECONDS)
  }
}
```
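For what it's worth, the mechanism Marcelo describes is visible in the code above: the lambdas passed to `map`/`flatMap` inside `run` reference `connector`, which is a constructor field, so they capture the whole `SecondRollup` instance; once `sc` is an ordinary (non-transient) field of that instance, Java serialization hits it and fails. Below is a minimal, self-contained sketch of the same failure and one common workaround; the names (`Rollup`, `factor`, `RollupDemo`) are made up for illustration and are not part of the project code.
```
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical stand-in for the real classes, only to illustrate closure capture.
class Rollup(sc: SparkContext, factor: Int) extends Runnable with Serializable {

  // Fails: `factor` is a field, so the lambda captures `this`; serializing `this`
  // drags in the non-transient SparkContext field -> "Task not serializable".
  def runBroken(): Unit =
    sc.parallelize(1 to 10).map(_ * factor).count()

  // Works: copy the field into a local val first, so the lambda captures only the
  // local value and the enclosing instance (with its SparkContext) is never serialized.
  def run(): Unit = {
    val localFactor = factor
    sc.parallelize(1 to 10).map(_ * localFactor).count()
  }
}

object RollupDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("RollupDemo").setMaster("local[2]"))
    new Rollup(sc, 3).run()
    sc.stop()
  }
}
```
Either change on its own is usually enough: marking `sc` as `@transient` keeps it out of the serialized instance, while copying the needed fields into local vals keeps the instance out of the closure entirely.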