The SparkContext is lost when I call the persist function from the sink
function; just before the function call everything works as intended, so
I guess it's the FunctionN class serialisation that is causing the problem.
I will try to embed the functionality in the sink method to verify that.
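If the serialisation theory holds, another variant I'm considering (a rough
sketch, assuming the set of distinct (GameID, Category) keys per batch is
small enough to collect to the driver) is to replace the distributed foreach
with a driver-side loop, so persist and data.sparkContext are only ever used
on the driver:

    if (minTime != Long.MaxValue && maxTime != Long.MinValue) {
      // collect() brings the keys back to the driver, so the foreach below is
      // a plain Scala loop over an Array, not an RDD action run on executors
      val keys: Array[(GameID, Category)] = rdd.map(_._1).distinct().collect()
      keys.foreach { case (game, category) =>
        // persist runs on the driver here, where rdd.sparkContext is still valid
        persist(game, category, minTime, maxTime, rdd)
      }
    }

Everything else in sink would stay unchanged.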

2015-01-21 12:35 GMT+00:00 Luis Ángel Vicente Sánchez <langel.gro...@gmail.com>:

> The following functions,
>
> def sink(data: DStream[((GameID, Category), ((TimeSeriesKey, Platform), HLL))]): Unit = {
>   data.foreachRDD { rdd =>
>     rdd.cache()
>     val (minTime, maxTime): (Long, Long) =
>       rdd.map {
>         case (_, ((TimeSeriesKey(_, time), _), _)) => (time, time)
>       }.fold((Long.MaxValue, Long.MinValue)) {
>         case ((min, max), (num, _)) => (math.min(min, num), math.max(max, num))
>       }
>     if (minTime != Long.MaxValue && maxTime != Long.MinValue) {
>       rdd.map(_._1).distinct().foreach {
>         case (game, category) => persist(game, category, minTime, maxTime, rdd)
>       }
>     }
>     rdd.unpersist(blocking = false)
>   }
> }
>
> def persist(game: GameID, category: Category, min: Long, max: Long,
>             data: RDD[((GameID, Category), ((TimeSeriesKey, Platform), HLL))]): Unit = {
>   val family: String = s"${parameters.table.family}_${game.repr}_${category.repr}"
>   val cas: CassandraRDD[(Long, Long, String, Array[Byte])] =
>     data.sparkContext.cassandraTable[(Long, Long, String, Array[Byte])](parameters.table.keyspace, family)
>   val fil: RDD[((TimeSeriesKey, Platform), HLL)] =
>     cas
>       .where(""""time" >= ?""", new Date(min))
>       .where(""""time" <= ?""", new Date(max))
>       .map {
>         case (date, time, platform, array) =>
>           ((TimeSeriesKey(date, time), Platform(platform)), HyperLogLog.fromBytes(array))
>       }
>   data.filter(_._1 == ((game, category))).map(_._2).leftOuterJoin(fil).map {
>     case ((key, platform), (value, maybe)) =>
>       (key.date, key.time, platform.repr, HyperLogLog.toBytes(maybe.fold(value)(array => value + array)))
>   }.saveToCassandra(parameters.table.keyspace, family)
> }
>
> are causing this exception at runtime:
>
> 15/01/20 18:54:52 ERROR Executor: Exception in task 1.3 in stage 23.0 (TID 126)
> java.lang.NullPointerException
>         at com.datastax.spark.connector.SparkContextFunctions.cassandraTable$default$3(SparkContextFunctions.scala:47)
>         at com.mindcandy.services.mako.concurrentusers.ActiveUsersJobImpl.persist(ActiveUsersJobImpl.scala:51)
>         at com.mindcandy.services.mako.concurrentusers.ActiveUsersJobImpl$$anonfun$sink$1$$anonfun$apply$2.apply(ActiveUsersJobImpl.scala:41)
>         at com.mindcandy.services.mako.concurrentusers.ActiveUsersJobImpl$$anonfun$sink$1$$anonfun$apply$2.apply(ActiveUsersJobImpl.scala:40)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>         at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:759)
>         at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:759)
>         at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1143)
>         at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1143)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>         at org.apache.spark.scheduler.Task.run(Task.scala:54)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
>
> I'm using Spark 1.1.1 and spark-cassandra-connector 1.1.1; line 47 of
> SparkContextFunctions.scala is the implicit CassandraConnector that uses
> the underlying SparkContext to retrieve the SparkConf.
>
> After a few hours debugging the code, the source of the problem is that,
>
> data.sparkContext
>
> is returning null. It seems that the RDD is serialised and the
> SparkContext is lost. Is this the expected behaviour? Is it a known bug?
>
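> The pattern that seems to trigger it, reduced to a minimal illustrative
> sketch (not the real job, just the shape of the call, with sc being the
> driver-side SparkContext):
>
>   val rdd = sc.parallelize(1 to 10)
>   rdd.foreach { _ =>
>     // this closure runs on an executor; rdd was serialised into it, but its
>     // transient SparkContext field was not, so this prints null there
>     println(rdd.sparkContext)
>   }
>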
> I have run out of ideas on how to make this work, so I'm open to
> suggestions.
>
> Kind regards,
>
> Luis
>
