Yes, that's the plan. If you use broadcast, please also make sure
TorrentBroadcastFactory is used, which became the default broadcast
factory very recently. -Xiangrui

On Tue, Jul 22, 2014 at 10:47 PM, Makoto Yui <yuin...@gmail.com> wrote:
> Hi Xiangrui,
>
> By using your treeAggregate and broadcast patch, the evaluation has been
> processed successfully.
>
> I expect that these patches are merged in the next major release (v1.1?).
> Without them, it would be hard to use mllib for a large dataset.
>
> Thanks,
> Makoto
>
>
> (2014/07/16 15:05), Xiangrui Meng wrote:
>>
>> Hi Makoto,
>>
>> I don't remember I wrote that but thanks for bringing this issue up!
>> There are two important settings to check: 1) driver memory (you can
>> see it from the executor tab), 2) number of partitions (try to use
>> small number of partitions). I put two PRs to fix the problem:
>>
>> 1) use broadcast in task closure:
>> https://github.com/apache/spark/pull/1427
>> 2) use treeAggregate to get the result:
>> https://github.com/apache/spark/pull/1110
>>
>> They are still under review. Once merged, the problem should be fixed.
>> I will test the KDDB dataset and report back. Thanks!
>>
>> Best,
>> Xiangrui
>>
>> On Tue, Jul 15, 2014 at 10:48 PM, Makoto Yui <yuin...@gmail.com> wrote:
>>>
>>> Hello,
>>>
>>> (2014/06/19 23:43), Xiangrui Meng wrote:
>>>>>
>>>>>
>>>>> The execution was slow for more large KDD cup 2012, Track 2 dataset
>>>>> (235M+ records of 16.7M+ (2^24) sparse features in about 33.6GB) due to
>>>>> the
>>>>> sequential aggregation of dense vectors on a single driver node.
>>>>>
>>>>> It took about 7.6m for aggregation for an iteration.
>>>
>>>
>>>
>>> When running the above test, I got another error at the beginning of the
>>> 2nd
>>> iteration when enabling iterations.
>>>
>>> It works fine for the first iteration but the 2nd iteration always fails.
>>>
>>> It seems that akka connections are suddenly disassociated when GC happens
>>> on
>>> the driver node. Two possible causes can be considered:
>>> 1) The driver is under a heavy load because of GC; so executors cannot
>>> connect to the driver. Changing akka timeout setting did not resolve the
>>> issue.
>>> 2) akka oddly released valid connections on GC.
>>>
>>> I'm using spark 1.0.1 and timeout setting of akka as follows did not
>>> resolve
>>> the problem.
>>>
>>> [spark-defaults.conf]
>>> spark.akka.frameSize     50
>>> spark.akka.timeout       120
>>> spark.akka.askTimeout    120
>>> spark.akka.lookupTimeout 120
>>> spark.akka.heartbeat.pauses     600
>>>
>>> It seems this issue is related to one previously discussed in
>>> http://markmail.org/message/p2i34frtf4iusdfn
>>>
>>> Are there any preferred configurations or workaround for this issue?
>>>
>>> Thanks,
>>> Makoto
>>>
>>> --------------------------------------------
>>> [The error log of the driver]
>>>
>>> 14/07/14 18:11:32 INFO scheduler.TaskSetManager: Serialized task 4.0:117
>>> as
>>> 25300254 bytes in 35 ms
>>> 666.108: [GC [PSYoungGen: 6540914K->975362K(7046784K)]
>>> 12419091K->7792529K(23824000K), 5.2157830 secs] [Times: user=0.00
>>> sys=68.43,
>>> real=5.22 secs]
>>> 14/07/14 18:11:38 INFO network.ConnectionManager: Removing
>>> SendingConnection
>>> to ConnectionManagerId(dc09.mydomain.org,34565)
>>> 14/07/14 18:11:38 INFO network.ConnectionManager: Removing
>>> ReceivingConnection to ConnectionManagerId(dc09.mydomain.org,34565)
>>> 14/07/14 18:11:38 INFO client.AppClient$ClientActor: Executor updated:
>>> app-20140714180032-0010/8 is now EXITED (Command exited with code 1)
>>> 14/07/14 18:11:38 ERROR network.ConnectionManager: Corresponding
>>> SendingConnectionManagerId not found
>>> 14/07/14 18:11:38 INFO cluster.SparkDeploySchedulerBackend: Executor
>>> app-20140714180032-0010/8 removed: Command exited with code 1
>>> 14/07/14 18:11:38 INFO network.ConnectionManager: Removing
>>> SendingConnection
>>> to ConnectionManagerId(dc30.mydomain.org,59016)
>>> 14/07/14 18:11:38 INFO network.ConnectionManager: Removing
>>> ReceivingConnection to ConnectionManagerId(dc30.mydomain.org,59016)
>>> 14/07/14 18:11:38 ERROR network.ConnectionManager: Corresponding
>>> SendingConnectionManagerId not found
>>> 672.596: [GC [PSYoungGen: 6642785K->359202K(6059072K)]
>>> 13459952K->8065935K(22836288K), 2.8260220 secs] [Times: user=2.83
>>> sys=33.72,
>>> real=2.83 secs]
>>> 14/07/14 18:11:41 INFO network.ConnectionManager: Removing
>>> ReceivingConnection to ConnectionManagerId(dc03.mydomain.org,43278)
>>> 14/07/14 18:11:41 INFO network.ConnectionManager: Removing
>>> SendingConnection
>>> to ConnectionManagerId(dc03.mydomain.org,43278)
>>> 14/07/14 18:11:41 INFO network.ConnectionManager: Removing
>>> SendingConnection
>>> to ConnectionManagerId(dc02.mydomain.org,54538)
>>> 14/07/14 18:11:41 INFO network.ConnectionManager: Removing
>>> ReceivingConnection to ConnectionManagerId(dc18.mydomain.org,58100)
>>> 14/07/14 18:11:41 INFO network.ConnectionManager: Removing
>>> SendingConnection
>>> to ConnectionManagerId(dc18.mydomain.org,58100)
>>> 14/07/14 18:11:41 INFO network.ConnectionManager: Removing
>>> SendingConnection
>>> to ConnectionManagerId(dc18.mydomain.org,58100)
>>>
>>> The full log is uploaded on
>>> https://dl.dropboxusercontent.com/u/13123103/driver.log
>>>
>>> --------------------------------------------
>>> [The error log of a worker]
>>> 14/07/14 18:11:38 INFO worker.Worker: Executor app-20140714180032-0010/8
>>> finished with state EXITED message Command exited with code 1 exitStatus
>>> 1
>>> 14/07/14 18:11:38 INFO actor.LocalActorRef: Message
>>> [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from
>>> Actor[akka://sparkWorker/deadLetters] to
>>>
>>> Actor[akka://sparkWorker/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkWorker%4010.0.1.9%3A60601-39#1322474303]
>>> was not delivered. [13] dead letters encountered. This logging can be
>>> turned
>>> off or adjusted with configuration settings 'akka.log-dead-letters' and
>>> 'akka.log-dead-letters-during-shutdown'.
>>> 14/07/14 18:11:38 ERROR remote.EndpointWriter: AssociationError
>>> [akka.tcp://sparkwor...@dc09.mydomain.org:39578] ->
>>> [akka.tcp://sparkexecu...@dc09.mydomain.org:33886]: Error [Association
>>> failed with [akka.tcp://sparkexecu...@dc09.mydomain.org:33886]] [
>>> akka.remote.EndpointAssociationException: Association failed with
>>> [akka.tcp://sparkexecu...@dc09.mydomain.org:33886]
>>> Caused by:
>>> akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
>>> Connection refused: dc09.mydomain.org/10.0.1.9:33886]
>>> 14/07/14 18:11:38 INFO worker.Worker: Asked to launch executor
>>> app-20140714180032-0010/32 for Spark shell
>>> 14/07/14 18:11:38 WARN worker.CommandUtils: SPARK_JAVA_OPTS was set on
>>> the
>>> worker. It is deprecated in Spark 1.0.
>>> 14/07/14 18:11:38 WARN worker.CommandUtils: Set SPARK_LOCAL_DIRS for
>>> node-specific storage locations.
>>> 14/07/14 18:11:38 ERROR remote.EndpointWriter: AssociationError
>>> [akka.tcp://sparkwor...@dc09.mydomain.org:39578] ->
>>> [akka.tcp://sparkexecu...@dc09.mydomain.org:33886]: Error [Association
>>> failed with [akka.tcp://sparkexecu...@dc09.mydomain.org:33886]] [
>>> akka.remote.EndpointAssociationException: Association failed with
>>> [akka.tcp://sparkexecu...@dc09.mydomain.org:33886]
>>> Caused by:
>>> akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
>>> Connection refused: dc09.mydomain.org/10.0.1.9:33886]
>>
>>
>

Reply via email to