Hi folks!
I'm trying to implement an update of a broadcast variable in Spark Streaming.
The idea is that whenever some configuration value has changed (the driver
checks this periodically), the existing broadcast variable is unpersisted
and then (re-)broadcast.
In a local test setup (using a local Spark master) it works fine, but on a
real cluster it doesn't: the broadcast variable never gets updated. Am I
doing something wrong? Or is this simply not possible? Or is it a bug?
Code snippet:
import java.util.Map;

import lombok.Getter;
import lombok.RequiredArgsConstructor;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

@RequiredArgsConstructor
public class BroadcastUpdater implements Runnable {

    private final transient JavaSparkContext sparkContext;

    @Getter
    private transient volatile Broadcast<Map<String, String>> broadcastVar;

    private transient Map<String, String> configMap;

    public void run() {
        Map<String, String> configMap = getConfigMap();
        if (this.broadcastVar == null || !configMap.equals(this.configMap)) {
            this.configMap = configMap;
            // Drop the old broadcast before publishing the new one.
            if (broadcastVar != null) {
                broadcastVar.unpersist(true);
                broadcastVar.destroy(true);
            }
            this.broadcastVar = this.sparkContext.broadcast(this.configMap);
        }
    }

    private Map<String, String> getConfigMap() {
        // impl details
    }
}
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategy;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

public class StreamingFunction implements Serializable {

    private transient volatile BroadcastUpdater broadcastUpdater;

    protected JavaStreamingContext startStreaming(JavaStreamingContext context,
            ConsumerStrategy<String, ChangeDataRecord> consumerStrategy) {
        broadcastUpdater = new BroadcastUpdater(context.sparkContext());
        ScheduledThreadPoolExecutor scheduledThreadPoolExecutor =
                new ScheduledThreadPoolExecutor(1);
        scheduledThreadPoolExecutor.scheduleWithFixedDelay(
                broadcastUpdater, 0, 3, TimeUnit.SECONDS);

        final JavaInputDStream<ConsumerRecord<String, ChangeDataRecord>> inputStream =
                KafkaUtils.createDirectStream(context,
                        LocationStrategies.PreferConsistent(), consumerStrategy);

        inputStream.foreachRDD(rdd -> {
            // Fetch the current broadcast reference on the driver for this batch.
            Broadcast<Map<String, String>> broadcastVar =
                    broadcastUpdater.getBroadcastVar();
            rdd.foreachPartition(partition -> {
                if (partition.hasNext()) {
                    Map<String, String> configMap = broadcastVar.getValue();
                    // iterate
                    while (partition.hasNext()) {
                        ConsumerRecord<String, ChangeDataRecord> record = partition.next();
                        // impl logic using broadcast variable
                    }
                }
            });
        });

        return context;
    }
}
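For comparison, here is the driver-side refresh pattern in isolation, with Spark taken out of the picture: a scheduled task swaps a new immutable snapshot into an AtomicReference whenever the config changes. This is only a minimal sketch (the class and method names are illustrative, and the mutable `pending` field just stands in for getConfigMap()); it shows that the scheduling/compare/swap part of my code behaves as expected on its own, so the problem seems to be specific to how the Broadcast reaches the executors.

```java
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Sketch of the periodic-refresh pattern without Spark: the scheduled
// run() compares the latest config against the published snapshot and
// atomically swaps in the new one when they differ.
public class ConfigRefresher implements Runnable {
    private final AtomicReference<Map<String, String>> current =
            new AtomicReference<>(Map.of());
    // Stand-in for the real config source read by getConfigMap().
    private volatile Map<String, String> pending = Map.of("k", "v1");

    private Map<String, String> getConfigMap() {
        return pending;
    }

    public void setPending(Map<String, String> m) {
        pending = m;
    }

    public Map<String, String> getCurrent() {
        return current.get();
    }

    @Override
    public void run() {
        Map<String, String> latest = getConfigMap();
        if (!latest.equals(current.get())) {
            current.set(latest); // atomic swap; readers never block
        }
    }

    public static void main(String[] args) throws Exception {
        ConfigRefresher refresher = new ConfigRefresher();
        ScheduledExecutorService exec = Executors.newScheduledThreadPool(1);
        exec.scheduleWithFixedDelay(refresher, 0, 50, TimeUnit.MILLISECONDS);

        Thread.sleep(200);
        System.out.println(refresher.getCurrent().get("k")); // initial value

        refresher.setPending(Map.of("k", "v2")); // simulate a config change
        Thread.sleep(200);
        System.out.println(refresher.getCurrent().get("k")); // refreshed value

        exec.shutdownNow();
    }
}
```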
---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org