Re: Broadcast Variable Life Cycle

2016-08-30 Thread Sean Owen
Yeah, after destroy, accessing the broadcast variable results in an
error. Accessing it after it's unpersisted (on an executor) causes it
to be rebroadcast.
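
A minimal sketch of the two behaviors, as an illustration only (assuming a
local SparkContext; the lookup map and jobs are hypothetical):

==
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(
  new SparkConf().setMaster("local[2]").setAppName("bc-lifecycle"))
val lookup = Map(1 -> "a", 2 -> "b", 3 -> "c")

// unpersist: drops the executor-side copies but keeps the driver-side data,
// so a later job silently triggers a rebroadcast.
val bc1 = sc.broadcast(lookup)
sc.parallelize(1 to 3).map(id => (id, bc1.value(id))).collect()
bc1.unpersist(blocking = true)
sc.parallelize(1 to 3).map(id => (id, bc1.value(id))).collect() // rebroadcast; works

// destroy: removes all state on both the driver and the executors.
val bc2 = sc.broadcast(lookup)
sc.parallelize(1 to 3).map(id => (id, bc2.value(id))).collect()
bc2.destroy()
// Any further use of bc2.value would now fail with a SparkException.

sc.stop()
==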

On Tue, Aug 30, 2016 at 5:12 PM, Jerry Lam  wrote:
> Hi Sean,
>
> Thank you for explaining the difference between unpersist and destroy.
> Does that mean unpersist keeps the broadcast variable on the driver, whereas
> destroy deletes everything about the broadcast variable, as if it had
> never existed?
>
> Best Regards,
>
> Jerry
>
>
> On Tue, Aug 30, 2016 at 11:58 AM, Sean Owen  wrote:
>>
>> Yes, although there's a difference between unpersist and destroy,
>> you'll hit the same type of question either way. You do indeed have to
>> reason about when you know the broadcast variable is no longer needed
>> in the face of lazy evaluation, and that's hard.
>>
>> Sometimes it's obvious and you can take advantage of this to
>> proactively free resources. You may have to consider restructuring the
>> computation to allow for more resources to be freed, if this is
>> important to scale.
>>
>> Keep in mind that things that are computed and cached may be lost and
>> recomputed even after their parent RDDs were definitely already
>> computed and don't seem to be needed. This is why unpersist is often
>> the better thing to call because it allows for variables to be
>> rebroadcast if needed in this case. Destroy permanently closes the
>> broadcast.
>>
>> On Tue, Aug 30, 2016 at 4:43 PM, Jerry Lam  wrote:
>> > Hi Sean,
>> >
>> > Thank you for the response. The only problem is that actively managing
>> > broadcast variables requires returning them to the caller if the
>> > function that creates the broadcast variables does not contain any
>> > action. That is, the scope that uses the broadcast variables cannot
>> > destroy them in many cases. For example:
>> >
>> > ==
>> > def performTransformation(rdd: RDD[Int]) = {
>> >   val sharedMap = sc.broadcast(map)
>> >   rdd.map { id =>
>> >     val localMap = sharedMap.value
>> >     (id, localMap(id))
>> >   }
>> > }
>> >
>> > def main = {
>> >   // ...
>> >   performTransformation(rdd).toDF("id", "i").write.parquet("dummy_example")
>> > }
>> > ==
>> >
>> > In the example above, we cannot destroy the sharedMap before
>> > write.parquet is executed because the RDD is evaluated lazily. We will get
>> > an exception if we call sharedMap.destroy() like this:
>> >
>> > ==
>> > def performTransformation(rdd: RDD[Int]) = {
>> >   val sharedMap = sc.broadcast(map)
>> >   val result = rdd.map { id =>
>> >     val localMap = sharedMap.value
>> >     (id, localMap(id))
>> >   }
>> >   sharedMap.destroy()
>> >   result
>> > }
>> > ==
>> >
>> > Am I missing something? Is there a better way to do this without
>> > returning the broadcast variables to the main function?
>> >
>> > Best Regards,
>> >
>> > Jerry
>> >
>> >
>> >
>> > On Mon, Aug 29, 2016 at 12:11 PM, Sean Owen  wrote:
>> >>
>> >> Yes you want to actively unpersist() or destroy() broadcast variables
>> >> when they're no longer needed. They can eventually be removed when the
>> >> reference on the driver is garbage collected, but you usually would
>> >> not want to rely on that.
>> >>
>> >> On Mon, Aug 29, 2016 at 4:30 PM, Jerry Lam 
>> >> wrote:
>> >> > Hello spark developers,
>> >> >
>> >> > Can anyone shed some light on the life cycle of broadcast variables?
>> >> > Basically, I have a broadcast variable defined in a loop, and for each
>> >> > iteration I provide a different value.
>> >> > // For example:
>> >> > for (i <- 1 to 10) {
>> >> >   val bc = sc.broadcast(i)
>> >> >   sc.parallelize(Seq(1, 2, 3)).map { id => val i = bc.value; (id, i) }
>> >> >     .toDF("id", "i").write.parquet("/dummy_output")
>> >> > }
>> >> >
>> >> > Do I need to actively manage the broadcast variable in this case? I know
>> >> > this example is not realistic, but please imagine that this broadcast
>> >> > variable can hold an array of 1M Longs.
>> >> >
>> >> > Regards,
>> >> >
>> >> > Jerry
>> >> >
>> >> >
>> >> >
>> >> > On Sun, Aug 21, 2016 at 1:07 PM, Jerry Lam 
>> >> > wrote:
>> >> >>
>> >> >> Hello spark developers,
>> >> >>
>> >> >> Can someone explain to me the lifecycle of a broadcast variable?
>> >> >> When will a broadcast variable be garbage-collected on the driver side
>> >> >> and on the executor side? Does a Spark application need to actively
>> >> >> manage its broadcast variables to ensure that it will not run into OOM?
>> >> >>
>> >> >> Best Regards,
>> >> >>
>> >> >> Jerry
>> >> >
>> >> >
>> >
>> >
>
>




Re: Broadcast Variable Life Cycle

2016-08-30 Thread Jerry Lam
Hi Sean,

Thank you for explaining the difference between unpersist and destroy.
Does that mean unpersist keeps the broadcast variable on the driver, whereas
destroy deletes everything about the broadcast variable, as if it had
never existed?

Best Regards,

Jerry


On Tue, Aug 30, 2016 at 11:58 AM, Sean Owen  wrote:

> Yes, although there's a difference between unpersist and destroy,
> you'll hit the same type of question either way. You do indeed have to
> reason about when you know the broadcast variable is no longer needed
> in the face of lazy evaluation, and that's hard.
>
> Sometimes it's obvious and you can take advantage of this to
> proactively free resources. You may have to consider restructuring the
> computation to allow for more resources to be freed, if this is
> important to scale.
>
> Keep in mind that things that are computed and cached may be lost and
> recomputed even after their parent RDDs were definitely already
> computed and don't seem to be needed. This is why unpersist is often
> the better thing to call because it allows for variables to be
> rebroadcast if needed in this case. Destroy permanently closes the
> broadcast.
>
> On Tue, Aug 30, 2016 at 4:43 PM, Jerry Lam  wrote:
> > Hi Sean,
> >
> > Thank you for the response. The only problem is that actively managing
> > broadcast variables requires returning them to the caller if the
> > function that creates the broadcast variables does not contain any
> > action. That is, the scope that uses the broadcast variables cannot
> > destroy them in many cases. For example:
> >
> > ==
> > def performTransformation(rdd: RDD[Int]) = {
> >   val sharedMap = sc.broadcast(map)
> >   rdd.map { id =>
> >     val localMap = sharedMap.value
> >     (id, localMap(id))
> >   }
> > }
> >
> > def main = {
> >   // ...
> >   performTransformation(rdd).toDF("id", "i").write.parquet("dummy_example")
> > }
> > ==
> >
> > In the example above, we cannot destroy the sharedMap before
> > write.parquet is executed because the RDD is evaluated lazily. We will get
> > an exception if we call sharedMap.destroy() like this:
> >
> > ==
> > def performTransformation(rdd: RDD[Int]) = {
> >   val sharedMap = sc.broadcast(map)
> >   val result = rdd.map { id =>
> >     val localMap = sharedMap.value
> >     (id, localMap(id))
> >   }
> >   sharedMap.destroy()
> >   result
> > }
> > ==
> >
> > Am I missing something? Is there a better way to do this without returning
> > the broadcast variables to the main function?
> >
> > Best Regards,
> >
> > Jerry
> >
> >
> >
> > On Mon, Aug 29, 2016 at 12:11 PM, Sean Owen  wrote:
> >>
> >> Yes you want to actively unpersist() or destroy() broadcast variables
> >> when they're no longer needed. They can eventually be removed when the
> >> reference on the driver is garbage collected, but you usually would
> >> not want to rely on that.
> >>
> >> On Mon, Aug 29, 2016 at 4:30 PM, Jerry Lam 
> wrote:
> >> > Hello spark developers,
> >> >
> >> > Can anyone shed some light on the life cycle of broadcast variables?
> >> > Basically, I have a broadcast variable defined in a loop, and for each
> >> > iteration I provide a different value.
> >> > // For example:
> >> > for (i <- 1 to 10) {
> >> >   val bc = sc.broadcast(i)
> >> >   sc.parallelize(Seq(1, 2, 3)).map { id => val i = bc.value; (id, i) }
> >> >     .toDF("id", "i").write.parquet("/dummy_output")
> >> > }
> >> >
> >> > Do I need to actively manage the broadcast variable in this case? I know
> >> > this example is not realistic, but please imagine that this broadcast
> >> > variable can hold an array of 1M Longs.
> >> >
> >> > Regards,
> >> >
> >> > Jerry
> >> >
> >> >
> >> >
> >> > On Sun, Aug 21, 2016 at 1:07 PM, Jerry Lam 
> wrote:
> >> >>
> >> >> Hello spark developers,
> >> >>
> >> >> Can someone explain to me the lifecycle of a broadcast variable?
> >> >> When will a broadcast variable be garbage-collected on the driver side
> >> >> and on the executor side? Does a Spark application need to actively
> >> >> manage its broadcast variables to ensure that it will not run into OOM?
> >> >>
> >> >> Best Regards,
> >> >>
> >> >> Jerry
> >> >
> >> >
> >
> >
>


Re: Broadcast Variable Life Cycle

2016-08-30 Thread Sean Owen
Yes, although there's a difference between unpersist and destroy,
you'll hit the same type of question either way. You do indeed have to
reason about when you know the broadcast variable is no longer needed
in the face of lazy evaluation, and that's hard.

Sometimes it's obvious and you can take advantage of this to
proactively free resources. You may have to consider restructuring the
computation to allow for more resources to be freed, if this is
important to scale.

Keep in mind that things that are computed and cached may be lost and
recomputed even after their parent RDDs were definitely already
computed and don't seem to be needed. This is why unpersist is often
the better thing to call because it allows for variables to be
rebroadcast if needed in this case. Destroy permanently closes the
broadcast.
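
To make that trade-off concrete, a small sketch (illustrative only; `sc` is
assumed to be a live SparkContext):

==
val lookup = sc.broadcast((1 to 100).map(i => i -> i.toString).toMap)

val enriched = sc.parallelize(1 to 100)
  .map(id => (id, lookup.value(id)))
  .cache()

enriched.count()    // action: materializes and caches the result

lookup.unpersist()  // frees executor copies now, yet stays recoverable: if a
                    // cached partition of `enriched` is evicted and recomputed
                    // later, the value is simply rebroadcast on demand

// lookup.destroy() // only safe once nothing can ever recompute a partition
                    // that reads lookup.value
==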

On Tue, Aug 30, 2016 at 4:43 PM, Jerry Lam  wrote:
> Hi Sean,
>
> Thank you for the response. The only problem is that actively managing
> broadcast variables requires returning them to the caller if the
> function that creates the broadcast variables does not contain any
> action. That is, the scope that uses the broadcast variables cannot
> destroy them in many cases. For example:
>
> ==
> def performTransformation(rdd: RDD[Int]) = {
>   val sharedMap = sc.broadcast(map)
>   rdd.map { id =>
>     val localMap = sharedMap.value
>     (id, localMap(id))
>   }
> }
>
> def main = {
>   // ...
>   performTransformation(rdd).toDF("id", "i").write.parquet("dummy_example")
> }
> ==
>
> In the example above, we cannot destroy the sharedMap before
> write.parquet is executed because the RDD is evaluated lazily. We will get
> an exception if we call sharedMap.destroy() like this:
>
> ==
> def performTransformation(rdd: RDD[Int]) = {
>   val sharedMap = sc.broadcast(map)
>   val result = rdd.map { id =>
>     val localMap = sharedMap.value
>     (id, localMap(id))
>   }
>   sharedMap.destroy()
>   result
> }
> ==
>
> Am I missing something? Is there a better way to do this without returning
> the broadcast variables to the main function?
>
> Best Regards,
>
> Jerry
>
>
>
> On Mon, Aug 29, 2016 at 12:11 PM, Sean Owen  wrote:
>>
>> Yes you want to actively unpersist() or destroy() broadcast variables
>> when they're no longer needed. They can eventually be removed when the
>> reference on the driver is garbage collected, but you usually would
>> not want to rely on that.
>>
>> On Mon, Aug 29, 2016 at 4:30 PM, Jerry Lam  wrote:
>> > Hello spark developers,
>> >
>> > Can anyone shed some light on the life cycle of broadcast variables?
>> > Basically, I have a broadcast variable defined in a loop, and for each
>> > iteration I provide a different value.
>> > // For example:
>> > for (i <- 1 to 10) {
>> >   val bc = sc.broadcast(i)
>> >   sc.parallelize(Seq(1, 2, 3)).map { id => val i = bc.value; (id, i) }
>> >     .toDF("id", "i").write.parquet("/dummy_output")
>> > }
>> >
>> > Do I need to actively manage the broadcast variable in this case? I know
>> > this example is not realistic, but please imagine that this broadcast
>> > variable can hold an array of 1M Longs.
>> >
>> > Regards,
>> >
>> > Jerry
>> >
>> >
>> >
>> > On Sun, Aug 21, 2016 at 1:07 PM, Jerry Lam  wrote:
>> >>
>> >> Hello spark developers,
>> >>
>> >> Can someone explain to me the lifecycle of a broadcast variable?
>> >> When will a broadcast variable be garbage-collected on the driver side
>> >> and on the executor side? Does a Spark application need to actively
>> >> manage its broadcast variables to ensure that it will not run into OOM?
>> >>
>> >> Best Regards,
>> >>
>> >> Jerry
>> >
>> >
>
>




Re: Broadcast Variable Life Cycle

2016-08-30 Thread Jerry Lam
Hi Sean,

Thank you for the response. The only problem is that actively managing
broadcast variables requires returning them to the caller if the
function that creates the broadcast variables does not contain any
action. That is, the scope that uses the broadcast variables cannot
destroy them in many cases. For example:

==
def performTransformation(rdd: RDD[Int]) = {
  val sharedMap = sc.broadcast(map)
  rdd.map { id =>
    val localMap = sharedMap.value
    (id, localMap(id))
  }
}

def main = {
  // ...
  performTransformation(rdd).toDF("id", "i").write.parquet("dummy_example")
}
==

In the example above, we cannot destroy the sharedMap before
write.parquet is executed because the RDD is evaluated lazily. We will get
an exception if we call sharedMap.destroy() like this:

==
def performTransformation(rdd: RDD[Int]) = {
  val sharedMap = sc.broadcast(map)
  val result = rdd.map { id =>
    val localMap = sharedMap.value
    (id, localMap(id))
  }
  sharedMap.destroy()
  result
}
==

Am I missing something? Is there a better way to do this without returning
the broadcast variables to the main function?

Best Regards,

Jerry
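
For what it's worth, one way to avoid returning the broadcast variable is to
move the action into the same scope that creates it, so that destroy() only
runs after the job has actually executed. A sketch under that assumption
(the function then owns the write; `sc`, `map`, the RDD import, and the
implicits needed for toDF are assumed to be in scope, as in the original):

==
def transformAndWrite(rdd: RDD[Int], path: String): Unit = {
  val sharedMap = sc.broadcast(map)
  try {
    rdd.map { id => (id, sharedMap.value(id)) }
      .toDF("id", "i")
      .write.parquet(path)   // action: forces evaluation here
  } finally {
    sharedMap.destroy()      // safe: the write has completed and nothing that
                             // reads sharedMap is returned to the caller
  }
}
==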



On Mon, Aug 29, 2016 at 12:11 PM, Sean Owen  wrote:

> Yes you want to actively unpersist() or destroy() broadcast variables
> when they're no longer needed. They can eventually be removed when the
> reference on the driver is garbage collected, but you usually would
> not want to rely on that.
>
> On Mon, Aug 29, 2016 at 4:30 PM, Jerry Lam  wrote:
> > Hello spark developers,
> >
> > Can anyone shed some light on the life cycle of broadcast variables?
> > Basically, I have a broadcast variable defined in a loop, and for each
> > iteration I provide a different value.
> > // For example:
> > for (i <- 1 to 10) {
> >   val bc = sc.broadcast(i)
> >   sc.parallelize(Seq(1, 2, 3)).map { id => val i = bc.value; (id, i) }
> >     .toDF("id", "i").write.parquet("/dummy_output")
> > }
> >
> > Do I need to actively manage the broadcast variable in this case? I know
> > this example is not realistic, but please imagine that this broadcast
> > variable can hold an array of 1M Longs.
> >
> > Regards,
> >
> > Jerry
> >
> >
> >
> > On Sun, Aug 21, 2016 at 1:07 PM, Jerry Lam  wrote:
> >>
> >> Hello spark developers,
> >>
> >> Can someone explain to me the lifecycle of a broadcast variable?
> >> When will a broadcast variable be garbage-collected on the driver side
> >> and on the executor side? Does a Spark application need to actively
> >> manage its broadcast variables to ensure that it will not run into OOM?
> >>
> >> Best Regards,
> >>
> >> Jerry
> >
> >
>


Re: Broadcast Variable Life Cycle

2016-08-29 Thread Sean Owen
Yes you want to actively unpersist() or destroy() broadcast variables
when they're no longer needed. They can eventually be removed when the
reference on the driver is garbage collected, but you usually would
not want to rely on that.
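
For reference, a sketch of the explicit calls (`bc` being any broadcast
handle); the automatic path mentioned above goes through Spark's
ContextCleaner, which unpersists a broadcast some time after its driver-side
handle has been garbage-collected:

==
val bc = sc.broadcast(Array.fill(1000000)(0L))

bc.unpersist(blocking = false)  // async: drop executor copies, keep driver data
bc.unpersist(blocking = true)   // same, but block until the copies are gone
bc.destroy()                    // drop everything; bc.value now throws

// Implicit cleanup: once `bc` becomes unreachable on the driver and a GC
// runs, the ContextCleaner eventually unpersists it on the executors;
// correct, but the timing is not under your control.
==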

On Mon, Aug 29, 2016 at 4:30 PM, Jerry Lam  wrote:
> Hello spark developers,
>
> Can anyone shed some light on the life cycle of broadcast variables?
> Basically, I have a broadcast variable defined in a loop, and for each
> iteration I provide a different value.
> // For example:
> for (i <- 1 to 10) {
>   val bc = sc.broadcast(i)
>   sc.parallelize(Seq(1, 2, 3)).map { id => val i = bc.value; (id, i) }
>     .toDF("id", "i").write.parquet("/dummy_output")
> }
>
> Do I need to actively manage the broadcast variable in this case? I know
> this example is not realistic, but please imagine that this broadcast
> variable can hold an array of 1M Longs.
>
> Regards,
>
> Jerry
>
>
>
> On Sun, Aug 21, 2016 at 1:07 PM, Jerry Lam  wrote:
>>
>> Hello spark developers,
>>
>> Can someone explain to me the lifecycle of a broadcast variable?
>> When will a broadcast variable be garbage-collected on the driver side
>> and on the executor side? Does a Spark application need to actively
>> manage its broadcast variables to ensure that it will not run into OOM?
>>
>> Best Regards,
>>
>> Jerry
>
>




Re: Broadcast Variable Life Cycle

2016-08-29 Thread Jerry Lam
Hello spark developers,

Can anyone shed some light on the life cycle of broadcast variables?
Basically, I have a broadcast variable defined in a loop, and for each
iteration I provide a different value.
// For example:
for (i <- 1 to 10) {
  val bc = sc.broadcast(i)
  sc.parallelize(Seq(1, 2, 3)).map { id => val i = bc.value; (id, i) }
    .toDF("id", "i").write.parquet("/dummy_output")
}

Do I need to actively manage the broadcast variable in this case? I know
this example is not realistic, but please imagine that this broadcast
variable can hold an array of 1M Longs.

Regards,

Jerry
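
For the record, a sketch of actively managing this loop (assuming the parquet
write is the only consumer of bc, that each iteration writes to its own path,
and that the implicits needed for toDF are in scope; `bigArray` is a
hypothetical stand-in for the 1M-Long payload):

==
for (i <- 1 to 10) {
  val bc = sc.broadcast(bigArray)   // e.g. an Array[Long] with 1M entries
  sc.parallelize(Seq(1, 2, 3))
    .map { id => (id, bc.value(id % bc.value.length)) }
    .toDF("id", "i")
    .write.parquet(s"/dummy_output/$i")
  bc.destroy()  // the write (an action) has completed, so this iteration's
                // broadcast can be released before the next one is allocated;
                // prefer bc.unpersist() if anything might recompute it later
}
==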



On Sun, Aug 21, 2016 at 1:07 PM, Jerry Lam  wrote:

> Hello spark developers,
>
> Can someone explain to me the lifecycle of a broadcast variable?
> When will a broadcast variable be garbage-collected on the driver side
> and on the executor side? Does a Spark application need to actively
> manage its broadcast variables to ensure that it will not run into OOM?
>
> Best Regards,
>
> Jerry
>