Re: HashedRelation Memory Pressure on Broadcast Joins

2016-03-07 Thread Davies Liu
The underlying buffer of the UnsafeRow is reused by UnsafeProjection, so the row's
contents can change as soon as the next row is projected; that is why the rows have
to be copied before they are stored.
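To make the reuse concrete, here is a minimal, self-contained Scala sketch (toy
types, not Spark's actual UnsafeProjection) of a projection that writes every output
row into one shared buffer. Storing the returned rows without copying leaves every
stored entry aliasing the last row, which is the hazard the copy in the hash-table
build guards against.

    // Minimal sketch, not Spark's real UnsafeProjection: a projection that
    // reuses a single mutable buffer for every row it emits.
    final class ReusingProjection {
      private val buffer = new Array[Int](1)      // the one shared buffer
      def apply(value: Int): Array[Int] = {
        buffer(0) = value                         // overwritten in place
        buffer                                    // same instance every call
      }
    }

    object WhyWeCopy {
      def main(args: Array[String]): Unit = {
        val project = new ReusingProjection

        // Storing the returned rows without copying: every entry aliases the
        // shared buffer, so they all show the last projected value.
        val aliased = (1 to 3).map(project(_))
        println(aliased.map(_(0)))                // Vector(3, 3, 3)

        // Copying before storing (what row.copy() does during the build):
        // each stored row keeps its own contents.
        val copied = (1 to 3).map(v => project(v).clone())
        println(copied.map(_(0)))                 // Vector(1, 2, 3)
      }
    }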




Re: HashedRelation Memory Pressure on Broadcast Joins

2016-03-03 Thread Rishi Mishra
Hi Davies,
When you say "UnsafeRow could come from UnsafeProjection, so we should copy the
rows for safety," do you mean that the underlying state might change because of
some state-update APIs, or is it due to some other rationale?

Regards,
Rishitesh Mishra,
SnappyData (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra



Re: HashedRelation Memory Pressure on Broadcast Joins

2016-03-02 Thread Davies Liu
I see, we could reduce the memory by moving the copy out of HashedRelation; then we
would need to do the copy before calling HashedRelation for the shuffle hash join.
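A rough sketch of what that restructuring could look like, with hypothetical names
and simplified types rather than Spark's actual API: the copy moves to the
shuffle-join call site, while the broadcast path inserts the already-collected rows
directly.

    import scala.collection.mutable

    // Stand-in for UnsafeRow: copy() detaches the row from any shared buffer.
    final class Row(val key: Int, val bytes: Array[Byte]) {
      def copy(): Row = new Row(key, bytes.clone())
    }

    object BuildRelationSketch {
      // Hypothetical HashedRelation build that no longer copies what it stores.
      def buildHashedRelation(rows: Iterator[Row]): mutable.HashMap[Int, Row] = {
        val table = mutable.HashMap.empty[Int, Row]
        rows.foreach(r => table.update(r.key, r))  // stores exactly what it is given
        table
      }

      // Shuffle hash join side: rows may alias a projection's reused buffer,
      // so the copy has to happen before the relation is built.
      def buildForShuffleJoin(rows: Iterator[Row]): mutable.HashMap[Int, Row] =
        buildHashedRelation(rows.map(_.copy()))

      // Broadcast side: the rows were already collect()-ed into stable objects
      // on the driver, so no second copy is needed and driver memory drops.
      def buildForBroadcastJoin(rows: Iterator[Row]): mutable.HashMap[Int, Row] =
        buildHashedRelation(rows)
    }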

Another thing is that when we do the broadcasting, we will have another serialized
copy of the hash table.

For a table larger than about 100 MB, we would not suggest using a broadcast join,
because it takes time to send it to every executor and it also takes the same
amount of memory on every executor.




Re: HashedRelation Memory Pressure on Broadcast Joins

2016-03-02 Thread Matt Cheah
I would expect the memory pressure to grow because we are not only storing the
backing array for the collected rows on the driver, but also a copy of each of
those rows in the hash table. If we didn’t do the copy on the driver side, the hash
table would only have to store pointers to the rows already held in that array.
Perhaps we can think about whether or not we want to be using the HashedRelation
constructs in broadcast join physical plans at all?
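A toy Scala sketch of the two layouts (simplified types, nothing to do with Spark
internals): one hash table that references the already-collected rows, and one that
stores a fresh copy of each row, which is what doubles the row bytes on the driver.

    import scala.collection.mutable

    object LayoutSketch {
      def main(args: Array[String]): Unit = {
        // The driver already holds the collected rows (here: 1,000 rows of 1 KB).
        val collected: Array[Array[Byte]] = Array.fill(1000)(new Array[Byte](1024))

        // Layout 1: the table keeps references into `collected`; the only extra
        // cost is the map entries, the row bytes themselves are shared.
        val byReference = mutable.HashMap.empty[Int, Array[Byte]]
        collected.zipWithIndex.foreach { case (row, i) => byReference.update(i, row) }

        // Layout 2: the table stores a copy of every row (the current build),
        // so the row bytes are held twice: once in `collected`, once here.
        val byCopy = mutable.HashMap.empty[Int, Array[Byte]]
        collected.zipWithIndex.foreach { case (row, i) => byCopy.update(i, row.clone()) }

        println(s"${byReference.size} referenced rows, ${byCopy.size} copied rows")
      }
    }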

The file isn’t compressed - it was a 150MB CSV living on HDFS. I would
expect it to fit in a 1GB heap, but I agree that it is difficult to reason
about dataset size on disk vs. memory.
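One way to get a feel for that disk-versus-memory gap is Spark's SizeEstimator
(org.apache.spark.util.SizeEstimator, which ships with spark-core). The sketch
below uses made-up rows rather than the actual 150MB CSV; parsed String rows can
easily occupy several times their on-disk byte count once object headers and
per-field String objects are added.

    import org.apache.spark.util.SizeEstimator

    object DiskVsMemory {
      def main(args: Array[String]): Unit = {
        // Toy stand-in for rows parsed from a CSV: one Array[String] per line.
        val rows: Array[Array[String]] =
          Array.tabulate(100000)(i => Array(i.toString, s"customer_$i", "2016-03-02"))

        // Rough proxy for the on-disk size: raw character count plus separators.
        val rawBytes = rows.iterator.map(r => r.map(_.length).sum + 3).sum

        // Estimated retained size of the same rows on the JVM heap.
        println(s"~${rawBytes / 1024} KB as text vs " +
          s"~${SizeEstimator.estimate(rows) / 1024} KB on the heap")
      }
    }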

-Matt Cheah




Re: HashedRelation Memory Pressure on Broadcast Joins

2016-03-02 Thread Davies Liu
UnsafeHashedRelation and HashedRelation can also be used on the executors (for the
non-broadcast hash joins), and there the UnsafeRow can come from an
UnsafeProjection, so we should copy the rows for safety.

We could have a smarter copy() for UnsafeRow (one that skips the copy if the row
has already been copied), but I don't think the copy here will increase the memory
pressure. The total memory will be determined by how many rows are stored in the
hash tables.
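One possible shape for that smarter copy(), sketched with a hypothetical ByteRow
class rather than the real UnsafeRow: the row tracks whether it owns its buffer and
only clones when it does not.

    // Hypothetical sketch, not Spark's UnsafeRow: a row remembers whether it owns
    // its buffer. Rows aliasing a projection's reused buffer get cloned; rows
    // that were already copied are returned as-is.
    final class ByteRow private (private val bytes: Array[Byte],
                                 private val owned: Boolean) {

      // Built on top of the projection's shared buffer: the row does NOT own it.
      def this(sharedBuffer: Array[Byte]) = this(sharedBuffer, owned = false)

      def copyIfNeeded(): ByteRow =
        if (owned) this                               // already safe to retain
        else new ByteRow(bytes.clone(), owned = true) // detach from shared buffer
    }

A hash-table build could then call copyIfNeeded() unconditionally without paying
twice for rows that are already stable.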

In general, if you do not have enough memory, just don't increase
spark.sql.autoBroadcastJoinThreshold, or the performance could get worse because of
full GC.
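For reference, the threshold being discussed is the
spark.sql.autoBroadcastJoinThreshold setting (in bytes, 10 MB by default, -1 to
disable broadcast joins). A minimal Spark 1.x-style example of adjusting it, with
illustrative values:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    object ThresholdExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("threshold-example").setMaster("local[*]"))
        val sqlContext = new SQLContext(sc)

        // Raise the limit to ~100 MB: tables below this size get broadcast.
        sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold",
          (100 * 1024 * 1024).toString)

        // Or force shuffle joins everywhere when driver/executor memory is tight.
        sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "-1")

        sc.stop()
      }
    }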

Sometimes a table looks small as a compressed file (for example, a Parquet file),
but once it is loaded into memory it can require much more memory than the size of
the file on disk.





HashedRelation Memory Pressure on Broadcast Joins

2016-03-01 Thread Matt Cheah
Hi everyone,

I had a quick question regarding our implementation of UnsafeHashedRelation and
HashedRelation. It appears that we copy the rows that we’ve collected into memory
upon inserting them into the hash table in UnsafeHashedRelation#apply(). I was
wondering why we are copying the rows every time? I can’t imagine these rows being
mutable in this scenario.
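For readers who have not looked at the code, the pattern being asked about is
roughly the following; this is a simplified paraphrase in plain Scala, not the
actual UnsafeHashedRelation source, which keys and stores binary UnsafeRows.

    import scala.collection.mutable

    object BuildSketch {
      type Row = Array[Byte]

      def buildHashTable(input: Iterator[Row],
                         keyOf: Row => Int): mutable.HashMap[Int, mutable.ArrayBuffer[Row]] = {
        val table = mutable.HashMap.empty[Int, mutable.ArrayBuffer[Row]]
        while (input.hasNext) {
          val row = input.next()
          // The copy in question: every inserted row is cloned, even when the
          // input rows were already materialized by a collect() on the driver.
          val copied = row.clone()
          table.getOrElseUpdate(keyOf(copied), mutable.ArrayBuffer.empty[Row]) += copied
        }
        table
      }
    }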

The context is that I’m looking into a case where a small data frame should fit 
in the driver’s memory, but my driver ran out of memory after I increased the 
autoBroadcastJoinThreshold. YourKit is indicating that this logic is consuming 
more memory than my driver can handle.

Thanks,

-Matt Cheah