Re: sc.parallelize(512k items) doesn't always use 64 executors

2015-07-30 Thread Konstantinos Kougios

Yes, thanks, that sorted out the issue.

On 30/07/15 09:26, Akhil Das wrote:
sc.parallelize takes a second parameter, which is the total number of partitions. Are you using that?


Thanks
Best Regards
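
For reference, a minimal sketch of the call Akhil describes, with an illustrative item list and partition count standing in for the 512k items and 64 executors from the question:

    import org.apache.spark.{SparkConf, SparkContext}

    object ParallelizeExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("parallelize-example"))
        val items = 1 to 512000               // stand-in for the 512k items
        val rdd = sc.parallelize(items, 64)   // second argument: number of partitions
        println(rdd.partitions.length)        // should print 64
        sc.stop()
      }
    }

With only the default number of partitions, some executors can end up with no work at all; matching the partition count to the available executor slots avoids that.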

On Wed, Jul 29, 2015 at 9:27 PM, Kostas Kougios kostas.koug...@googlemail.com wrote:


Hi, I do an sc.parallelize with a list of 512k items, but sometimes not all executors are used, i.e. they don't have work to do, and nothing is logged after:

15/07/29 16:35:22 WARN internal.ThreadLocalRandom: Failed to generate a seed from SecureRandom within 3 seconds. Not enough entrophy?
15/07/29 16:35:22 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 56477.
15/07/29 16:35:22 INFO netty.NettyBlockTransferService: Server created on 56477
15/07/29 16:35:22 INFO storage.BlockManagerMaster: Trying to register BlockManager
15/07/29 16:35:22 INFO storage.BlockManagerMaster: Registered BlockManager

Any ideas why? My last run had 3 of the 64 executors unused.



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/sc-parallelize-512k-items-doesn-t-always-use-64-executors-tp24062.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org






Re: RECEIVED SIGNAL 15: SIGTERM

2015-07-13 Thread Konstantinos Kougios
Yes, YARN was terminating the executor because the off-heap memory limit was exceeded.
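
For anyone hitting the same thing, a minimal sketch of raising that limit via SparkConf; the values are illustrative rather than tuned, and the same setting can also be passed to spark-submit with --conf spark.yarn.executor.memoryOverhead=...:

    import org.apache.spark.{SparkConf, SparkContext}

    // Illustrative values only: executor heap plus extra off-heap headroom for YARN.
    val conf = new SparkConf()
      .setAppName("memory-overhead-example")
      .set("spark.executor.memory", "8g")                 // on-heap (-Xmx) per executor
      .set("spark.yarn.executor.memoryOverhead", "2048")  // off-heap allowance, in MB
    val sc = new SparkContext(conf)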


On 13/07/15 06:55, Ruslan Dautkhanov wrote:

 the executor receives a SIGTERM (from whom???)

From YARN Resource Manager.

Check if YARN fair scheduler preemption and/or speculative execution are turned on; then it's quite possible and not a bug.



--
Ruslan Dautkhanov

On Sun, Jul 12, 2015 at 11:29 PM, Jong Wook Kim jongw...@nyu.edu wrote:


Based on my experience, YARN containers can get SIGTERM when

- it produces too many logs and uses up the hard drive
- it uses more off-heap memory than what is given by the spark.yarn.executor.memoryOverhead configuration. It might be due to too many classes loaded (less than MaxPermGen but more than memoryOverhead), or some other off-heap memory allocated by a networking library, etc.
- it opens too many file descriptors, which you can check under /proc/<executor JVM's pid>/fd/ on the executor node (see the sketch below)

Does any of these apply to your situation?

Jong Wook
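
As a rough way to check the third point from inside a job, here is a Linux-only sketch that counts the JVM's open file descriptors by listing /proc/self/fd; the usage shown in the comment is illustrative:

    // Counts open file descriptors of the current JVM; returns -1 if /proc is unavailable.
    def openFdCount(): Int = {
      val fds = new java.io.File("/proc/self/fd").listFiles()
      if (fds == null) -1 else fds.length
    }

    // e.g. run it on the executors:
    // rdd.mapPartitions { iter => println(s"open fds: ${openFdCount()}"); iter }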


On Jul 7, 2015, at 19:16, Kostas Kougios kostas.koug...@googlemail.com wrote:

I am still receiving these weird SIGTERMs on the executors. The driver claims it lost the executor, and the executor receives a SIGTERM (from whom???)

It doesn't seem to be a memory-related issue, though increasing memory takes the job a bit further or completes it. But why? There is no memory pressure on either the driver or the executor, and nothing in the logs indicates any.

driver:

15/07/07 10:47:04 INFO scheduler.TaskSetManager: Starting task 14762.0 in stage 0.0 (TID 14762, cruncher03.stratified, PROCESS_LOCAL, 13069 bytes)
15/07/07 10:47:04 INFO scheduler.TaskSetManager: Finished task 14517.0 in stage 0.0 (TID 14517) in 15950 ms on cruncher03.stratified (14507/42240)
15/07/07 10:47:04 INFO yarn.ApplicationMaster$AMEndpoint: Driver terminated or disconnected! Shutting down. cruncher05.stratified:32976
15/07/07 10:47:04 ERROR cluster.YarnClusterScheduler: Lost executor 1 on cruncher05.stratified: remote Rpc client disassociated
15/07/07 10:47:04 INFO scheduler.TaskSetManager: Re-queueing tasks for 1 from TaskSet 0.0
15/07/07 10:47:04 INFO yarn.ApplicationMaster$AMEndpoint: Driver terminated or disconnected! Shutting down. cruncher05.stratified:32976
15/07/07 10:47:04 WARN remote.ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkExecutor@cruncher05.stratified:32976] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].

15/07/07 10:47:04 WARN scheduler.TaskSetManager: Lost task 14591.0 in stage 0.0 (TID 14591, cruncher05.stratified): ExecutorLostFailure (executor 1 lost)

GC log for the driver; it doesn't look like it ran out of memory:

2015-07-07T10:45:19.887+0100: [GC (Allocation Failure) 1764131K->1391211K(3393024K), 0.0102839 secs]
2015-07-07T10:46:00.934+0100: [GC (Allocation Failure) 1764971K->1391867K(3405312K), 0.0099062 secs]
2015-07-07T10:46:45.252+0100: [GC (Allocation Failure) 1782011K->1392596K(3401216K), 0.0167572 secs]

executor:

15/07/07 10:47:03 INFO executor.Executor: Running task 14750.0 in stage 0.0 (TID 14750)
15/07/07 10:47:03 INFO spark.CacheManager: Partition rdd_493_14750 not found, computing it
15/07/07 10:47:03 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM
15/07/07 10:47:03 INFO storage.DiskBlockManager: Shutdown hook called

Executor GC log (no out-of-memory, it seems):
2015-07-07T10:47:02.332+0100: [GC (GCLocker Initiated GC) 24696750K->23712939K(33523712K), 0.0416640 secs]
2015-07-07T10:47:02.598+0100: [GC (GCLocker Initiated GC) 24700520K->23722043K(33523712K), 0.0391156 secs]
2015-07-07T10:47:02.862+0100: [GC (Allocation Failure) 24709182K->23726510K(33518592K), 0.0390784 secs]





--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/RECEIVED-SIGNAL-15-SIGTERM-tp23668.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org








Re: RECEIVED SIGNAL 15: SIGTERM

2015-07-13 Thread Konstantinos Kougios
It was the memoryOverhead. It runs OK with more of that, but do you know which libraries could affect this? I find it strange that it needs 4g for a task that processes some XML files. The tasks themselves require less Xmx.


Cheers

On 13/07/15 06:29, Jong Wook Kim wrote:

Based on my experience, YARN containers can get SIGTERM when

- it produces too many logs and uses up the hard drive
- it uses more off-heap memory than what is given by the spark.yarn.executor.memoryOverhead configuration. It might be due to too many classes loaded (less than MaxPermGen but more than memoryOverhead), or some other off-heap memory allocated by a networking library, etc.
- it opens too many file descriptors, which you can check under /proc/<executor JVM's pid>/fd/ on the executor node


Does any of these apply to your situation?

Jong Wook

On Jul 7, 2015, at 19:16, Kostas Kougios kostas.koug...@googlemail.com wrote:


I am still receiving these weird SIGTERMs on the executors. The driver claims it lost the executor, and the executor receives a SIGTERM (from whom???)

It doesn't seem to be a memory-related issue, though increasing memory takes the job a bit further or completes it. But why? There is no memory pressure on either the driver or the executor, and nothing in the logs indicates any.

driver:

15/07/07 10:47:04 INFO scheduler.TaskSetManager: Starting task 14762.0 in stage 0.0 (TID 14762, cruncher03.stratified, PROCESS_LOCAL, 13069 bytes)
15/07/07 10:47:04 INFO scheduler.TaskSetManager: Finished task 14517.0 in stage 0.0 (TID 14517) in 15950 ms on cruncher03.stratified (14507/42240)
15/07/07 10:47:04 INFO yarn.ApplicationMaster$AMEndpoint: Driver terminated or disconnected! Shutting down. cruncher05.stratified:32976
15/07/07 10:47:04 ERROR cluster.YarnClusterScheduler: Lost executor 1 on cruncher05.stratified: remote Rpc client disassociated
15/07/07 10:47:04 INFO scheduler.TaskSetManager: Re-queueing tasks for 1 from TaskSet 0.0
15/07/07 10:47:04 INFO yarn.ApplicationMaster$AMEndpoint: Driver terminated or disconnected! Shutting down. cruncher05.stratified:32976
15/07/07 10:47:04 WARN remote.ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkExecutor@cruncher05.stratified:32976] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].

15/07/07 10:47:04 WARN scheduler.TaskSetManager: Lost task 14591.0 in stage 0.0 (TID 14591, cruncher05.stratified): ExecutorLostFailure (executor 1 lost)

GC log for the driver; it doesn't look like it ran out of memory:

2015-07-07T10:45:19.887+0100: [GC (Allocation Failure) 1764131K->1391211K(3393024K), 0.0102839 secs]
2015-07-07T10:46:00.934+0100: [GC (Allocation Failure) 1764971K->1391867K(3405312K), 0.0099062 secs]
2015-07-07T10:46:45.252+0100: [GC (Allocation Failure) 1782011K->1392596K(3401216K), 0.0167572 secs]

executor:

15/07/07 10:47:03 INFO executor.Executor: Running task 14750.0 in stage 0.0 (TID 14750)
15/07/07 10:47:03 INFO spark.CacheManager: Partition rdd_493_14750 not found, computing it
15/07/07 10:47:03 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM
15/07/07 10:47:03 INFO storage.DiskBlockManager: Shutdown hook called

Executor GC log (no out-of-memory, it seems):
2015-07-07T10:47:02.332+0100: [GC (GCLocker Initiated GC) 24696750K->23712939K(33523712K), 0.0416640 secs]
2015-07-07T10:47:02.598+0100: [GC (GCLocker Initiated GC) 24700520K->23722043K(33523712K), 0.0391156 secs]
2015-07-07T10:47:02.862+0100: [GC (Allocation Failure) 24709182K->23726510K(33518592K), 0.0390784 secs]





--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/RECEIVED-SIGNAL-15-SIGTERM-tp23668.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org








Re: RECEIVED SIGNAL 15: SIGTERM

2015-07-13 Thread Konstantinos Kougios
I do have other non-XML tasks and I was getting the same SIGTERM on all of them. I think the issue might be due to me processing small files via binaryFiles or wholeTextFiles. Initially I had issues with Xmx memory because I have more than 1 million files (and in one case 5 million files). I sorted that out by processing them in batches of 32k, but then this started happening. I've set the memoryOverhead to 4g for most of the tasks and it is OK now. But 4g is too much for tasks that process small files. I do have 32 threads per executor on some tasks, but 32MB for stack and thread overhead should do. Maybe the issue is sockets or some memory leak in the network communication.
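
A minimal sketch of that batching, under two assumptions not confirmed in this thread: that the full file list fits on the driver (listed here with the HDFS FileSystem API), and that sc.binaryFiles accepts a comma-separated list of paths like the other Hadoop-backed input methods; sc, the root directory and the output path are hypothetical:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    val fs = FileSystem.get(new Configuration())
    val allFiles = fs.listStatus(new Path("hdfs:///ROOTDIR"))   // hypothetical root dir
      .filter(_.isDirectory)
      .flatMap(dir => fs.listStatus(dir.getPath))
      .map(_.getPath.toString)

    // Process the files in batches of 32k so no single job has to track millions of them.
    allFiles.grouped(32 * 1024).zipWithIndex.foreach { case (batch, i) =>
      sc.binaryFiles(batch.mkString(","))
        .mapValues(_.toArray().length)                          // placeholder per-file work
        .saveAsObjectFile(s"hdfs:///output/batch-$i")           // hypothetical output path
    }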


On 13/07/15 09:15, Ewan Higgs wrote:

It depends on how large the XML files are and how you're processing them.

If you're using !ENTITY tags then you don't need a very large piece of XML to consume a lot of memory, e.g. the billion laughs XML:

https://en.wikipedia.org/wiki/Billion_laughs

-Ewan
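
Not something from this thread, but a defensive sketch for anyone parsing untrusted XML on the executors: disabling DTD processing in the standard StAX API makes entity-expansion documents like the billion laughs fail fast instead of expanding in memory (the property names are from javax.xml.stream; the tiny input document is just for illustration):

    import javax.xml.stream.XMLInputFactory
    import java.io.StringReader

    val factory = XMLInputFactory.newInstance()
    factory.setProperty(XMLInputFactory.SUPPORT_DTD, false)                      // reject DTDs outright
    factory.setProperty(XMLInputFactory.IS_SUPPORTING_EXTERNAL_ENTITIES, false)  // and external entities

    val reader = factory.createXMLStreamReader(new StringReader("<doc>ok</doc>"))
    while (reader.hasNext) reader.next()   // a document declaring entities would now throw
    reader.close()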

On 13/07/15 10:11, Konstantinos Kougios wrote:
It was the memoryOverhead. It runs OK with more of that, but do you know which libraries could affect this? I find it strange that it needs 4g for a task that processes some XML files. The tasks themselves require less Xmx.


Cheers

On 13/07/15 06:29, Jong Wook Kim wrote:

Based on my experience, YARN containers can get SIGTERM when

- it produces too many logs and uses up the hard drive
- it uses more off-heap memory than what is given by the spark.yarn.executor.memoryOverhead configuration. It might be due to too many classes loaded (less than MaxPermGen but more than memoryOverhead), or some other off-heap memory allocated by a networking library, etc.
- it opens too many file descriptors, which you can check under /proc/<executor JVM's pid>/fd/ on the executor node


Does any of these apply to your situation?

Jong Wook

On Jul 7, 2015, at 19:16, Kostas Kougios 
kostas.koug...@googlemail.com wrote:


I am still receiving these weird SIGTERMs on the executors. The driver claims it lost the executor, and the executor receives a SIGTERM (from whom???)

It doesn't seem to be a memory-related issue, though increasing memory takes the job a bit further or completes it. But why? There is no memory pressure on either the driver or the executor, and nothing in the logs indicates any.

driver:

15/07/07 10:47:04 INFO scheduler.TaskSetManager: Starting task 14762.0 in stage 0.0 (TID 14762, cruncher03.stratified, PROCESS_LOCAL, 13069 bytes)
15/07/07 10:47:04 INFO scheduler.TaskSetManager: Finished task 14517.0 in stage 0.0 (TID 14517) in 15950 ms on cruncher03.stratified (14507/42240)
15/07/07 10:47:04 INFO yarn.ApplicationMaster$AMEndpoint: Driver terminated or disconnected! Shutting down. cruncher05.stratified:32976
15/07/07 10:47:04 ERROR cluster.YarnClusterScheduler: Lost executor 1 on cruncher05.stratified: remote Rpc client disassociated
15/07/07 10:47:04 INFO scheduler.TaskSetManager: Re-queueing tasks for 1 from TaskSet 0.0
15/07/07 10:47:04 INFO yarn.ApplicationMaster$AMEndpoint: Driver terminated or disconnected! Shutting down. cruncher05.stratified:32976
15/07/07 10:47:04 WARN remote.ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkExecutor@cruncher05.stratified:32976] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].

15/07/07 10:47:04 WARN scheduler.TaskSetManager: Lost task 14591.0 in stage 0.0 (TID 14591, cruncher05.stratified): ExecutorLostFailure (executor 1 lost)

GC log for the driver; it doesn't look like it ran out of memory:

2015-07-07T10:45:19.887+0100: [GC (Allocation Failure) 1764131K->1391211K(3393024K), 0.0102839 secs]
2015-07-07T10:46:00.934+0100: [GC (Allocation Failure) 1764971K->1391867K(3405312K), 0.0099062 secs]
2015-07-07T10:46:45.252+0100: [GC (Allocation Failure) 1782011K->1392596K(3401216K), 0.0167572 secs]

executor:

15/07/07 10:47:03 INFO executor.Executor: Running task 14750.0 in stage 0.0 (TID 14750)
15/07/07 10:47:03 INFO spark.CacheManager: Partition rdd_493_14750 not found, computing it
15/07/07 10:47:03 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM
15/07/07 10:47:03 INFO storage.DiskBlockManager: Shutdown hook called

Executor GC log (no out-of-memory, it seems):
2015-07-07T10:47:02.332+0100: [GC (GCLocker Initiated GC) 24696750K->23712939K(33523712K), 0.0416640 secs]
2015-07-07T10:47:02.598+0100: [GC (GCLocker Initiated GC) 24700520K->23722043K(33523712K), 0.0391156 secs]
2015-07-07T10:47:02.862+0100: [GC (Allocation Failure) 24709182K->23726510K(33518592K), 0.0390784 secs]





--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/RECEIVED-SIGNAL-15-SIGTERM-tp23668.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
mailto:user-unsubscr...@spark.apache.org

For additional commands, e-mail

Re: is it possible to disable -XX:OnOutOfMemoryError=kill %p for the executors?

2015-07-08 Thread Konstantinos Kougios

seems you're correct:

2015-07-07 17:21:27,245 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Container [pid=38506,containerID=container_1436262805092_0022_01_03] is running beyond virtual memory limits. Current usage: 4.3 GB of 4.5 GB physical memory used; 9.5 GB of 9.4 GB virtual memory used. Killing container.




On 07/07/15 18:28, Marcelo Vanzin wrote:
SIGTERM on YARN generally means the NM is killing your executor 
because it's running over its requested memory limits. Check your NM 
logs to make sure. And then take a look at the memoryOverhead 
setting for driver and executors 
(http://spark.apache.org/docs/latest/running-on-yarn.html).


On Tue, Jul 7, 2015 at 7:43 AM, Kostas Kougios kostas.koug...@googlemail.com wrote:


I've recompiled Spark, deleting the -XX:OnOutOfMemoryError=kill declaration, but I am still getting a SIGTERM!



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/is-it-possible-to-disable-XX-OnOutOfMemoryError-kill-p-for-the-executors-tp23680p23687.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




--
Marcelo




binaryFiles() for 1 million files, too much memory required

2015-07-01 Thread Konstantinos Kougios

Once again I am trying to read a directory tree using binaryFiles().

My directory tree has a root dir ROOTDIR and subdirs where the files are 
located, i.e.


ROOTDIR/1
ROOTDIR/2
ROOTDIR/..
ROOTDIR/100

A total of 1 million files split into 100 subdirs.

Using binaryFiles requires too much memory on the driver. I've also tried RDDs of binaryFiles(each subdir) and then ++ those and rdd.saveAsObjectFile(outputDir). That causes a lot of memory to be required on the executors!


What is the proper way to use binaryFiles with this number of files?

Thanks





-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark uses too much memory maybe (binaryFiles() with more than 1 million files in HDFS), groupBy or reduceByKey()

2015-06-11 Thread Konstantinos Kougios

Hi Marcelo,

The data are collected in, say, class C; c.id is the id of each record. But that id might appear more than once across those 1 million XML files, so I am doing a reduceByKey(). Even if I had multiple binaryFiles RDDs, wouldn't I have to ++ them in order to reduceByKey() correctly?


Also, the executor is now configured to use 64GB and it ran overnight, failing at 2am when it was using somewhere between 30 and 50GB of RAM. I don't think my data come anywhere close to that figure, but I wasn't able to profile memory (will do today) and see what was consuming so much of it.


Cheers


On 10/06/15 17:14, Marcelo Vanzin wrote:

So, I don't have an explicit solution to your problem, but...

On Wed, Jun 10, 2015 at 7:13 AM, Kostas Kougios kostas.koug...@googlemail.com wrote:


I am profiling the driver. It currently has 564MB of strings, which might be the 1 million file names. But it also has 2.34 GB of long[]! That's so far; it is still running. What are those long[] used for?


When Spark lists files it also needs all the extra metadata about 
where the files are in the HDFS cluster. That is a lot more than just 
the file's name - see the LocatedFileStatus class in the Hadoop docs 
for an idea.


What you could try is to somehow break that input down into smaller 
batches, if that's feasible for your app. e.g. organize the files by 
directory and use separate directories in different calls to 
binaryFiles(), things like that.


--
Marcelo
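
A minimal sketch of the per-directory approach Marcelo suggests, combined with the reduceByKey from the earlier message. The C class, extract() and the paths are hypothetical; unioning the per-directory RDDs before the reduce keeps the semantics of one global reduceByKey while each binaryFiles() call only lists a single subdirectory:

    import org.apache.spark.input.PortableDataStream

    case class C(id: String, count: Long)

    // Hypothetical: parse one XML file into a keyed record.
    def extract(path: String, data: PortableDataStream): (String, C) =
      (path, C(path, 1L))

    // One RDD per subdirectory (ROOTDIR/1 .. ROOTDIR/100 from the earlier message).
    val perDir = (1 to 100).map { i =>
      sc.binaryFiles(s"hdfs:///ROOTDIR/$i").map { case (p, s) => extract(p, s) }
    }

    // Union then reduce, so ids that appear in several files/directories are still merged.
    sc.union(perDir)
      .reduceByKey((a, b) => C(a.id, a.count + b.count))
      .saveAsObjectFile("hdfs:///output/reduced")   // hypothetical output path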




Re: spark uses too much memory maybe (binaryFiles() with more than 1 million files in HDFS), groupBy or reduceByKey()

2015-06-11 Thread Konstantinos Kougios

Now I am profiling the executor.

There seems to be a memory leak.

20 mins after the run there were:

157k byte[] allocated for 75MB
519k java.lang.ref.Finalizer for 31MB
291k java.util.zip.Inflater for 17MB
487k java.util.zip.ZStreamRef for 11MB

An hour after the run I got:

186k byte[] for 106MB
863k Finalizer for 52MB
475k Inflater for 29MB
354k Deflater for 24MB
829k ZStreamRef for 19MB

I don't see why those zip classes are leaking. I am not doing any compression myself (I am reading plain-text XML files, extracting a few elements and reducing them), so I assume it must be the Hadoop streams, maybe when I do rdd.saveAsObjectFile().



I am using hadoop 2.7.0 with spark 1.3.1-hadoop

Cheers
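
One thing worth trying, though not a confirmed fix for the leak above: when consuming binaryFiles(), open the PortableDataStream explicitly and close it in a finally block, so whatever compression/decompression streams Hadoop wraps around it are released eagerly rather than via finalizers. A minimal sketch; the encoding and the commented-out usage are assumptions:

    import org.apache.spark.input.PortableDataStream
    import scala.io.Source

    def readXml(pds: PortableDataStream): String = {
      val in = pds.open()                       // DataInputStream over the underlying file
      try Source.fromInputStream(in, "UTF-8").mkString
      finally in.close()                        // release the stream eagerly, not via Finalizer
    }

    // val xmlTexts = sc.binaryFiles("hdfs:///ROOTDIR/1").mapValues(readXml)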

On 10/06/15 17:14, Marcelo Vanzin wrote:

So, I don't have an explicit solution to your problem, but...

On Wed, Jun 10, 2015 at 7:13 AM, Kostas Kougios kostas.koug...@googlemail.com wrote:


I am profiling the driver. It currently has 564MB of strings, which might be the 1 million file names. But it also has 2.34 GB of long[]! That's so far; it is still running. What are those long[] used for?


When Spark lists files it also needs all the extra metadata about 
where the files are in the HDFS cluster. That is a lot more than just 
the file's name - see the LocatedFileStatus class in the Hadoop docs 
for an idea.


What you could try is to somehow break that input down into smaller 
batches, if that's feasible for your app. e.g. organize the files by 
directory and use separate directories in different calls to 
binaryFiles(), things like that.


--
Marcelo




Re: spark uses too much memory maybe (binaryFiles() with more than 1 million files in HDFS), groupBy or reduceByKey()

2015-06-11 Thread Konstantinos Kougios

...and it keeps on increasing.

Maybe there is a bug in some code that zips/unzips data.

109k instances of byte[], followed by 1 million instances of Finalizer, with ~500k Deflaters, ~500k Inflaters and 1 million ZStreamRefs.

I assume that's due to either binaryFiles or saveAsObjectFile.

On 11/06/15 13:01, Konstantinos Kougios wrote:

Now I am profiling the executor.

There seems to be a memory leak.

20 mins after the run there were:

157k byte[] allocated for 75MB
519k java.lang.ref.Finalizer for 31MB
291k java.util.zip.Inflater for 17MB
487k java.util.zip.ZStreamRef for 11MB

An hour after the run I got:

186k byte[] for 106MB
863k Finalizer for 52MB
475k Inflater for 29MB
354k Deflater for 24MB
829k ZStreamRef for 19MB

I don't see why those zip classes are leaking. I am not doing any compression myself (I am reading plain-text XML files, extracting a few elements and reducing them), so I assume it must be the Hadoop streams, maybe when I do rdd.saveAsObjectFile().



I am using hadoop 2.7.0 with spark 1.3.1-hadoop

Cheers

On 10/06/15 17:14, Marcelo Vanzin wrote:

So, I don't have an explicit solution to your problem, but...

On Wed, Jun 10, 2015 at 7:13 AM, Kostas Kougios kostas.koug...@googlemail.com wrote:


I am profiling the driver. It currently has 564MB of strings, which might be the 1 million file names. But it also has 2.34 GB of long[]! That's so far; it is still running. What are those long[] used for?


When Spark lists files it also needs all the extra metadata about 
where the files are in the HDFS cluster. That is a lot more than just 
the file's name - see the LocatedFileStatus class in the Hadoop 
docs for an idea.


What you could try is to somehow break that input down into smaller 
batches, if that's feasible for your app. e.g. organize the files by 
directory and use separate directories in different calls to 
binaryFiles(), things like that.


--
Marcelo






Re: spark uses too much memory maybe (binaryFiles() with more than 1 million files in HDFS), groupBy or reduceByKey()

2015-06-11 Thread Konstantinos Kougios


After 2 hours of running, I now have 10GB of long[]: 1.3 million instances of long[].

So probably information about the files again.


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark timesout maybe due to binaryFiles() with more than 1 million files in HDFS

2015-06-08 Thread Konstantinos Kougios
Thanks, did that, and now I am getting an out-of-memory error. But I am not sure where it occurs. It can't be on the Spark executor, as I have 28GB allocated to it. It is not the driver, because I run this locally and monitor it via jvisualvm. Unfortunately I can't JMX-monitor Hadoop.


From the stack trace it seems it fails remotely, after

at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1969)

Maybe at the namenode. Will try to increase its memory.




java.io.IOException: com.google.protobuf.ServiceException: 
java.lang.OutOfMemoryError: GC overhead limit exceeded
at 
org.apache.hadoop.ipc.ProtobufHelper.getRemoteException(ProtobufHelper.java:47)
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getListing(ClientNamenodeProtocolTranslatorPB.java:561)

at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)

at com.sun.proxy.$Proxy10.getListing(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1969)
at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1952)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:724)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:105)
at 
org.apache.hadoop.hdfs.DistributedFileSystem$15.doCall(DistributedFileSystem.java:755)
at 
org.apache.hadoop.hdfs.DistributedFileSystem$15.doCall(DistributedFileSystem.java:751)
at 
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:751)

at org.apache.hadoop.fs.Globber.listStatus(Globber.java:69)
at org.apache.hadoop.fs.Globber.glob(Globber.java:217)
at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1644)
at 
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:292)
at 
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:264)
at 
org.apache.spark.input.StreamFileInputFormat.setMinPartitions(PortableDataStream.scala:47)
at 
org.apache.spark.rdd.BinaryFileRDD.getPartitions(BinaryFileRDD.scala:43)

at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)

at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at 
org.apache.spark.Partitioner$.defaultPartitioner(Partitioner.scala:65)

at org.apache.spark.rdd.RDD.groupBy(RDD.scala:555)
at 
com.stratified.crossref.CitationExtractorJob$.extractCitations(CitationExtractorJob.scala:78)
at 
com.stratified.crossref.CitationExtractorJob$.execute(CitationExtractorJob.scala:32)
at 
com.stratified.crossref.CitationExtractorJob$.main(CitationExtractorJob.scala:20)
at 
com.stratified.crossref.CitationExtractorJob.main(CitationExtractorJob.scala)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:480)
Caused by: com.google.protobuf.ServiceException: 
java.lang.OutOfMemoryError: GC overhead limit exceeded
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:274)

at com.sun.proxy.$Proxy9.getListing(Unknown Source)
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getListing(ClientNamenodeProtocolTranslatorPB.java:554)

... 41 more
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
at com.google.protobuf.ByteString.copyFrom(ByteString.java:192)
at 
com.google.protobuf.CodedInputStream.readBytes(CodedInputStream.java:324)
at 
org.apache.hadoop.hdfs.protocol.proto.HdfsProtos$HdfsFileStatusProto.<init>(HdfsProtos.java:21261)
at 
org.apache.hadoop.hdfs.protocol.proto.HdfsProtos$HdfsFileStatusProto.<init>(HdfsProtos.java:21172)
at 

Re: spark timesout maybe due to binaryFiles() with more than 1 million files in HDFS

2015-06-08 Thread Konstantinos Kougios
No luck, I am afraid. After giving the namenode 16GB of RAM, I am still getting an out-of-memory exception, a somewhat different one:


15/06/08 15:35:52 ERROR yarn.ApplicationMaster: User class threw 
exception: GC overhead limit exceeded

java.lang.OutOfMemoryError: GC overhead limit exceeded
at 
org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:1351)
at 
org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:1413)
at 
org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:1524)
at 
org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:1533)
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getListing(ClientNamenodeProtocolTranslatorPB.java:557)

at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)

at com.sun.proxy.$Proxy10.getListing(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1969)
at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1952)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:724)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:105)
at 
org.apache.hadoop.hdfs.DistributedFileSystem$15.doCall(DistributedFileSystem.java:755)
at 
org.apache.hadoop.hdfs.DistributedFileSystem$15.doCall(DistributedFileSystem.java:751)
at 
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:751)

at org.apache.hadoop.fs.Globber.listStatus(Globber.java:69)
at org.apache.hadoop.fs.Globber.glob(Globber.java:217)
at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1644)
at 
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:292)
at 
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:264)
at 
org.apache.spark.input.StreamFileInputFormat.setMinPartitions(PortableDataStream.scala:47)
at 
org.apache.spark.rdd.BinaryFileRDD.getPartitions(BinaryFileRDD.scala:43)

at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)

at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)


and on the 2nd retry of spark, a similar exception:

java.lang.OutOfMemoryError: GC overhead limit exceeded
at 
com.google.protobuf.LiteralByteString.toString(LiteralByteString.java:148)

at com.google.protobuf.ByteString.toStringUtf8(ByteString.java:572)
at 
org.apache.hadoop.hdfs.protocol.proto.HdfsProtos$HdfsFileStatusProto.getOwner(HdfsProtos.java:21558)
at 
org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:1413)
at 
org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:1524)
at 
org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:1533)
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getListing(ClientNamenodeProtocolTranslatorPB.java:557)

at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)

at com.sun.proxy.$Proxy10.getListing(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1969)
at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1952)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:724)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:105)
at 
org.apache.hadoop.hdfs.DistributedFileSystem$15.doCall(DistributedFileSystem.java:755)
at 
org.apache.hadoop.hdfs.DistributedFileSystem$15.doCall(DistributedFileSystem.java:751)
at 
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:751)

at org.apache.hadoop.fs.Globber.listStatus(Globber.java:69)
at 

Re: spark timesout maybe due to binaryFiles() with more than 1 million files in HDFS

2015-06-08 Thread Konstantinos Kougios
It was giving the same error, which made me figure out it is the driver, but the driver running on Hadoop rather than the local one. So I did:


--conf spark.driver.memory=8g

and now it is processing the files!

Cheers


On 08/06/15 15:52, Ewan Leith wrote:

Can you do a simple

sc.binaryFiles("hdfs:///path/to/files/*").count()

in the spark-shell and verify that part works?

Ewan



-Original Message-
From: Konstantinos Kougios [mailto:kostas.koug...@googlemail.com]
Sent: 08 June 2015 15:40
To: Ewan Leith; user@spark.apache.org
Subject: Re: spark timesout maybe due to binaryFiles() with more than 1 million 
files in HDFS

No luck, I am afraid. After giving the namenode 16GB of RAM, I am still getting an out-of-memory exception, a somewhat different one:

15/06/08 15:35:52 ERROR yarn.ApplicationMaster: User class threw
exception: GC overhead limit exceeded
java.lang.OutOfMemoryError: GC overhead limit exceeded
  at
org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:1351)
  at
org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:1413)
  at
org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:1524)
  at
org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:1533)
  at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getListing(ClientNamenodeProtocolTranslatorPB.java:557)
  at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
  at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
  at
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
  at com.sun.proxy.$Proxy10.getListing(Unknown Source)
  at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1969)
  at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1952)
  at
org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:724)
  at
org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:105)
  at
org.apache.hadoop.hdfs.DistributedFileSystem$15.doCall(DistributedFileSystem.java:755)
  at
org.apache.hadoop.hdfs.DistributedFileSystem$15.doCall(DistributedFileSystem.java:751)
  at
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
  at
org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:751)
  at org.apache.hadoop.fs.Globber.listStatus(Globber.java:69)
  at org.apache.hadoop.fs.Globber.glob(Globber.java:217)
  at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1644)
  at
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:292)
  at
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:264)
  at
org.apache.spark.input.StreamFileInputFormat.setMinPartitions(PortableDataStream.scala:47)
  at
org.apache.spark.rdd.BinaryFileRDD.getPartitions(BinaryFileRDD.scala:43)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
  at scala.Option.getOrElse(Option.scala:120)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
  at
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)


and on the 2nd retry of spark, a similar exception:

java.lang.OutOfMemoryError: GC overhead limit exceeded
  at
com.google.protobuf.LiteralByteString.toString(LiteralByteString.java:148)
  at com.google.protobuf.ByteString.toStringUtf8(ByteString.java:572)
  at
org.apache.hadoop.hdfs.protocol.proto.HdfsProtos$HdfsFileStatusProto.getOwner(HdfsProtos.java:21558)
  at
org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:1413)
  at
org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:1524)
  at
org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:1533)
  at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getListing(ClientNamenodeProtocolTranslatorPB.java:557)
  at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
  at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
  at
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
  at com.sun.proxy.$Proxy10.getListing(Unknown Source)
  at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1969)
  at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java