Yes, although there's a difference between unpersist and destroy,
you'll hit the same type of question either way. You do indeed have to
reason about when you know the broadcast variable is no longer needed
in the face of lazy evaluation, and that's hard.

Sometimes it's obvious and you can take advantage of this to
proactively free resources. You may have to consider restructuring the
computation to allow for more resources to be freed, if this is
important to scale.

Keep in mind that things that are computed and cached may be lost and
recomputed even after their parent RDDs were definitely already
computed and don't seem to be needed. This is why unpersist is often
the better thing to call because it allows for variables to be
rebroadcast if needed in this case. Destroy permanently closes the
broadcast.

On Tue, Aug 30, 2016 at 4:43 PM, Jerry Lam <chiling...@gmail.com> wrote:
> Hi Sean,
>
> Thank you for the response. The only problem is that actively managing
> broadcast variables require to return the broadcast variables to the caller
> if the function that creates the broadcast variables does not contain any
> action. That is the scope that uses the broadcast variables cannot destroy
> the broadcast variables in many cases. For example:
>
> ==============
> def perfromTransformation(rdd: RDD[int]) = {
>    val sharedMap = sc.broadcast(map)
>    rdd.map{id =>
>       val localMap = sharedMap.vlaue
>       (id, localMap(id))
>    }
> }
>
> def main = {
>     ....
>     performTransformation(rdd).toDF("id",
> "i").write.parquet("dummy_example")
> }
> ==============
>
> In this example above, we cannot destroy the sharedMap before the
> write.parquet is executed because RDD is evaluated lazily. We will get a
> exception if I put sharedMap.destroy like this:
>
> ==============
> def perfromTransformation(rdd: RDD[int]) = {
>    val sharedMap = sc.broadcast(map)
>    val result = rdd.map{id =>
>       val localMap = sharedMap.vlaue
>       (id, localMap(id))
>    }
>    sharedMap.destroy
>    result
> }
> ==============
>
> Am I missing something? Are there better way to do this without returning
> the broadcast variables to the main function?
>
> Best Regards,
>
> Jerry
>
>
>
> On Mon, Aug 29, 2016 at 12:11 PM, Sean Owen <so...@cloudera.com> wrote:
>>
>> Yes you want to actively unpersist() or destroy() broadcast variables
>> when they're no longer needed. They can eventually be removed when the
>> reference on the driver is garbage collected, but you usually would
>> not want to rely on that.
>>
>> On Mon, Aug 29, 2016 at 4:30 PM, Jerry Lam <chiling...@gmail.com> wrote:
>> > Hello spark developers,
>> >
>> > Anyone can shed some lights on the life cycle of the broadcast
>> > variables?
>> > Basically, if I have a broadcast variable defined in a loop and for each
>> > iteration, I provide a different value.
>> > // For example:
>> > for(i< 1 to 10) {
>> >     val bc = sc.broadcast(i)
>> >     sc.parallelize(Seq(1,2,3)).map{id => val i = bc.value; (id,
>> > i)}.toDF("id", "i").write.parquet("/dummy_output")
>> > }
>> >
>> > Do I need to active manage the broadcast variable in this case? I know
>> > this
>> > example is not real but please imagine this broadcast variable can hold
>> > an
>> > array of 1M Long.
>> >
>> > Regards,
>> >
>> > Jerry
>> >
>> >
>> >
>> > On Sun, Aug 21, 2016 at 1:07 PM, Jerry Lam <chiling...@gmail.com> wrote:
>> >>
>> >> Hello spark developers,
>> >>
>> >> Can someone explain to me what is the lifecycle of a broadcast
>> >> variable?
>> >> When a broadcast variable will be garbage-collected at the driver-side
>> >> and
>> >> at the executor-side? Does a spark application need to actively manage
>> >> the
>> >> broadcast variables to ensure that it will not run in OOM?
>> >>
>> >> Best Regards,
>> >>
>> >> Jerry
>> >
>> >
>
>

---------------------------------------------------------------------
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org

Reply via email to