The SparkContext is lost when I call the persist function from the sink function; just before the function call everything works as intended, so I guess it's the FunctionN class serialisation that is causing the problem. I will try to embed the functionality in the sink method to verify that.
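If that doesn't fix it, the other change I have in mind is to collect the distinct (game, category) keys to the driver before looping over them, so that persist (and therefore data.sparkContext) is only ever used on the driver. Something along these lines (untested sketch, same type aliases and persist signature as in the code quoted below, and assuming the set of distinct keys per batch is small enough to collect):

  def sink(data: DStream[((GameID, Category), ((TimeSeriesKey, Platform), HLL))]): Unit = {
    data.foreachRDD { rdd =>
      rdd.cache()
      val (minTime, maxTime): (Long, Long) =
        rdd.map {
          case (_, ((TimeSeriesKey(_, time), _), _)) => (time, time)
        }.fold((Long.MaxValue, Long.MinValue)) {
          case ((min, max), (num, _)) => (math.min(min, num), math.max(max, num))
        }
      if (minTime != Long.MaxValue && maxTime != Long.MinValue) {
        // collect() brings the keys back to the driver, so the loop below (and
        // persist, which calls data.sparkContext.cassandraTable) runs on the
        // driver, where the SparkContext is still available.
        rdd.map(_._1).distinct().collect().foreach {
          case (game, category) => persist(game, category, minTime, maxTime, rdd)
        }
      }
      rdd.unpersist(blocking = false)
    }
  }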
2015-01-21 12:35 GMT+00:00 Luis Ángel Vicente Sánchez <langel.gro...@gmail.com>:

> The following functions,
>
>   def sink(data: DStream[((GameID, Category), ((TimeSeriesKey, Platform), HLL))]): Unit = {
>     data.foreachRDD { rdd =>
>       rdd.cache()
>       val (minTime, maxTime): (Long, Long) =
>         rdd.map {
>           case (_, ((TimeSeriesKey(_, time), _), _)) => (time, time)
>         }.fold((Long.MaxValue, Long.MinValue)) {
>           case ((min, max), (num, _)) => (math.min(min, num), math.max(max, num))
>         }
>       if (minTime != Long.MaxValue && maxTime != Long.MinValue) {
>         rdd.map(_._1).distinct().foreach {
>           case (game, category) => persist(game, category, minTime, maxTime, rdd)
>         }
>       }
>       rdd.unpersist(blocking = false)
>     }
>   }
>
>   def persist(game: GameID, category: Category, min: Long, max: Long,
>               data: RDD[((GameID, Category), ((TimeSeriesKey, Platform), HLL))]): Unit = {
>     val family: String = s"${parameters.table.family}_${game.repr}_${category.repr}"
>     val cas: CassandraRDD[(Long, Long, String, Array[Byte])] =
>       data.sparkContext.cassandraTable[(Long, Long, String, Array[Byte])](parameters.table.keyspace, family)
>     val fil: RDD[((TimeSeriesKey, Platform), HLL)] =
>       cas
>         .where(""""time" >= ?""", new Date(min))
>         .where(""""time" <= ?""", new Date(max))
>         .map {
>           case (date, time, platform, array) =>
>             ((TimeSeriesKey(date, time), Platform(platform)), HyperLogLog.fromBytes(array))
>         }
>     data.filter(_._1 == ((game, category))).map(_._2).leftOuterJoin(fil).map {
>       case ((key, platform), (value, maybe)) =>
>         (key.date, key.time, platform.repr, HyperLogLog.toBytes(maybe.fold(value)(array => value + array)))
>     }.saveToCassandra(parameters.table.keyspace, family)
>   }
>
> are causing this exception at runtime:
>
>   15/01/20 18:54:52 ERROR Executor: Exception in task 1.3 in stage 23.0 (TID 126)
>   java.lang.NullPointerException
>       at com.datastax.spark.connector.SparkContextFunctions.cassandraTable$default$3(SparkContextFunctions.scala:47)
>       at com.mindcandy.services.mako.concurrentusers.ActiveUsersJobImpl.persist(ActiveUsersJobImpl.scala:51)
>       at com.mindcandy.services.mako.concurrentusers.ActiveUsersJobImpl$$anonfun$sink$1$$anonfun$apply$2.apply(ActiveUsersJobImpl.scala:41)
>       at com.mindcandy.services.mako.concurrentusers.ActiveUsersJobImpl$$anonfun$sink$1$$anonfun$apply$2.apply(ActiveUsersJobImpl.scala:40)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>       at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:759)
>       at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:759)
>       at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1143)
>       at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1143)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>       at org.apache.spark.scheduler.Task.run(Task.scala:54)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:745)
>
> I'm using spark 1.1.1 and spark-cassandra-connector 1.1.1; line 47 of
> SparkContextFunctions.scala is the implicit CassandraConnector that uses
> the underlying spark context to retrieve the SparkConf.
>
> After a few hours debugging the code, the source of the problem is that
>
>   data.sparkContext
>
> is returning null. It seems that the RDD is serialised and the SparkContext
> is lost. Is this the expected behaviour? Is it a known bug?
>
> I have run out of ideas on how to make this work, so I'm open to suggestions.
>
> Kind regards,
>
> Luis
>
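PS: as far as I can tell, the same behaviour can be reproduced with a tiny standalone app (a hypothetical example, not from our codebase). My understanding is that the RDD's reference to the SparkContext is transient, so it does not survive the RDD being serialised into a task closure:

  import org.apache.spark.{SparkConf, SparkContext}

  object SparkContextInClosure {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(
        new SparkConf().setAppName("sparkcontext-in-closure").setMaster("local[2]"))
      val rdd = sc.parallelize(1 to 10)

      // On the driver this prints the context as expected.
      println(rdd.sparkContext)

      // Inside an action closure the RDD is serialised and shipped with the task;
      // its transient SparkContext reference is gone, so this prints null (and any
      // method call on it, like cassandraTable, would throw a NullPointerException).
      rdd.foreach { _ =>
        println(rdd.sparkContext)
      }

      sc.stop()
    }
  }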