Simone, here are some thoughts. First, please check out the "Understanding closures" section of the Spark Programming Guide. Second, broadcast variables do not propagate updates to the underlying data: you must either create a new broadcast variable or, if you simply wish to accumulate results, use an Accumulator that stores an array or queue as a buffer, which you then read from and write to Kafka.
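A minimal sketch of the "create a new broadcast variable" approach, assuming your existing SparkContext; the holder object and method names are illustrative, not from your code:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

// Sketch: a Broadcast[T] is immutable, so "updating" it means replacing it
// with a fresh broadcast and releasing the old one.
object BroadcastHolder {
  @volatile private var current: Broadcast[String] = _

  def init(sc: SparkContext, value: String): Unit = {
    current = sc.broadcast(value)
  }

  // Create a new broadcast (new id, re-shipped to executors on next use),
  // then unpersist the stale copy.
  def update(sc: SparkContext, newValue: String): Unit = {
    val old = current
    current = sc.broadcast(newValue)
    if (old != null) old.unpersist()
  }

  def get: Broadcast[String] = current
}
```

The key point is to look up the current broadcast per batch rather than capturing one `Broadcast` instance when the DStream is defined, e.g. inside `stream.transform { rdd => val b = BroadcastHolder.get; rdd.map(x => myTransformation(x, b)) }` — the `transform` function runs on the driver for every batch, so it always picks up the latest broadcast.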
You should also be able to send the results to a new DStream instead and link that DStream to Kafka. Hope this gives you some ideas to play with. Thanks!

Thank you,
Ilya Ganelin

-----Original Message-----
From: simone.robutti [simone.robu...@gmail.com<mailto:simone.robu...@gmail.com>]
Sent: Friday, August 07, 2015 10:07 AM Eastern Standard Time
To: user@spark.apache.org
Subject: Issue when rebroadcasting a variable outside of the definition scope

Hello everyone,

this is my first message ever to a mailing list, so please pardon me if I'm somehow violating the etiquette.

I have a problem with rebroadcasting a variable. How it should work is not well documented, so I could find only a few simple examples to learn from. What I'm trying to do is propagate an update to the options that control the behaviour of my streaming transformations (in this case, the evaluation of machine learning models). I have a listener on a Kafka queue that waits for messages and updates the broadcast variable. I made it work, but the system doesn't rebroadcast anything if I pass the DStream or the broadcast variable as a parameter. So both must be defined in the same scope, and the rebroadcasting must happen in that same scope as well.
Right now my main function looks like this:
----------------------------------------------------------------------------------------------------------------------
var updateVar = sc.broadcast("test")

val stream = input.map(x => myTransformation(x, updateVar))
stream.writeToKafka[String, String](outputProps,
  (m: String) => new KeyedMessage[String, String](configuration.outputTopic, m + updateVar.value))

val controlStream = connector.createMessageStreamsByFilter(filterSpec, 1,
  new DefaultDecoder(), new StringDecoder())(0)
for (messageAndTopic <- controlStream) {
  println("ricevo")
  updateVar.unpersist()
  updateVar = ssc.sparkContext.broadcast(messageAndTopic.message)
}

ssc.start()
ssc.awaitTermination()
----------------------------------------------------------------------------------------------------------------------

"updateVar" is correctly updated both in "myTransformation" and in the main scope, and I can access the updated value. But when I try to move this logic into a class, it fails. I have something like this (or the same queue listener from before, moved to another class):

class Listener(var updateVar: Broadcast[String]) {
  ...
  def someFunc() = {
    updateVar.unpersist()
    updateVar = sc.broadcast("new value")
  }
  ...
}

This fails: the variable can be destroyed but cannot be updated. Any suggestion as to why it behaves this way? I would also like to know how Spark notices the reassignment to the var and starts the rebroadcasting.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Issue-when-rebroadcasting-a-variable-outside-of-the-definition-scope-tp24172.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.