I'm having trouble loading a streaming job from a checkpoint when a
broadcast variable is defined. I've seen the solution by TD in Scala (
https://issues.apache.org/jira/browse/SPARK-5206) that uses a singleton to
get/create an accumulator, but I can't seem to get it to work in PySpark
with a broadcast variable.
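For reference, the singleton get/create idea from that JIRA translates roughly to the following in plain Python (names here are my own, not Spark API; in the real job the factory would be something like `lambda: rdd.context.broadcast(data)`):

```python
class BroadcastWrapper:
    """Hypothetical singleton holder mirroring the Scala pattern from
    SPARK-5206: build the wrapped object once, reuse it afterwards, and
    allow a reset so it can be rebuilt after a checkpoint restore."""
    _instance = None

    @classmethod
    def get_or_create(cls, factory):
        # Only invoke the (possibly expensive) factory on first use.
        if cls._instance is None:
            cls._instance = factory()
        return cls._instance

    @classmethod
    def reset(cls):
        # After restoring from a checkpoint, drop the stale handle so the
        # next get_or_create() rebuilds it against the live SparkContext.
        cls._instance = None
```

The intent is that `get_or_create` is called inside the transform function, so the broadcast is lazily re-created on the executors' driver side after recovery rather than being captured in the checkpointed closure.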

A simplified code snippet:
broadcastHelper = {}

class StreamingJob(object):
    def transform_function(self):
        def transform_function_inner(t, rdd):
            if 'bar' not in broadcastHelper:
                broadcastHelper['bar'] = rdd.context.broadcast(broadcastHelper['foo'])
            return rdd.filter(lambda event: event['id'] not in broadcastHelper['bar'].value)
        return transform_function_inner

    def createContext(self):
        dstream = self.getKafkaStream()
        dstream = dstream.transform(self.transform_function())
        dstream.foreachRDD(lambda rdd: rdd.foreachPartition(self.send_partition))

    def run(self):
        broadcastHelper['foo'] = {1, 2, 3}
        ssc = StreamingContext.getOrCreate(self.checkpoint_path, self.createContext)
        ssc.start()
        ssc.awaitTermination()

The error I inevitably get when restoring from the checkpoint is:
Exception: (Exception("Broadcast variable '3' not loaded!",), <function _from_id at 0x1089a1230>, (3L,))

Has anyone had any luck checkpointing in PySpark with a broadcast variable?
