I think you're taking the right approach in trying to create a new broadcast variable. What part doesn't work? For example, I wonder whether comparing Map equality like that does what you think; isn't it just reference equality? Debug a bit more to see whether your code even destroys and recreates the broadcast.
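For what it's worth, here is a quick standalone check of the Map comparison question (class name is mine, just for illustration); `java.util.Map` specifies `equals` as entry-based, so the comparison itself may be fine and the problem may lie elsewhere:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Sketch: java.util.Map's equals() contract is content-based, not
// reference-based, and holds even across different Map implementations.
public class MapEqualityCheck {
    public static void main(String[] args) {
        Map<String, String> a = new HashMap<>();
        a.put("key", "value");

        Map<String, String> b = new TreeMap<>();
        b.put("key", "value");

        System.out.println(a == b);      // false: different references
        System.out.println(a.equals(b)); // true: same entries
    }
}
```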
On Fri, Jul 22, 2022 at 4:24 AM Dipl.-Inf. Rico Bergmann <i...@ricobergmann.de> wrote:

> Hi folks!
>
> I'm trying to implement an update of a broadcast variable in Spark
> Streaming. The idea is that whenever some configuration value has changed
> (this is periodically checked by the driver), the existing broadcast
> variable is unpersisted and then (re-)broadcast.
>
> In a local test setup (using a local Spark) it works fine, but on a real
> cluster it doesn't work: the broadcast variable never gets updated. Am I
> doing something wrong? Or is this simply not possible? Or a bug?
>
> Code snippet:
>
> @RequiredArgsConstructor
> public class BroadcastUpdater implements Runnable {
>     private final transient JavaSparkContext sparkContext;
>     @Getter
>     private transient volatile Broadcast<Map<String, String>> broadcastVar;
>     private transient Map<String, String> configMap;
>
>     public void run() {
>         Map<String, String> configMap = getConfigMap();
>         if (this.broadcastVar == null || !configMap.equals(this.configMap)) {
>             this.configMap = configMap;
>             if (broadcastVar != null) {
>                 broadcastVar.unpersist(true);
>                 broadcastVar.destroy(true);
>             }
>             this.broadcastVar = this.sparkContext.broadcast(this.configMap);
>         }
>     }
>
>     private Map<String, String> getConfigMap() {
>         // impl details
>     }
> }
>
> public class StreamingFunction implements Serializable {
>
>     private transient volatile BroadcastUpdater broadcastUpdater;
>
>     protected JavaStreamingContext startStreaming(JavaStreamingContext context,
>             ConsumerStrategy<String, ChangeDataRecord> consumerStrategy) {
>         broadcastUpdater = new BroadcastUpdater(context.sparkContext());
>         ScheduledThreadPoolExecutor scheduledThreadPoolExecutor =
>                 new ScheduledThreadPoolExecutor(1);
>         scheduledThreadPoolExecutor.scheduleWithFixedDelay(
>                 broadcastUpdater, 0, 3, TimeUnit.SECONDS);
>
>         final JavaInputDStream<ConsumerRecord<String, ChangeDataRecord>> inputStream =
>                 KafkaUtils.createDirectStream(context,
>                         LocationStrategies.PreferConsistent(), consumerStrategy);
>
>         inputStream.foreachRDD(rdd -> {
>             Broadcast<Map<String, String>> broadcastVar =
>                     broadcastUpdater.getBroadcastVar();
>             rdd.foreachPartition(partition -> {
>                 if (partition.hasNext()) {
>                     Map<String, String> configMap = broadcastVar.getValue();
>
>                     // iterate
>                     while (partition.hasNext()) {
>                         // impl logic using broadcast variable
>                     }
>                 }
>             });
>         });
>     }
> }
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org