Hi folks!

I'm trying to implement updating a broadcast variable in Spark Streaming. The idea is that whenever a configuration value changes (the driver checks for this periodically), the existing broadcast variable is unpersisted and then re-broadcast.

In a local test setup (using a local Spark master) it works fine, but on a real cluster it doesn't: the broadcast variable never gets updated. After adding some log messages I can see that the BroadcastUpdater task runs only twice and then never again. Does anyone have an idea why this happens?
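One thing worth mentioning: according to the ScheduledExecutorService Javadoc, scheduleWithFixedDelay suppresses all subsequent executions once a task throws, so a single exception inside run() would silently stop the updater. To rule that out, I could wrap the task like this (a sketch; log would come from Lombok's @Slf4j):

scheduledThreadPoolExecutor.scheduleWithFixedDelay(() -> {
    try {
        broadcastUpdater.run();
    } catch (Throwable t) {
        // without the catch, one failure cancels all further scheduled runs
        log.error("BroadcastUpdater run failed", t);
    }
}, 0, 3, TimeUnit.SECONDS);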

Code snippet:

import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategy;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import lombok.Getter;
import lombok.RequiredArgsConstructor;

@RequiredArgsConstructor
public class BroadcastUpdater implements Runnable {
    private final transient JavaSparkContext sparkContext;
    @Getter
    private transient volatile Broadcast<Map<String, String>> broadcastVar;
    private transient volatile Map<String, String> configMap;

    @Override
    public void run() {
        Map<String, String> configMap = getConfigMap();
        // re-broadcast only when the configuration has actually changed
        if (this.broadcastVar == null || !configMap.equals(this.configMap)) {
            this.configMap = configMap;
            if (broadcastVar != null) {
                // remove the old copies from the executors before re-broadcasting (both calls block)
                broadcastVar.unpersist(true);
                broadcastVar.destroy(true);
            }
            this.broadcastVar = this.sparkContext.broadcast(this.configMap);
        }
    }

    private Map<String, String> getConfigMap() {
        //impl details
    }
}
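
For reference, this is roughly how my local test drives the updater (a sketch; the config source behind getConfigMap() is stubbed):

SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("broadcast-updater-test");
try (JavaSparkContext sc = new JavaSparkContext(conf)) {
    BroadcastUpdater updater = new BroadcastUpdater(sc);
    updater.run();   // first run broadcasts the initial config
    Broadcast<Map<String, String>> first = updater.getBroadcastVar();
    updater.run();   // config unchanged: the same broadcast instance is kept
    assert first == updater.getBroadcastVar();
    // once the stubbed config changes, the next run() destroys and re-broadcasts
}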

public class StreamingFunction implements Serializable {

    private transient volatile BroadcastUpdater broadcastUpdater;
    private transient ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);

    protected JavaStreamingContext startStreaming(JavaStreamingContext context, ConsumerStrategy<String, ChangeDataRecord> consumerStrategy) {
        broadcastUpdater = new BroadcastUpdater(context.sparkContext());
        scheduledThreadPoolExecutor.scheduleWithFixedDelay(broadcastUpdater, 0, 3, TimeUnit.SECONDS);

        final JavaInputDStream<ConsumerRecord<String, ChangeDataRecord>> inputStream = KafkaUtils.createDirectStream(context,
                LocationStrategies.PreferConsistent(), consumerStrategy);

        inputStream.foreachRDD(rdd -> {
            // this function runs on the driver for each batch, so it picks up the latest broadcast
            Broadcast<Map<String, String>> broadcastVar = broadcastUpdater.getBroadcastVar();
            rdd.foreachPartition(partition -> {
                if (partition.hasNext()) {
                    Map<String, String> configMap = broadcastVar.getValue();

                    // process every record in this partition using the broadcast config
                    while (partition.hasNext()) {
                        //impl logic using broadcast variable
                    }
                }
            });
        });

        return context;
    }
}
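
For completeness, this is roughly how everything is wired up on the driver (again a sketch; topic name, Kafka params, and batch interval are placeholders, the value deserializer is our own class, and awaitTermination() is called from a main that declares throws InterruptedException):

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "broker:9092");
kafkaParams.put("group.id", "cdc-consumer");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", ChangeDataRecordDeserializer.class);

JavaStreamingContext context = new JavaStreamingContext(
        new SparkConf().setAppName("cdc-stream"), Durations.seconds(5));
ConsumerStrategy<String, ChangeDataRecord> strategy =
        ConsumerStrategies.<String, ChangeDataRecord>Subscribe(
                Collections.singletonList("cdc-topic"), kafkaParams);

// startStreaming is protected, so this call lives in the same package
new StreamingFunction().startStreaming(context, strategy);
context.start();
context.awaitTermination();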


