I think you're taking the right approach by trying to create a new broadcast
var. What part doesn't work? For example, I wonder whether comparing Map
equality like that does what you think; isn't it just reference equality?
Debug a bit more to see whether your code even destroys and recreates the
broadcast.

On Fri, Jul 22, 2022 at 4:24 AM Dipl.-Inf. Rico Bergmann <i...@ricobergmann.de> wrote:

> Hi folks!
>
> I'm trying to implement an update of a broadcast var in Spark Streaming.
> The idea is that whenever some configuration value has changed (this is
> periodically checked by the driver) the existing broadcast variable is
> unpersisted and then (re-)broadcasted.
>
> In a local test setup (using a local Spark) it works fine but on a real
> cluster it doesn't work. The broadcast variable never gets updated. Am I
> doing something wrong? Or is this simply not possible? Or a bug?
>
> Code snippet:
>
> @RequiredArgsConstructor
> public class BroadcastUpdater implements Runnable {
>      private final transient JavaSparkContext sparkContext;
>      @Getter
>      private transient volatile Broadcast<Map<String, String>> broadcastVar;
>      private transient Map<String, String> configMap;
>
>      public void run() {
>          Map<String, String> configMap = getConfigMap();
>          if (this.broadcastVar == null || !configMap.equals(this.configMap)) {
>              this.configMap = configMap;
>              if (broadcastVar != null) {
>                  broadcastVar.unpersist(true);
>                  broadcastVar.destroy(true);
>              }
>              this.broadcastVar = this.sparkContext.broadcast(this.configMap);
>          }
>      }
>
>      private Map<String, String> getConfigMap() {
>          //impl details
>      }
> }
>
> public class StreamingFunction implements Serializable {
>
>      private transient volatile BroadcastUpdater broadcastUpdater;
>
>      protected JavaStreamingContext startStreaming(JavaStreamingContext context,
>              ConsumerStrategy<String, ChangeDataRecord> consumerStrategy) {
>          broadcastUpdater = new BroadcastUpdater(context.sparkContext());
>          ScheduledThreadPoolExecutor scheduledThreadPoolExecutor =
>                  new ScheduledThreadPoolExecutor(1);
>          scheduledThreadPoolExecutor.scheduleWithFixedDelay(broadcastUpdater,
>                  0, 3, TimeUnit.SECONDS);
>
>          final JavaInputDStream<ConsumerRecord<String, ChangeDataRecord>> inputStream =
>                  KafkaUtils.createDirectStream(context,
>                          LocationStrategies.PreferConsistent(), consumerStrategy);
>
>          inputStream.foreachRDD(rdd -> {
>              // fetched on the driver once per batch
>              Broadcast<Map<String, String>> broadcastVar =
>                      broadcastUpdater.getBroadcastVar();
>              rdd.foreachPartition(partition -> {
>                  if (partition.hasNext()) {
>                      Map<String, String> configMap = broadcastVar.getValue();
>
>                      // iterate
>                      while (partition.hasNext()) {
>                          ConsumerRecord<String, ChangeDataRecord> record =
>                                  partition.next();
>                          //impl logic using broadcast variable
>                      }
>                  }
>              });
>          });
>
>          return context;
>      }
> }
>
