I'm having trouble loading a streaming job from a checkpoint when a broadcast variable is defined. I've seen the solution by TD in Scala (https://issues.apache.org/jira/browse/SPARK-5206) that uses a singleton to get or create an accumulator, but I can't get the same approach to work in PySpark with a broadcast variable.
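For reference, the Scala workaround in that JIRA boils down to a lazily-initialized singleton. Translated to Python, the pattern I'm trying to mimic looks roughly like this (my own sketch; get_broadcast and _broadcast_instance are names I made up, not Spark API):

    _broadcast_instance = None

    def get_broadcast(sc, value):
        # Lazily (re)create the broadcast variable so that a job recovered
        # from a checkpoint can rebuild it instead of unpickling a stale one
        global _broadcast_instance
        if _broadcast_instance is None:
            _broadcast_instance = sc.broadcast(value)
        return _broadcast_instance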
A simplified code snippet (imports and context setup trimmed down):

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    broadcastHelper = {}

    class StreamingJob(object):
        def transform_function(self):
            def transform_function_inner(t, rdd):
                # Lazily create the broadcast variable on first use so it
                # isn't captured in the checkpoint itself
                if 'bar' not in broadcastHelper:
                    broadcastHelper['bar'] = rdd.context.broadcast(
                        broadcastHelper['foo'])
                return rdd.filter(
                    lambda event: event['id'] not in broadcastHelper['bar'].value)
            return transform_function_inner

        def createContext(self):
            # Real code configures the SparkContext and batch interval here
            ssc = StreamingContext(SparkContext(), 10)
            ssc.checkpoint(self.checkpoint_path)
            dstream = self.getKafkaStream()
            dstream = dstream.transform(self.transform_function())
            dstream.foreachRDD(lambda rdd: rdd.foreachPartition(self.send_partition))
            return ssc

        def run(self):
            broadcastHelper['foo'] = {1, 2, 3}
            ssc = StreamingContext.getOrCreate(self.checkpoint_path, self.createContext)
            ssc.start()
            ssc.awaitTermination()

The error I inevitably get when restoring from the checkpoint is:

    Exception: (Exception("Broadcast variable '3' not loaded!",), <function _from_id at 0x1089a1230>, (3L,))

Has anyone had any luck checkpointing in PySpark with a broadcast variable?