On Fri, Jun 5, 2015 at 12:58 PM Marcelo Vanzin <van...@cloudera.com> wrote:

> You didn't show the error so the only thing we can do is speculate. You're
> probably sending the object that's holding the SparkContext reference over
> the network at some point (e.g. it's used by a task run in an executor),
> and that's why you were getting that exception.
>

Apologies - the full error traceback is below. The only change I made was
removing the @transient annotation from the sc variable in my class
constructor. I've also included the full code for the classes and the
launching process.

Error traceback:
```
Exception in thread "pool-5-thread-1" java.lang.Error: org.apache.spark.SparkException: Task not serializable
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1148)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:1478)
        at org.apache.spark.rdd.RDD.map(RDD.scala:288)
        at io.icebrg.analytics.spark.SecondRollup.run(ConnTransforms.scala:33)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        ... 2 more
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
        ... 7 more
```
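
If I'm following your explanation correctly, the capture is essentially the
situation in this minimal sketch (a made-up class for discussion, not my
actual code): a closure inside run() that touches a constructor parameter
goes through `this`, so serializing the closure means serializing the whole
instance, SparkContext field and all.
```
import org.apache.spark.SparkContext

// Hypothetical reduction of the problem, for discussion only.
class Rollup(sc: SparkContext, tag: String) extends Runnable with Serializable {
  def run(): Unit = {
    val rdd = sc.parallelize(1 to 10)
    // `tag` here is really `this.tag`, so the closure captures `this`,
    // and `this` holds the (non-transient) SparkContext field that
    // ObjectOutputStream then refuses to write.
    rdd.map(i => s"$tag-$i").count()
  }
}
```
That would also explain the nested writeObject0 frames in the trace above:
the closure, then the enclosing instance, then the SparkContext field.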

Code:
```
import java.util.concurrent.{Executors, TimeUnit}

import org.apache.spark.{SparkConf, SparkContext}
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.joda.time.{DateTime, DateTimeZone}
import org.joda.time.format.DateTimeFormat
// Project-local imports (Json, AffectedIp, ServiceSummary, System,
// TimeRanger, Granularity, MinuteRollup) omitted here.

class SecondRollup(sc: SparkContext, connector: CassandraConnector,
    scanPoint: DateTime) extends Runnable with Serializable {

  def run(): Unit = {
    val conn = sc.cassandraTable("alpha_test", "sensor_readings")
      .select("data")
      .where("timep = ?", scanPoint)
      .where("sensorid IN ?", System.sensors)
      .map(r => Json.parse(r.getString("data")))
      .cache()

    conn.flatMap(AffectedIp.fromJson)
      .map(a => (AffectedIp.key(a), a))
      .reduceByKey(AffectedIp.reduce)
      .map(_._2)
      // `connector` is a constructor parameter, so I assume this closure
      // reaches it through `this`, the instance that also holds sc.
      .map(a => AffectedIp.reduceWithCurrent(connector, a))
      .saveToCassandra("alpha_test", "affected_hosts")

    conn.flatMap(ServiceSummary.fromJson)
      .map(s => (ServiceSummary.key(s), s))
      .reduceByKey(ServiceSummary.reduce)
      .map(_._2)
      .saveToCassandra("alpha_test", "service_summary_rollup")
  }
}

object Transforms {
  private val appNameBase = "Transforms%s"
  private val dtFormatter = DateTimeFormat.forPattern("yyyyMMddHH")

  def main(args: Array[String]): Unit = {
    if (args.size < 2) {
      println("""Usage: ConnTransforms <start> <end>
        <start>     DateTime to start processing at. Format: yyyyMMddHH
        <end>       DateTime to end processing at.  Format: yyyyMMddHH""")
      sys.exit(1)
    }

    // withZoneRetainFields gives us a UTC time as specified on the command line.
    val start = dtFormatter.parseDateTime(args(0)).withZoneRetainFields(DateTimeZone.UTC)
    val end = dtFormatter.parseDateTime(args(1)).withZoneRetainFields(DateTimeZone.UTC)

    println("Processing rollups from %s to %s".format(start, end))

    // Create the spark context.
    val conf = new SparkConf()
      .setAppName(appNameBase.format("Test"))

    val connector = CassandraConnector(conf)

    val sc = new SparkContext(conf)

    // Set up the threadpool for running Jobs.
    val pool = Executors.newFixedThreadPool(10)

    pool.execute(new SecondRollup(sc, connector, start))

    //for (dt <- new TimeRanger(start, end)) {
    //  // Always run the second rollups.
    //  pool.execute(new SecondRollup(sc, connector, dt))
    //  if (Granularity.Minute.isGranularity(dt)) pool.execute(new MinuteRollup(sc, connector, dt))
    //}

    // stop the pool from accepting new tasks
    pool.shutdown()

    // We've submitted all the tasks.
    while (!pool.isTerminated()) pool.awaitTermination(10, TimeUnit.SECONDS)
  }
}
```
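
For completeness, here is a sketch of the two remedies as I understand them,
using the same made-up class as above rather than my real code (I'm writing
sc as a val so the @transient annotation definitely lands on the generated
field):
```
import org.apache.spark.SparkContext

// Hypothetical reduction, mirroring the sketch earlier in this message.
class FixedRollup(@transient val sc: SparkContext, tag: String)
    extends Runnable with Serializable {

  def run(): Unit = {
    // Remedy 1: the @transient above. If a closure still drags `this`
    // along, the serialized copy carries a null sc instead of throwing.

    // Remedy 2: copy the field into a local val first, so the closure
    // captures a plain String and never references `this` at all.
    val localTag = tag
    sc.parallelize(1 to 10)
      .map(i => s"$localTag-$i") // no reference to FixedRollup here
      .count()
  }
}
```
Putting @transient back on sc does make the exception go away for me; I
mainly want to confirm that this is the right pattern for handing a
SparkContext to Runnables, rather than an accident that happens to work.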
