Hello everyone, this is my first message ever to a mailing list so please pardon me if for some reason I'm violating the etiquette.
I have a problem with rebroadcasting a variable. How it should work is not well documented, so I could find only a few simple examples to learn from. What I'm trying to do is propagate an update to the options that control the behaviour of my streaming transformations (in this case, the evaluation of machine learning models). I have a listener on a Kafka queue that waits for messages and updates the broadcast variable. I got it to work, but the system doesn't rebroadcast anything if I pass the DStream or the broadcast variable as a parameter. So both must be defined in the same scope, and the rebroadcasting must happen in that same scope as well. Right now my main function looks like this:

----------------------------------------------------------------------------------------------------------------------
var updateVar = sc.broadcast("test")
val stream = input.map(x => myTransformation(x, updateVar))
stream.writeToKafka[String, String](outputProps,
  (m: String) => new KeyedMessage[String, String](configuration.outputTopic, m + updateVar.value))

val controlStream = connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new StringDecoder())(0)
for (messageAndTopic <- controlStream) {
  println("ricevo")  // Italian for "received"
  // drop the old broadcast and replace it with the new control message
  updateVar.unpersist()
  updateVar = ssc.sparkContext.broadcast(messageAndTopic.message)
}

ssc.start()
ssc.awaitTermination()
----------------------------------------------------------------------------------------------------------------------

"updateVar" is correctly updated both in "myTransformation" and in the main scope, and I can access the updated value. But when I move this logic into a class, it fails. I have something like this (or the same queue listener as before, but moved to another class):

class Listener(var updateVar: Broadcast[String]) {
  ...
  def someFunc() = {
    updateVar.unpersist()
    updateVar = sc.broadcast("new value")
  }
  ...
}

This fails: the variable can be destroyed, but the rest of the job never sees the new value.
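My current guess, which may well be wrong, is that part of this is plain Scala rather than Spark: the class receives a copy of the reference, so reassigning its "updateVar" field does not touch the "updateVar" in my main function. The sketch below tries to show that without any Spark in it. "Handle" is a hypothetical stand-in for Broadcast[String], and "SharedListener" and "RebindDemo" are names I made up for illustration only.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical stand-in for Broadcast[String], so the sketch runs without Spark.
final case class Handle(value: String)

// Mirrors the Listener from my post: `updateVar` is a field of this object,
// initialised with a copy of the caller's reference.
class Listener(var updateVar: Handle) {
  def someFunc(): Unit = {
    updateVar = Handle("new value") // rebinds this object's field only
  }
}

// Hypothetical alternative: caller and listener share one mutable cell.
class SharedListener(cell: AtomicReference[Handle]) {
  def someFunc(): Unit = cell.set(Handle("new value"))
}

object RebindDemo {
  // Returns (value seen by the caller, value seen by the listener).
  def fieldRebind(): (String, String) = {
    var mainVar = Handle("test")
    val listener = new Listener(mainVar)
    listener.someFunc()
    (mainVar.value, listener.updateVar.value)
  }

  // Returns the value the caller reads from the shared cell after the update.
  def sharedCell(): String = {
    val cell = new AtomicReference(Handle("test"))
    new SharedListener(cell).someFunc()
    cell.get().value
  }

  def main(args: Array[String]): Unit = {
    println(fieldRebind()) // (test,new value)
    println(sharedCell())  // new value
  }
}
```

If this guess is right, the caller in the first case keeps seeing the old handle, which would match what I observe. In the second case the update is visible, but only because the caller reads the cell again each time instead of holding on to an earlier copy; I assume the streaming code would have to do the same.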
Any suggestions on why it behaves this way? I would also like to know how Spark notices the reassignment to the var and starts the rebroadcasting.