akka disassociated on GC

2014-07-15 Thread Makoto Yui

Hello,

(2014/06/19 23:43), Xiangrui Meng wrote:

The execution was slow for the larger KDD Cup 2012, Track 2 dataset (235M+ 
records with 16.7M+ (2^24) sparse features, about 33.6GB in total) due to 
the sequential aggregation of dense vectors on a single driver node.

It took about 7.6 minutes per iteration for the aggregation.
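
(Back-of-the-envelope, assuming double-precision weights: a dense vector 
of 2^24 (~16.7M) features occupies about 16.7M x 8 bytes ~= 134MB, so 
sequentially merging a few hundred such partial results on a single 
driver node plausibly accounts for minutes per iteration.)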


When running the above test with multiple iterations enabled, I got 
another error at the beginning of the 2nd iteration.

It works fine for the first iteration, but the 2nd iteration always fails.

It seems that akka connections are suddenly disassociated when GC 
happens on the driver node. Two possible causes come to mind:
1) The driver is under heavy load because of GC, so executors cannot 
connect to the driver. Changing the akka timeout settings did not 
resolve the issue.
2) akka oddly released valid connections on GC.

I'm using Spark 1.0.1, and the following akka timeout settings did not 
resolve the problem.


[spark-defaults.conf]
spark.akka.frameSize 50
spark.akka.timeout   120
spark.akka.askTimeout   120
spark.akka.lookupTimeout 120
spark.akka.heartbeat.pauses 600

It seems this issue is related to one previously discussed in
http://markmail.org/message/p2i34frtf4iusdfn

Are there any preferred configurations or workarounds for this issue?

Thanks,
Makoto


[The error log of the driver]

14/07/14 18:11:32 INFO scheduler.TaskSetManager: Serialized task 4.0:117 
as 25300254 bytes in 35 ms
666.108: [GC [PSYoungGen: 6540914K->975362K(7046784K)] 
12419091K->7792529K(23824000K), 5.2157830 secs] [Times: user=0.00 
sys=68.43, real=5.22 secs]
14/07/14 18:11:38 INFO network.ConnectionManager: Removing 
SendingConnection to ConnectionManagerId(dc09.mydomain.org,34565)
14/07/14 18:11:38 INFO network.ConnectionManager: Removing 
ReceivingConnection to ConnectionManagerId(dc09.mydomain.org,34565)
14/07/14 18:11:38 INFO client.AppClient$ClientActor: Executor updated: 
app-20140714180032-0010/8 is now EXITED (Command exited with code 1)
14/07/14 18:11:38 ERROR network.ConnectionManager: Corresponding 
SendingConnectionManagerId not found
14/07/14 18:11:38 INFO cluster.SparkDeploySchedulerBackend: Executor 
app-20140714180032-0010/8 removed: Command exited with code 1
14/07/14 18:11:38 INFO network.ConnectionManager: Removing 
SendingConnection to ConnectionManagerId(dc30.mydomain.org,59016)
14/07/14 18:11:38 INFO network.ConnectionManager: Removing 
ReceivingConnection to ConnectionManagerId(dc30.mydomain.org,59016)
14/07/14 18:11:38 ERROR network.ConnectionManager: Corresponding 
SendingConnectionManagerId not found
672.596: [GC [PSYoungGen: 6642785K->359202K(6059072K)] 
13459952K->8065935K(22836288K), 2.8260220 secs] [Times: user=2.83 
sys=33.72, real=2.83 secs]
14/07/14 18:11:41 INFO network.ConnectionManager: Removing 
ReceivingConnection to ConnectionManagerId(dc03.mydomain.org,43278)
14/07/14 18:11:41 INFO network.ConnectionManager: Removing 
SendingConnection to ConnectionManagerId(dc03.mydomain.org,43278)
14/07/14 18:11:41 INFO network.ConnectionManager: Removing 
SendingConnection to ConnectionManagerId(dc02.mydomain.org,54538)
14/07/14 18:11:41 INFO network.ConnectionManager: Removing 
ReceivingConnection to ConnectionManagerId(dc18.mydomain.org,58100)
14/07/14 18:11:41 INFO network.ConnectionManager: Removing 
SendingConnection to ConnectionManagerId(dc18.mydomain.org,58100)
14/07/14 18:11:41 INFO network.ConnectionManager: Removing 
SendingConnection to ConnectionManagerId(dc18.mydomain.org,58100)


The full log is uploaded on
https://dl.dropboxusercontent.com/u/13123103/driver.log


[The error log of a worker]
14/07/14 18:11:38 INFO worker.Worker: Executor app-20140714180032-0010/8 
finished with state EXITED message Command exited with code 1 exitStatus 1
14/07/14 18:11:38 INFO actor.LocalActorRef: Message 
[akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] 
from Actor[akka://sparkWorker/deadLetters] to 
Actor[akka://sparkWorker/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkWorker%4010.0.1.9%3A60601-39#1322474303] 
was not delivered. [13] dead letters encountered. This logging can be 
turned off or adjusted with configuration settings 
'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
14/07/14 18:11:38 ERROR remote.EndpointWriter: AssociationError 
[akka.tcp://sparkwor...@dc09.mydomain.org:39578] -> 
[akka.tcp://sparkexecu...@dc09.mydomain.org:33886]: Error [Association 
failed with [akka.tcp://sparkexecu...@dc09.mydomain.org:33886]] [
akka.remote.EndpointAssociationException: Association failed with 
[akka.tcp://sparkexecu...@dc09.mydomain.org:33886]
Caused by: 
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: 
Connection refused: dc09.mydomain.org/10.0.1.9:33886]
14/07/14 18:11:38 INFO worker.Worker: Asked to launch executor 
app-20140714180032-0010/32 for Spark shell
14/07/14 18:11:38 WARN worker.Comman

Re: akka disassociated on GC

2014-07-15 Thread Xiangrui Meng
Hi Makoto,

I don't remember writing that, but thanks for bringing this issue up!
There are two important settings to check: 1) driver memory (you can
see it on the executor tab), 2) number of partitions (try to use a
small number of partitions). I put up two PRs to fix the problem
(sketched below):

1) use broadcast in task closure: https://github.com/apache/spark/pull/1427
2) use treeAggregate to get the result:
https://github.com/apache/spark/pull/1110
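
To make the difference concrete, here is a minimal sketch of the
treeAggregate idea (sumGradients, grads, and dim are hypothetical names;
the signature is assumed from the PR, where treeAggregate comes in via
the MLlib RDDFunctions implicits):

import org.apache.spark.mllib.rdd.RDDFunctions._
import org.apache.spark.rdd.RDD

// A flat aggregate() ships every partition's partial result straight to
// the driver; treeAggregate() first merges them on executors in a tree
// of `depth` levels, so the driver combines only a few pre-reduced
// vectors instead of hundreds of large ones.
def sumGradients(grads: RDD[Array[Double]], dim: Int): Array[Double] =
  grads.treeAggregate(new Array[Double](dim))(
    seqOp = (acc, g) => {
      var i = 0; while (i < dim) { acc(i) += g(i); i += 1 }; acc
    },
    combOp = (a, b) => {
      var i = 0; while (i < dim) { a(i) += b(i); i += 1 }; a
    },
    depth = 2 // raise for very large numbers of partitions
  )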

They are still under review. Once merged, the problem should be fixed.
I will test the KDDB dataset and report back. Thanks!

Best,
Xiangrui

On Tue, Jul 15, 2014 at 10:48 PM, Makoto Yui  wrote:
> Hello,
>
> [... original message and logs snipped; quoted in full above ...]

Re: akka disassociated on GC

2014-07-16 Thread Makoto Yui

Hi Xiangrui,

(2014/07/16 15:05), Xiangrui Meng wrote:

> I don't remember writing that, but thanks for bringing this issue up!
> There are two important settings to check: 1) driver memory (you can
> see it on the executor tab), 2) number of partitions (try to use a
> small number of partitions). I put up two PRs to fix the problem:


For the driver memory, I used 16GB/24GB, which was enough for the 
execution (full GC did not happen). I checked this using jmap and the 
top command.


BTW, I found that the required driver memory is oddly proportional to 
the number of tasks/executors. When I used 8GB for the driver memory, I 
got an OOM during task serialization. This might be a memory leak in 
task serialization, to be addressed in the future.

Each serialized task is about 24MB and the number of tasks/executors is 
280. At 280 tasks x ~24MB each, that is roughly 6.7GB of serialized 
task data, which would explain both the proportionality and the OOM at 
8GB. The size of each task result was about 120MB or so.

> 1) use broadcast in task closure: https://github.com/apache/spark/pull/1427


Does this PR reduce the required driver memory?

Is there a big difference between explicitly broadcasting the feature 
weights and implicitly serializing them into each task closure?
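
For illustration, here is a minimal sketch of the two patterns I mean 
(a dot-product loss with made-up names; this is not code from the PR):

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Implicit: `weights` is captured in the task closure, so the driver
// serializes it into every single task (~280 times in my setting).
def lossByClosure(points: RDD[Array[Double]],
                  weights: Array[Double]): Double =
  points.map(p => p.zip(weights).map { case (x, w) => x * w }.sum)
    .reduce(_ + _)

// Explicit: `weights` is broadcast once per executor, and every task on
// that executor reuses the same local copy through bc.value.
def lossByBroadcast(sc: SparkContext, points: RDD[Array[Double]],
                    weights: Array[Double]): Double = {
  val bc = sc.broadcast(weights)
  points.map(p => p.zip(bc.value).map { case (x, w) => x * w }.sum)
    .reduce(_ + _)
}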


> 2) use treeAggregate to get the result:
> https://github.com/apache/spark/pull/1110

treeAggregate should certainly reduce both the aggregation time and the 
required driver memory. I will test it.

However, the problem I am facing now is an akka connection issue during 
GC, or under heavy load. So I think the problem will still be lurking 
even if treeAggregate reduces the memory consumption.


Best,
Makoto


Re: akka disassociated on GC

2014-07-22 Thread Makoto Yui

Hi Xiangrui,

By using your treeAggregate and broadcast patches, the evaluation 
completed successfully.

I hope these patches are merged in the next major release (v1.1?). 
Without them, it would be hard to use MLlib on a large dataset.


Thanks,
Makoto

(2014/07/16 15:05), Xiangrui Meng wrote:

> Hi Makoto,
>
> [... rest of the message and the quoted logs snipped ...]

Re: akka disassociated on GC

2014-07-23 Thread Xiangrui Meng
Yes, that's the plan. If you use broadcast, please also make sure
TorrentBroadcastFactory is used, which became the default broadcast
factory very recently. -Xiangrui
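
For reference, a way to pin it explicitly in spark-defaults.conf 
(assuming the spark.broadcast.factory property from the standard 
configuration docs):

[spark-defaults.conf]
spark.broadcast.factory org.apache.spark.broadcast.TorrentBroadcastFactory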

On Tue, Jul 22, 2014 at 10:47 PM, Makoto Yui  wrote:
> Hi Xiangrui,
>
> [... rest of the message and the quoted logs snipped ...]