Re: Spark UNEVENLY distributing data

2018-05-22 Thread Saad Mufti
I think TableInputFormat will try to maintain as much locality as possible,
assigning one Spark partition per region and trying to assign that
partition to a YARN container/executor on the same node (assuming you're
using Spark over YARN). So the reason for the uneven distribution could be
that your HBase is not balanced to begin with and has too many regions on
the same region server corresponding to your largest bar. It all depends on
what HBase balancer you have configured and tuned. Assuming that is
properly configured, try to balance your HBase cluster before running the
Spark job. There are commands in the hbase shell to do it manually if required.
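
For reference, here is a minimal sketch of kicking off a balance run from the
HBase Java client API, the programmatic equivalent of the balance_switch /
balancer shell commands. It assumes an HBase 1.x client with hbase-site.xml on
the classpath; the class name is just for illustration.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class RunBalancer {
  public static void main(String[] args) throws IOException {
    // Picks up hbase-site.xml from the classpath (quorum, ports, etc.)
    Configuration conf = HBaseConfiguration.create();
    try (Connection connection = ConnectionFactory.createConnection(conf);
         Admin admin = connection.getAdmin()) {
      // Make sure the balancer is enabled (same as balance_switch true)
      admin.setBalancerRunning(true, true);
      // Ask the master to run one balance pass (same as the balancer shell command)
      boolean ran = admin.balancer();
      System.out.println("Balance pass submitted: " + ran);
    }
  }
}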

Hope this helps.


Saad


On Sat, May 19, 2018 at 6:40 PM, Alchemist wrote:

> I am trying to parallelize a simple Spark program that processes HBase
> data in parallel.
>
> // Get HBase RDD (one Spark partition per HBase region)
> JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = jsc
>     .newAPIHadoopRDD(conf, TableInputFormat.class,
>         ImmutableBytesWritable.class, Result.class);
> long count = hBaseRDD.count();
>
> I only see two lines in the logs: ZooKeeper starting and ZooKeeper stopping.
>
> The problem is that my program is as SLOW as the largest bar. I found that
> ZooKeeper is taking a long time before shutting down:
>
> 18/05/19 17:26:55 INFO zookeeper.ClientCnxn: Session establishment complete on server :2181, sessionid = 0x163662b64eb046d, negotiated timeout = 4
> 18/05/19 17:38:00 INFO zookeeper.ZooKeeper: Session: 0x163662b64eb046d closed
>
>
>
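
For completeness, the quoted snippet leaves out how conf is built. Below is a
minimal sketch of the usual TableInputFormat wiring, assuming hbase-site.xml is
on the classpath; the table name, app name, and class name are hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class HBaseCount {
  public static void main(String[] args) {
    JavaSparkContext jsc = new JavaSparkContext(new SparkConf().setAppName("hbase-count"));

    // HBaseConfiguration picks up hbase-site.xml; INPUT_TABLE names the table to scan
    Configuration conf = HBaseConfiguration.create();
    conf.set(TableInputFormat.INPUT_TABLE, "my_table"); // hypothetical table name

    // One Spark partition per HBase region, with locality hints from the region servers
    JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD =
        jsc.newAPIHadoopRDD(conf, TableInputFormat.class,
            ImmutableBytesWritable.class, Result.class);

    System.out.println("row count = " + hBaseRDD.count());
    jsc.stop();
  }
}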


Re: High Disk Usage In Spark 2.2.1 With No Shuffle Or Spill To Disk

2018-04-07 Thread Saad Mufti
I have been trying to monitor this while the job is running. I think I
forgot to account for the 3-way HDFS replication, so right there the output
takes up more like 21 TB of raw disk instead of my claimed 7 TB. But it still
looks like HDFS is losing more disk space than can be accounted for by just
the output, going by the output of the dfsadmin command, so I am still trying
to track that down. The total allocated disk space of 28 TB should still be
more than enough.
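
To double-check the replication math (7 TB of output x 3 replicas ~ 21 TB raw,
against 28 TB provisioned), here is a small sketch along the lines of
hdfs dfsadmin -report / hdfs dfs -du, using the Hadoop FileSystem API; the
output path and class name are hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.fs.Path;

public class HdfsUsageCheck {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration(); // picks up core-site.xml / hdfs-site.xml
    FileSystem fs = FileSystem.get(conf);

    // Logical bytes vs raw bytes (includes the 3x replication) for the job output
    ContentSummary summary = fs.getContentSummary(new Path("/output/parquet")); // hypothetical path
    System.out.println("logical size (TB): " + summary.getLength() / 1e12);
    System.out.println("raw space incl. replication (TB): " + summary.getSpaceConsumed() / 1e12);

    // Cluster-wide capacity and usage, roughly what dfsadmin -report shows
    FsStatus status = fs.getStatus();
    System.out.println("capacity (TB): " + status.getCapacity() / 1e12);
    System.out.println("used (TB): " + status.getUsed() / 1e12);
    System.out.println("remaining (TB): " + status.getRemaining() / 1e12);
  }
}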


Saad


On Sat, Apr 7, 2018 at 2:40 PM, Saad Mufti <saad.mu...@gmail.com> wrote:

> Thanks. I checked and it is using another S3 folder for the temporary
> restore space. The underlying code insists on the snapshot and the restore
> directory being on the same filesystem, so it is using EMRFS for both. So
> unless EMRFS is using some local disk space under the covers, it doesn't
> seem like that is responsible.
>
> 
> Saad
>
> On Sat, Apr 7, 2018 at 2:37 PM Jörn Franke <jornfra...@gmail.com> wrote:
>
>> As far as I know the TableSnapshotInputFormat relies on a temporary folder:
>> https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.html
>>
>>
>> Unfortunately some InputFormats need a (local) tmp directory. Sometimes
>> this cannot be avoided.
>>
>> See also the source:
>> https://github.com/apache/hbase/blob/master/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java
>>
>> On 7. Apr 2018, at 20:26, Saad Mufti <saad.mu...@gmail.com> wrote:
>>
>> Hi,
>>
>> I have a simple ETL Spark job running on AWS EMR with Spark 2.2.1. The
>> input data is HBase files in AWS S3 using EMRFS, but there is no HBase
>> running on the Spark cluster itself. It is restoring the HBase snapshot
>> into files on disk in another S3 folder used for temporary storage, then
>> creating an RDD over those files using HBase's TableSnapshotInputFormat
>> class. There is a large number of HBase regions, around 12000, and each
>> region gets translated to one Spark task/partition. We are running in YARN
>> mode, with one core per executor, so on our 120 node cluster we have around
>> 1680 executors running (not the full 1960 as YARN only gives us so many
>> containers due to memory limits).
>>
>> This is a simple ETL job that transforms the HBase data into Avro/Parquet
>> and writes to disk, there are no reduces or joins of any kind. The output
>> Parquet data is using Snappy compression, the total output is around 7 TB
>> while we have about 28 TB total disk provisioned in the cluster. The Spark
>> UI shows no disk storage being used for cached data, and not much heap
>> being used for caching either, which makes sense because in this simple job
>> we have no need to do RDD.cache as the RDD is not reused at all.
>>
>> So lately the job has started failing because close to finishing, some of
>> the YARN nodes start running low on disk and YARN marks them as unhealthy,
>> then kills all the executors on that node. But the problem just moves to
>> another node where the tasks are relaunched for another attempt until after
>> 4 failures for a given task the whole job fails.
>>
>> So I am trying to understand where all this disk usage is coming from. I
>> can see in Ganglia that disk is running low the longer the job runs, no
>> matter which node I look at. Like I said, the total size of the final
>> output in HDFS is only around 7 TB, while we have around 28 TB of disk
>> provisioned for HDFS.
>>
>> Any advice or pointers for where to look for the large disk usage would
>> be most appreciated.
>>
>> Thanks.
>>
>> 
>> Saad
>>
>>


Re: High Disk Usage In Spark 2.2.1 With No Shuffle Or Spill To Disk

2018-04-07 Thread Saad Mufti
Thanks. I checked and it is using another S3 folder for the temporary
restore space. The underlying code insists on the snapshot and the restore
directory being on the same filesystem, so it is using EMRFS for both. So
unless EMRFS is using some local disk space under the covers, it doesn't
seem like that is responsible.


Saad

On Sat, Apr 7, 2018 at 2:37 PM Jörn Franke <jornfra...@gmail.com> wrote:

> As far as I know the TableSnapshotInputFormat relies on a temporary folder
>
> https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.html
>
>
> Unfortunately some InputFormats need a (local) tmp directory. Sometimes
> this cannot be avoided.
>
> See also the source:
>
> https://github.com/apache/hbase/blob/master/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java
>
> On 7. Apr 2018, at 20:26, Saad Mufti <saad.mu...@gmail.com> wrote:
>
> Hi,
>
> I have a simple ETL Spark job running on AWS EMR with Spark 2.2.1. The
> input data is HBase files in AWS S3 using EMRFS, but there is no HBase
> running on the Spark cluster itself. It is restoring the HBase snapshot
> into files on disk in another S3 folder used for temporary storage, then
> creating an RDD over those files using HBase's TableSnapshotInputFormat
> class. There is a large number of HBase regions, around 12000, and each
> region gets translated to one Spark task/partition. We are running in YARN
> mode, with one core per executor, so on our 120 node cluster we have around
> 1680 executors running (not the full 1960 as YARN only gives us so many
> containers due to memory limits).
>
> This is a simple ETL job that transforms the HBase data into Avro/Parquet
> and writes to disk, there are no reduces or joins of any kind. The output
> Parquet data is using Snappy compression, the total output is around 7 TB
> while we have about 28 TB total disk provisioned in the cluster. The Spark
> UI shows no disk storage being used for cached data, and not much heap
> being used for caching either, which makes sense because in this simple job
> we have no need to do RDD.cache as the RDD is not reused at all.
>
> So lately the job has started failing because close to finishing, some of
> the YARN nodes start running low on disk and YARN marks them as unhealthy,
> then kills all the executors on that node. But the problem just moves to
> another node where the tasks are relaunched for another attempt until after
> 4 failures for a given task the whole job fails.
>
> So I am trying to understand where all this disk usage is coming from. I
> can see in Ganglia that disk is running low the longer the job runs, no
> matter which node I look at. Like I said, the total size of the final
> output in HDFS is only around 7 TB, while we have around 28 TB of disk
> provisioned for HDFS.
>
> Any advice or pointers for where to look for the large disk usage would be
> most appreciated.
>
> Thanks.
>
> 
> Saad
>
>


High Disk Usage In Spark 2.2.1 With No Shuffle Or Spill To Disk

2018-04-07 Thread Saad Mufti
Hi,

I have a simple ETL Spark job running on AWS EMR with Spark 2.2.1. The
input data is HBase files in AWS S3 using EMRFS, but there is no HBase
running on the Spark cluster itself. It is restoring the HBase snapshot
into files on disk in another S3 folder used for temporary storage, then
creating an RDD over those files using HBase's TableSnapshotInputFormat
class. There is a large number of HBase regions, around 12000, and each
region gets translated to one Spark task/partition. We are running in YARN
mode, with one core per executor, so on our 120 node cluster we have around
1680 executors running (not the full 1960 as YARN only gives us so many
containers due to memory limits).
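
For reference, here is a minimal sketch of this kind of snapshot-based RDD
setup, assuming a hypothetical snapshot name and restore directory; as noted
elsewhere in the thread, the restore directory has to live on the same
filesystem as the snapshot / hbase.rootdir (EMRFS/S3 in this setup).

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class SnapshotEtl {
  public static void main(String[] args) throws Exception {
    JavaSparkContext jsc = new JavaSparkContext(new SparkConf().setAppName("snapshot-etl"));

    // hbase.rootdir must point at the S3/EMRFS location holding the snapshot files
    Job job = Job.getInstance(HBaseConfiguration.create());

    // Restores snapshot references into the restore dir (same filesystem as hbase.rootdir)
    TableSnapshotInputFormat.setInput(job, "my_snapshot",          // hypothetical snapshot name
        new Path("s3://my-bucket/tmp/snapshot-restore"));          // hypothetical restore dir

    // One Spark partition per region in the snapshot
    JavaPairRDD<ImmutableBytesWritable, Result> rdd =
        jsc.newAPIHadoopRDD(job.getConfiguration(), TableSnapshotInputFormat.class,
            ImmutableBytesWritable.class, Result.class);

    System.out.println("rows in snapshot: " + rdd.count());
    jsc.stop();
  }
}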

This is a simple ETL job that transforms the HBase data into Avro/Parquet
and writes to disk, there are no reduces or joins of any kind. The output
Parquet data is using Snappy compression, the total output is around 7 TB
while we have about 28 TB total disk provisioned in the cluster. The Spark
UI shows no disk storage being used for cached data, and not much heap
being used for caching either, which makes sense because in this simple job
we have no need to do RDD.cache as the RDD is not reused at all.
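
For illustration, a minimal sketch of the Snappy-compressed Parquet write
being described, assuming the rows have already been transformed into a
Dataset<Row>; the method name, dataset, and S3 destination are hypothetical.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class ParquetWriteSketch {
  // Writes an already-built Dataset<Row> as Snappy-compressed Parquet.
  // The dataset and destination path are assumptions for illustration.
  static void writeOutput(Dataset<Row> output) {
    output.write()
        .option("compression", "snappy")            // Snappy is also Spark's default Parquet codec
        .parquet("s3://my-bucket/etl/parquet-out"); // hypothetical output location
  }
}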

So lately the job has started failing because close to finishing, some of
the YARN nodes start running low on disk and YARN marks them as unhealthy,
then kills all the executors on that node. But the problem just moves to
another node where the tasks are relaunched for another attempt until after
4 failures for a given task the whole job fails.

So I am trying to understand where all this disk usage is coming from. I
can see in Ganglia that disk is running low the longer the job runs, no
matter which node I look at. Like I said, the total size of the final
output in HDFS is only around 7 TB, while we have around 28 TB of disk
provisioned for HDFS.
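
One way to narrow this down is to sum up the sizes of the local directories on
a node that is running low. A small sketch follows; the candidate paths (YARN
NodeManager local dirs, spark.local.dir, tmp space) are assumptions to adjust
for the actual cluster layout.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

public class LocalDiskUsage {
  public static void main(String[] args) throws IOException {
    // Hypothetical candidates; adjust to whatever the cluster actually configures
    // (yarn.nodemanager.local-dirs, spark.local.dir, any snapshot/tmp scratch space).
    String[] candidates = {"/mnt/yarn", "/mnt/tmp", "/tmp"};
    for (String dir : candidates) {
      Path root = Paths.get(dir);
      if (!Files.isDirectory(root)) {
        continue;
      }
      try (Stream<Path> paths = Files.walk(root)) {
        long bytes = paths.filter(Files::isRegularFile)
                          .mapToLong(p -> p.toFile().length())
                          .sum();
        System.out.printf("%s : %.1f GB%n", dir, bytes / 1e9);
      }
    }
  }
}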

Any advice or pointers for where to look for the large disk usage would be
most appreciated.

Thanks.


Saad