Simone, here are some thoughts. First, please check out the "Understanding closures" 
section of the Spark Programming Guide. Second, broadcast variables do not 
propagate updates to the underlying data: a broadcast value is read-only once 
created. To change it, you must create a new broadcast variable. Alternatively, 
if you simply wish to accumulate results, you can use an Accumulator that stores 
an array or queue as a buffer, which the driver then reads and writes to Kafka.
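Here is a minimal sketch of the "create a new broadcast variable" route. It runs without Spark because `FakeBroadcast` is a hypothetical stand-in for Spark's `Broadcast[T]`. `BroadcastHolder` and `runBatch` are illustrative names and are not Spark API. In a real job, `make` would presumably be `v => sc.broadcast(v)`. The body of `runBatch` would sit inside a `DStream.transform { rdd => ... }` callback, which Spark Streaming calls on the driver once per batch, so each batch picks up the latest broadcast.

```scala
object BroadcastHolderDemo {
  // Hypothetical stand-in for org.apache.spark.broadcast.Broadcast[T],
  // so the sketch runs without a Spark dependency.
  final class FakeBroadcast[T](val value: T) {
    @volatile var unpersisted: Boolean = false
    def unpersist(): Unit = unpersisted = true
  }

  // Hypothetical driver-side holder: each update creates a brand-new
  // broadcast and unpersists the previous one; nothing is mutated in place.
  final class BroadcastHolder[T](initial: T, make: T => FakeBroadcast[T]) {
    @volatile private var current: FakeBroadcast[T] = make(initial)

    def get: FakeBroadcast[T] = current

    def update(newValue: T): Unit = synchronized {
      val previous = current
      current = make(newValue)
      previous.unpersist()
    }
  }

  // Stands in for a per-batch callback running on the driver: read the
  // holder once, then let the per-record closure use only that snapshot.
  def runBatch(holder: BroadcastHolder[String], records: Seq[String]): Seq[String] = {
    val snapshot = holder.get
    records.map(r => r + snapshot.value)
  }

  def main(args: Array[String]): Unit = {
    val holder = new BroadcastHolder[String]("test", v => new FakeBroadcast(v))
    val first = holder.get
    println(runBatch(holder, Seq("a-", "b-")))  // List(a-test, b-test)
    holder.update("v2")
    println(runBatch(holder, Seq("a-", "b-")))  // List(a-v2, b-v2)
    println(first.unpersisted)                  // true
  }
}
```

The design choice is to replace the broadcast rather than mutate it. Every batch therefore works from one complete, immutable snapshot, and the old broadcast is unpersisted only after the new one is in place.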

You should also be able to send the results to a new DStream instead and write 
that DStream out to Kafka. Hope this gives you some ideas to play with. Thanks!



Thank you,
Ilya Ganelin



-----Original Message-----
From: simone.robutti [simone.robu...@gmail.com]
Sent: Friday, August 07, 2015 10:07 AM Eastern Standard Time
To: user@spark.apache.org
Subject: Issue when rebroadcasting a variable outside of the definition scope


Hello everyone,

This is my first message ever to a mailing list, so please pardon me if I'm
violating any etiquette.

I have a problem with rebroadcasting a variable. How this is supposed to work
is not well documented, and I could find only a few simple examples to learn
from.

What I'm trying to do is propagate updates to the options that control the
behaviour of my streaming transformations (in this case, the evaluation of
machine learning models). I have a listener on a Kafka queue that waits for
messages and updates the broadcast variable.

I got it to work, but the system doesn't rebroadcast anything if I pass the
DStream or the broadcast variable as a parameter.

So both must be defined in the same scope, and the rebroadcasting must happen
in that same scope too. Right now my main function looks like this:
----------------------------------------------------------------------------------------------------------------------
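import kafka.producer.KeyedMessage
import kafka.serializer.{DefaultDecoder, StringDecoder}

// Broadcast holding the current option value; reassigned on each control message.
var updateVar = sc.broadcast("test")

// Both closures below read whichever broadcast updateVar currently references.
val stream = input.map(x => myTransformation(x, updateVar))
stream.writeToKafka[String, String](outputProps,
  (m: String) => new KeyedMessage[String, String](configuration.outputTopic, m + updateVar.value))

// First (and only) Kafka stream on the control topic(s): keys as raw bytes, messages as Strings.
val controlStream = connector.createMessageStreamsByFilter(filterSpec, 1,
  new DefaultDecoder(), new StringDecoder())(0)

// Iterating a KafkaStream blocks while waiting for messages; this loop is
// assumed to run on a separate thread so that ssc.start() below is reached.
for (messageAndTopic <- controlStream) {
  println("received")
  updateVar.unpersist()
  updateVar = ssc.sparkContext.broadcast(messageAndTopic.message)
}

ssc.start()
ssc.awaitTermination()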

----------------------------------------------------------------------------------------------------------------------

"updateVar" is correctly updated both in "myTransformation" and in the main
scope, and I can access the updated value.
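The main-scope version probably works because of how Scala compiles a closure over a local `var`. The closure does not copy the value. It holds a reference to a shared heap box (`scala.runtime.ObjectRef`), and reassignment updates that box. As far as I understand, Spark Streaming serializes the `map` closure again when it builds each batch's tasks, so each batch would ship whichever broadcast the box points to at that moment. The plain-Scala sketch below shows only the Scala side of that. `FakeBroadcast` is a hypothetical stand-in for `Broadcast[String]`.

```scala
object LocalVarCaptureDemo {
  // Hypothetical stand-in for Broadcast[String], so no Spark dependency is needed.
  final case class FakeBroadcast(value: String)

  def run(): (String, String) = {
    var updateVar = FakeBroadcast("test")
    // The closure captures the local var itself (scalac stores it in a shared
    // ObjectRef box), not the value it held when the closure was created.
    val transform: String => String = x => x + updateVar.value
    val before = transform("msg-")
    updateVar = FakeBroadcast("updated")  // reassignment in the same scope
    val after = transform("msg-")         // sees the new value through the box
    (before, after)
  }

  def main(args: Array[String]): Unit =
    println(run())  // (msg-test,msg-updated)
}
```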

But when I try to do the same thing with the logic moved into a class, it
fails. I have something like this (or the same queue listener as before, moved
into another class):

class Listener(var updateVar: Broadcast[String]){...
    def someFunc()={
       updateVar.unpersist()
       updateVar=sc.broadcast("new value")
    }
...
}

This fails: the variable can be destroyed but cannot be updated.

Any suggestions on why this happens? I would also like to know how Spark
notices the reassignment of the var and starts rebroadcasting.
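The class-based version probably fails for a different reason. Scala passes the reference to the broadcast by value, so `Listener`'s constructor parameter is a separate copy. Reassigning it leaves `main`'s local var, and therefore the closures that capture it, pointing at the old broadcast. Spark itself does not notice either reassignment. It simply serializes whatever the closure references when a batch runs. The plain-Scala sketch below contrasts the two cases and uses a shared holder as one possible fix. `FakeBroadcast`, `Holder`, and `HolderListener` are illustrative, hypothetical names and are not Spark API.

```scala
object ConstructorCopyDemo {
  // Hypothetical stand-in for Broadcast[String].
  final case class FakeBroadcast(value: String)

  // Mirrors the Listener from the question: the constructor receives a copy
  // of the reference, so reassigning the field changes only that copy.
  class Listener(var updateVar: FakeBroadcast) {
    def someFunc(): Unit = updateVar = FakeBroadcast("new value")
  }

  // Hypothetical shared, mutable holder referenced by both scopes.
  final class Holder(@volatile var current: FakeBroadcast)

  class HolderListener(holder: Holder) {
    def someFunc(): Unit = holder.current = FakeBroadcast("new value")
  }

  def run(): (String, String) = {
    var updateVar = FakeBroadcast("test")
    val viaVar: String => String = x => x + updateVar.value
    new Listener(updateVar).someFunc()      // only the Listener's field changes

    val holder = new Holder(FakeBroadcast("test"))
    val viaHolder: String => String = x => x + holder.current.value
    new HolderListener(holder).someFunc()   // the shared holder changes

    (viaVar("msg-"), viaHolder("msg-"))
  }

  def main(args: Array[String]): Unit =
    println(run())  // (msg-test,msg-new value)
}
```

In a Spark job, a holder captured this way would need to be `Serializable`, and each batch would ship a snapshot of it. Reading it on the driver instead, for example inside `DStream.transform`, means only the current broadcast reaches the executors.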



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Issue-when-rebroadcasting-a-variable-outside-of-the-definition-scope-tp24172.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
