Hello everyone,

This is my first message ever to a mailing list, so please pardon me if I'm
inadvertently violating etiquette.

I have a problem with rebroadcasting a variable. This isn't well documented,
and I could only find a few simple examples of how it is supposed to work.

What I'm trying to do is propagate an update to an option that controls the
behaviour of my streaming transformations (in this case, the evaluation of
machine learning models). I have a listener on a Kafka queue that waits for
messages and updates the broadcast variable.

I got this to work, but the system doesn't rebroadcast anything if I pass the
DStream or the broadcast variable as a parameter.

So both must be defined in the same scope, and the rebroadcasting has to
happen in that same scope as well. Right now my main function looks like this:
----------------------------------------------------------------------------------------------------------------------
// Broadcast the initial value; both the streaming transformation and the
// Kafka writer read updateVar.value.
var updateVar = sc.broadcast("test")
val stream = input.map(x => myTransformation(x, updateVar))
stream.writeToKafka[String, String](
  outputProps,
  (m: String) =>
    new KeyedMessage[String, String](configuration.outputTopic, m + updateVar.value))

// Plain Kafka consumer that waits for control messages; on each message,
// destroy the old broadcast and rebroadcast the new value.
val controlStream = connector.createMessageStreamsByFilter(
  filterSpec, 1, new DefaultDecoder(), new StringDecoder())(0)
for (messageAndTopic <- controlStream) {
  println("received control message")  // was: "ricevo" (Italian for "I receive")
  updateVar.unpersist()
  updateVar = ssc.sparkContext.broadcast(messageAndTopic.message)
}

ssc.start()
ssc.awaitTermination()

----------------------------------------------------------------------------------------------------------------------

"updateVar" is correctly updated both in "myTransformation" and in the main
scope and I can access the updated value.
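
For reference, "myTransformation" does nothing special with the broadcast: it
just reads the current value. A rough sketch (the actual model evaluation is
elided, so this body is only illustrative):

import org.apache.spark.broadcast.Broadcast

// Illustrative only: the real function evaluates an ML model; the point is
// that it reads the broadcast's current value through updateVar.value.
def myTransformation(x: String, updateVar: Broadcast[String]): String = {
  // ... evaluate the model for x, using the broadcast option ...
  x + updateVar.value
}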

But when I move this logic to a class, it fails. I have something like this
(it's the same queue listener as before, just moved to another class):

class Listener(var updateVar: Broadcast[String]) {
  ...
  def someFunc() = {
    updateVar.unpersist()
    updateVar = sc.broadcast("new value")  // sc is the driver's SparkContext
  }
  ...
}

This fails: the old broadcast can be destroyed (unpersist() runs fine), but
the variable is never actually updated.
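
For what it's worth, reassigning a constructor var from a method is perfectly
legal in plain Scala, so the reassignment itself shouldn't be the problem; I
assume it is something about how Spark captures or serializes the Broadcast
reference. A minimal plain-Scala check (no Spark involved):

// Plain Scala: a constructor `var` can be reassigned from a method.
class Holder(var value: String) {
  def update(newValue: String): Unit = { value = newValue }
}

object HolderDemo extends App {
  val h = new Holder("test")
  h.update("new value")
  println(h.value)  // prints "new value"
}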

Any suggestions as to why this behaviour occurs? I would also like to know how
Spark notices the reassignment of the var and starts the rebroadcasting.
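
My working assumption (please correct me if I'm wrong) is that in the first
version the closure passed to map captures the enclosing var itself rather
than a snapshot of its value, which is standard Scala closure behaviour:

// Standard Scala semantics: closures capture vars by reference, so a later
// reassignment is visible through the closure.
var captured = "old"
val f = () => captured
captured = "new"
println(f())  // prints "new"

If that's what is happening, I still don't see why the same reassignment
through a class field isn't picked up.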


