Re: Updating Broadcast Variable in Spark Streaming 2.4.4

2022-09-28 Thread Sean Owen
I don't think that can work. Your BroadcastUpdater is copied to the task,
with a reference to an initial broadcast. When that is later updated on the
driver, this does not affect the broadcast inside the copy in the tasks.
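
To illustrate the mechanism with a minimal, hypothetical sketch (the class
and names below are invented for illustration, not code from this thread):
a reference captured in a task closure is serialized when the job is
submitted, so reassigning the driver-side variable afterwards never reaches
the copies already shipped to the executors.

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

public class StaleCaptureSketch {
    public static void main(String[] args) {
        JavaSparkContext sc =
                new JavaSparkContext("local[2]", "stale-capture-sketch");

        // The reference the closure below will capture.
        Broadcast<Map<String, String>> captured = sc.broadcast(new HashMap<>());

        // 'captured' is serialized into the task closure at this point. If
        // the driver later destroys it and broadcasts a replacement, tasks
        // built from this closure still hold the old reference.
        sc.parallelize(Arrays.asList(1, 2, 3))
          .foreach(i -> System.out.println(captured.getValue()));

        sc.stop();
    }
}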

On Wed, Sep 28, 2022 at 10:11 AM Dipl.-Inf. Rico Bergmann <
i...@ricobergmann.de> wrote:

> Hi folks!
>
>
> I'm trying to implement an update of a broadcast var in Spark Streaming.
> The idea is that whenever some configuration value has changed (this is
> periodically checked by the driver), the existing broadcast variable is
> unpersisted and then re-broadcast.
>
> In a local test setup (using a local Spark) it works fine, but on a real
> cluster it doesn't work. The broadcast variable never gets updated. What
> I can see after adding some log messages is that the BroadcastUpdater
> thread is only called twice and then never again. Does anyone have any
> idea why this happens?
>
> Code snippet:
>
> @RequiredArgsConstructor
> public class BroadcastUpdater implements Runnable {
>     private final transient JavaSparkContext sparkContext;
>     @Getter
>     private transient volatile Broadcast<Map<String, String>> broadcastVar;
>     private transient volatile Map<String, String> configMap;
>
>     public void run() {
>         Map<String, String> configMap = getConfigMap();
>         if (this.broadcastVar == null || !configMap.equals(this.configMap)) {
>             this.configMap = configMap;
>             if (broadcastVar != null) {
>                 broadcastVar.unpersist(true);
>                 broadcastVar.destroy(true);
>             }
>             this.broadcastVar = this.sparkContext.broadcast(this.configMap);
>         }
>     }
>
>     private Map<String, String> getConfigMap() {
>         //impl details
>     }
> }
>
> public class StreamingFunction implements Serializable {
>
>     private transient volatile BroadcastUpdater broadcastUpdater;
>     private transient ScheduledThreadPoolExecutor scheduledThreadPoolExecutor =
>             new ScheduledThreadPoolExecutor(1);
>
>     protected JavaStreamingContext startStreaming(JavaStreamingContext context,
>             ConsumerStrategy<String, ChangeDataRecord> consumerStrategy) {
>         broadcastUpdater = new BroadcastUpdater(context.sparkContext());
>         scheduledThreadPoolExecutor.scheduleWithFixedDelay(broadcastUpdater,
>                 0, 3, TimeUnit.SECONDS);
>
>         final JavaInputDStream<ConsumerRecord<String, ChangeDataRecord>> inputStream =
>                 KafkaUtils.createDirectStream(context,
>                         LocationStrategies.PreferConsistent(), consumerStrategy);
>
>         inputStream.foreachRDD(rdd -> {
>             Broadcast<Map<String, String>> broadcastVar = broadcastUpdater.getBroadcastVar();
>             rdd.foreachPartition(partition -> {
>                 if (partition.hasNext()) {
>                     Map<String, String> configMap = broadcastVar.getValue();
>
>                     // iterate
>                     while (partition.hasNext()) {
>                         //impl logic using broadcast variable
>                     }
>                 }
>             });
>         });
>
>         // return the context so the caller can start it
>         return context;
>     }
> }
>
>
>


Updating Broadcast Variable in Spark Streaming 2.4.4

2022-09-28 Thread Dipl.-Inf. Rico Bergmann

Hi folks!


I'm trying to implement an update of a broadcast var in Spark Streaming.
The idea is that whenever some configuration value has changed (this is
periodically checked by the driver), the existing broadcast variable is
unpersisted and then re-broadcast.

In a local test setup (using a local Spark) it works fine, but on a real
cluster it doesn't work. The broadcast variable never gets updated. What
I can see after adding some log messages is that the BroadcastUpdater
thread is only called twice and then never again. Does anyone have any
idea why this happens?


Code snippet:

@RequiredArgsConstructor
public class BroadcastUpdater implements Runnable {
    private final transient JavaSparkContext sparkContext;
    @Getter
    private transient volatile Broadcast<Map<String, String>> broadcastVar;
    private transient volatile Map<String, String> configMap;

    public void run() {
        Map<String, String> configMap = getConfigMap();
        if (this.broadcastVar == null || !configMap.equals(this.configMap)) {
            this.configMap = configMap;
            if (broadcastVar != null) {
                broadcastVar.unpersist(true);
                broadcastVar.destroy(true);
            }
            this.broadcastVar = this.sparkContext.broadcast(this.configMap);
        }
    }

    private Map<String, String> getConfigMap() {
        //impl details
    }
}

public class StreamingFunction implements Serializable {

    private transient volatile BroadcastUpdater broadcastUpdater;
    private transient ScheduledThreadPoolExecutor scheduledThreadPoolExecutor =
            new ScheduledThreadPoolExecutor(1);

    protected JavaStreamingContext startStreaming(JavaStreamingContext context,
            ConsumerStrategy<String, ChangeDataRecord> consumerStrategy) {
        broadcastUpdater = new BroadcastUpdater(context.sparkContext());
        scheduledThreadPoolExecutor.scheduleWithFixedDelay(broadcastUpdater,
                0, 3, TimeUnit.SECONDS);

        final JavaInputDStream<ConsumerRecord<String, ChangeDataRecord>> inputStream =
                KafkaUtils.createDirectStream(context,
                        LocationStrategies.PreferConsistent(), consumerStrategy);

        inputStream.foreachRDD(rdd -> {
            Broadcast<Map<String, String>> broadcastVar = broadcastUpdater.getBroadcastVar();
            rdd.foreachPartition(partition -> {
                if (partition.hasNext()) {
                    Map<String, String> configMap = broadcastVar.getValue();

                    // iterate
                    while (partition.hasNext()) {
                        //impl logic using broadcast variable
                    }
                }
            });
        });

        // return the context so the caller can start it
        return context;
    }
}
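
One possible explanation for the scheduler going quiet (an assumption I
still have to verify, not something confirmed): scheduleWithFixedDelay
suppresses all further executions as soon as one run of the task throws,
so a single exception inside run() would produce exactly this "runs twice,
then never again" pattern. A defensive sketch of run() ('log' is an assumed
slf4j Logger field, not part of the class above):

public void run() {
    try {
        Map<String, String> configMap = getConfigMap();
        if (this.broadcastVar == null || !configMap.equals(this.configMap)) {
            this.configMap = configMap;
            if (broadcastVar != null) {
                broadcastVar.unpersist(true);
                broadcastVar.destroy(true);
            }
            this.broadcastVar = this.sparkContext.broadcast(this.configMap);
            log.info("Re-broadcast config: {}", configMap);
        }
    } catch (Exception e) {
        // Never let an exception escape: scheduleWithFixedDelay cancels
        // all subsequent executions once a run throws.
        log.error("Broadcast update failed; will retry on next tick", e);
    }
}

The log lines would also show whether the broadcast actually gets destroyed
and re-created on the cluster.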






Re: Updating Broadcast Variable in Spark Streaming 2.4.4

2022-07-22 Thread Sean Owen
I think you're taking the right approach, trying to create a new broadcast
var. What part doesn't work? For example, I wonder whether comparing Map
equality like that does what you think; isn't it just reference equality?
Debug a bit more to see whether it even destroys and recreates the
broadcast in your code.
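
For the equality question, a minimal, self-contained check (a sketch; the
java.util.Map contract specifies equals() as entry-wise comparison, so
standard implementations like HashMap compare contents, not references):

import java.util.HashMap;
import java.util.Map;

public class MapEqualsSketch {
    public static void main(String[] args) {
        Map<String, String> a = new HashMap<>();
        Map<String, String> b = new HashMap<>();
        a.put("key", "value");
        b.put("key", "value");

        // Two distinct instances holding the same mappings:
        System.out.println(a.equals(b)); // true: entry-wise comparison
        System.out.println(a == b);      // false: '==' compares references
    }
}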

On Fri, Jul 22, 2022 at 4:24 AM Dipl.-Inf. Rico Bergmann <
i...@ricobergmann.de> wrote:

> Hi folks!
>
> I'm trying to implement an update of a broadcast var in Spark Streaming.
> The idea is that whenever some configuration value has changed (this is
> periodically checked by the driver), the existing broadcast variable is
> unpersisted and then re-broadcast.
>
> In a local test setup (using a local Spark) it works fine, but on a real
> cluster it doesn't work. The broadcast variable never gets updated. Am I
> doing something wrong? Or is this simply not possible? Or a bug?
>
> Code snippet:
>
> @RequiredArgsConstructor
> public class BroadcastUpdater implements Runnable {
>     private final transient JavaSparkContext sparkContext;
>     @Getter
>     private transient volatile Broadcast<Map<String, String>> broadcastVar;
>     private transient Map<String, String> configMap;
>
>     public void run() {
>         Map<String, String> configMap = getConfigMap();
>         if (this.broadcastVar == null || !configMap.equals(this.configMap)) {
>             this.configMap = configMap;
>             if (broadcastVar != null) {
>                 broadcastVar.unpersist(true);
>                 broadcastVar.destroy(true);
>             }
>             this.broadcastVar = this.sparkContext.broadcast(this.configMap);
>         }
>     }
>
>     private Map<String, String> getConfigMap() {
>         //impl details
>     }
> }
>
> public class StreamingFunction implements Serializable {
>
>     private transient volatile BroadcastUpdater broadcastUpdater;
>
>     protected JavaStreamingContext startStreaming(JavaStreamingContext context,
>             ConsumerStrategy<String, ChangeDataRecord> consumerStrategy) {
>         broadcastUpdater = new BroadcastUpdater(context.sparkContext());
>         ScheduledThreadPoolExecutor scheduledThreadPoolExecutor =
>                 new ScheduledThreadPoolExecutor(1);
>         scheduledThreadPoolExecutor.scheduleWithFixedDelay(broadcastUpdater,
>                 0, 3, TimeUnit.SECONDS);
>
>         final JavaInputDStream<ConsumerRecord<String, ChangeDataRecord>> inputStream =
>                 KafkaUtils.createDirectStream(context,
>                         LocationStrategies.PreferConsistent(), consumerStrategy);
>
>         inputStream.foreachRDD(rdd -> {
>             Broadcast<Map<String, String>> broadcastVar = broadcastUpdater.getBroadcastVar();
>             rdd.foreachPartition(partition -> {
>                 if (partition.hasNext()) {
>                     Map<String, String> configMap = broadcastVar.getValue();
>
>                     // iterate
>                     while (partition.hasNext()) {
>                         //impl logic using broadcast variable
>                     }
>                 }
>             });
>         });
>
>         // return the context so the caller can start it
>         return context;
>     }
> }
>


Updating Broadcast Variable in Spark Streaming 2.4.4

2022-07-22 Thread Dipl.-Inf. Rico Bergmann

Hi folks!

I'm trying to implement an update of a broadcast var in Spark Streaming.
The idea is that whenever some configuration value has changed (this is
periodically checked by the driver), the existing broadcast variable is
unpersisted and then re-broadcast.

In a local test setup (using a local Spark) it works fine, but on a real
cluster it doesn't work. The broadcast variable never gets updated. Am I
doing something wrong? Or is this simply not possible? Or a bug?


Code snippet:

@RequiredArgsConstructor
public class BroadcastUpdater implements Runnable {
    private final transient JavaSparkContext sparkContext;
    @Getter
    private transient volatile Broadcast<Map<String, String>> broadcastVar;
    private transient Map<String, String> configMap;

    public void run() {
        Map<String, String> configMap = getConfigMap();
        if (this.broadcastVar == null || !configMap.equals(this.configMap)) {
            this.configMap = configMap;
            if (broadcastVar != null) {
                broadcastVar.unpersist(true);
                broadcastVar.destroy(true);
            }
            this.broadcastVar = this.sparkContext.broadcast(this.configMap);
        }
    }

    private Map<String, String> getConfigMap() {
        //impl details
    }
}

public class StreamingFunction implements Serializable {

    private transient volatile BroadcastUpdater broadcastUpdater;

    protected JavaStreamingContext startStreaming(JavaStreamingContext context,
            ConsumerStrategy<String, ChangeDataRecord> consumerStrategy) {
        broadcastUpdater = new BroadcastUpdater(context.sparkContext());
        ScheduledThreadPoolExecutor scheduledThreadPoolExecutor =
                new ScheduledThreadPoolExecutor(1);
        scheduledThreadPoolExecutor.scheduleWithFixedDelay(broadcastUpdater,
                0, 3, TimeUnit.SECONDS);

        final JavaInputDStream<ConsumerRecord<String, ChangeDataRecord>> inputStream =
                KafkaUtils.createDirectStream(context,
                        LocationStrategies.PreferConsistent(), consumerStrategy);

        inputStream.foreachRDD(rdd -> {
            Broadcast<Map<String, String>> broadcastVar = broadcastUpdater.getBroadcastVar();
            rdd.foreachPartition(partition -> {
                if (partition.hasNext()) {
                    Map<String, String> configMap = broadcastVar.getValue();

                    // iterate
                    while (partition.hasNext()) {
                        //impl logic using broadcast variable
                    }
                }
            });
        });

        // return the context so the caller can start it
        return context;
    }
}
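
As an aside, a sketch of an alternative I am considering (not part of the
code above): if the config map stays small, the broadcast lifecycle can be
skipped entirely by fetching the map on the driver in each batch and letting
the task closures carry it. getConfigMap() is assumed to be the same helper
as in BroadcastUpdater.

inputStream.foreachRDD(rdd -> {
    // Runs on the driver once per batch: fetch the latest config here and
    // let Spark serialize the plain map into each task closure. There is
    // no broadcast to unpersist or destroy, so nothing can go stale; the
    // trade-off is re-shipping the map with every batch interval.
    final Map<String, String> configMap = getConfigMap();
    rdd.foreachPartition(partition -> {
        while (partition.hasNext()) {
            //impl logic using configMap
            partition.next();
        }
    });
});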
