Yes, that's the plan. If you use broadcast, please also make sure TorrentBroadcastFactory is used, which became the default broadcast factory very recently. -Xiangrui
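For reference, the broadcast factory can be pinned explicitly via the `spark.broadcast.factory` property. A sketch of the setting in `spark-defaults.conf` (verify the property name and class against your Spark version's configuration docs):

```
[spark-defaults.conf]
spark.broadcast.factory  org.apache.spark.broadcast.TorrentBroadcastFactory
```

The torrent implementation spreads broadcast blocks peer-to-peer among executors instead of having every executor fetch from the driver, which matters once the broadcast value (e.g. a large weight vector) is tens of megabytes.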
On Tue, Jul 22, 2014 at 10:47 PM, Makoto Yui <yuin...@gmail.com> wrote:
> Hi Xiangrui,
>
> By using your treeAggregate and broadcast patches, the evaluation
> completed successfully.
>
> I expect that these patches will be merged in the next major release
> (v1.1?). Without them, it would be hard to use MLlib for a large
> dataset.
>
> Thanks,
> Makoto
>
>
> (2014/07/16 15:05), Xiangrui Meng wrote:
>>
>> Hi Makoto,
>>
>> I don't remember writing that, but thanks for bringing this issue up!
>> There are two important settings to check: 1) driver memory (you can
>> see it on the executor tab), and 2) the number of partitions (try to
>> use a small number of partitions). I opened two PRs to fix the
>> problem:
>>
>> 1) use broadcast in the task closure:
>> https://github.com/apache/spark/pull/1427
>> 2) use treeAggregate to get the result:
>> https://github.com/apache/spark/pull/1110
>>
>> They are still under review. Once merged, the problem should be
>> fixed. I will test the KDDB dataset and report back. Thanks!
>>
>> Best,
>> Xiangrui
>>
>> On Tue, Jul 15, 2014 at 10:48 PM, Makoto Yui <yuin...@gmail.com> wrote:
>>>
>>> Hello,
>>>
>>> (2014/06/19 23:43), Xiangrui Meng wrote:
>>>>>
>>>>> The execution was slow for the larger KDD Cup 2012, Track 2
>>>>> dataset (235M+ records with 16.7M+ (2^24) sparse features, about
>>>>> 33.6GB in total) due to the sequential aggregation of dense
>>>>> vectors on a single driver node.
>>>>>
>>>>> It took about 7.6 min per iteration for the aggregation.
>>>
>>> When running the above test, I got another error at the beginning of
>>> the 2nd iteration when enabling iterations.
>>>
>>> It works fine for the first iteration, but the 2nd iteration always
>>> fails.
>>>
>>> It seems that akka connections are suddenly disassociated when GC
>>> happens on the driver node. Two possible causes can be considered:
>>> 1) The driver is under a heavy load because of GC, so executors
>>> cannot connect to the driver.
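The treeAggregate patch (PR #1110) addresses the sequential driver-side aggregation described above by combining per-partition results in rounds, so no single node ever has to fold all partition results at once. A minimal, Spark-free Python sketch of that reduction pattern (the function names here are illustrative, not Spark's API):

```python
from functools import reduce

def flat_aggregate(partials, combine):
    # Baseline: one node folds every partition's result sequentially,
    # holding all of them at once -- the driver bottleneck above.
    return reduce(combine, partials)

def tree_aggregate(partials, combine, fanout=2):
    # Tree pattern: combine results in rounds of `fanout`, so each
    # round shrinks the number of live partial results geometrically.
    results = list(partials)
    while len(results) > 1:
        results = [
            reduce(combine, results[i:i + fanout])
            for i in range(0, len(results), fanout)
        ]
    return results[0]

# Example: summing per-partition gradient contributions (plain ints
# stand in for the dense vectors in the thread).
partials = [1, 2, 3, 4, 5, 6, 7, 8]
assert tree_aggregate(partials, lambda a, b: a + b) == 36
```

In Spark, the intermediate rounds run on executors rather than on the driver, which is what removes the single-node aggregation cost; the sketch only shows the combining order.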
>>> Changing the akka timeout settings did not resolve the issue.
>>> 2) akka oddly released valid connections on GC.
>>>
>>> I'm using Spark 1.0.1, and the following akka timeout settings did
>>> not resolve the problem:
>>>
>>> [spark-defaults.conf]
>>> spark.akka.frameSize 50
>>> spark.akka.timeout 120
>>> spark.akka.askTimeout 120
>>> spark.akka.lookupTimeout 120
>>> spark.akka.heartbeat.pauses 600
>>>
>>> It seems this issue is related to one previously discussed in
>>> http://markmail.org/message/p2i34frtf4iusdfn
>>>
>>> Are there any preferred configurations or workarounds for this
>>> issue?
>>>
>>> Thanks,
>>> Makoto
>>>
>>> --------------------------------------------
>>> [The error log of the driver]
>>>
>>> 14/07/14 18:11:32 INFO scheduler.TaskSetManager: Serialized task
>>> 4.0:117 as 25300254 bytes in 35 ms
>>> 666.108: [GC [PSYoungGen: 6540914K->975362K(7046784K)]
>>> 12419091K->7792529K(23824000K), 5.2157830 secs]
>>> [Times: user=0.00 sys=68.43, real=5.22 secs]
>>> 14/07/14 18:11:38 INFO network.ConnectionManager: Removing
>>> SendingConnection to ConnectionManagerId(dc09.mydomain.org,34565)
>>> 14/07/14 18:11:38 INFO network.ConnectionManager: Removing
>>> ReceivingConnection to ConnectionManagerId(dc09.mydomain.org,34565)
>>> 14/07/14 18:11:38 INFO client.AppClient$ClientActor: Executor updated:
>>> app-20140714180032-0010/8 is now EXITED (Command exited with code 1)
>>> 14/07/14 18:11:38 ERROR network.ConnectionManager: Corresponding
>>> SendingConnectionManagerId not found
>>> 14/07/14 18:11:38 INFO cluster.SparkDeploySchedulerBackend: Executor
>>> app-20140714180032-0010/8 removed: Command exited with code 1
>>> 14/07/14 18:11:38 INFO network.ConnectionManager: Removing
>>> SendingConnection to ConnectionManagerId(dc30.mydomain.org,59016)
>>> 14/07/14 18:11:38 INFO network.ConnectionManager: Removing
>>> ReceivingConnection to ConnectionManagerId(dc30.mydomain.org,59016)
>>> 14/07/14 18:11:38 ERROR network.ConnectionManager:
>>> Corresponding SendingConnectionManagerId not found
>>> 672.596: [GC [PSYoungGen: 6642785K->359202K(6059072K)]
>>> 13459952K->8065935K(22836288K), 2.8260220 secs]
>>> [Times: user=2.83 sys=33.72, real=2.83 secs]
>>> 14/07/14 18:11:41 INFO network.ConnectionManager: Removing
>>> ReceivingConnection to ConnectionManagerId(dc03.mydomain.org,43278)
>>> 14/07/14 18:11:41 INFO network.ConnectionManager: Removing
>>> SendingConnection to ConnectionManagerId(dc03.mydomain.org,43278)
>>> 14/07/14 18:11:41 INFO network.ConnectionManager: Removing
>>> SendingConnection to ConnectionManagerId(dc02.mydomain.org,54538)
>>> 14/07/14 18:11:41 INFO network.ConnectionManager: Removing
>>> ReceivingConnection to ConnectionManagerId(dc18.mydomain.org,58100)
>>> 14/07/14 18:11:41 INFO network.ConnectionManager: Removing
>>> SendingConnection to ConnectionManagerId(dc18.mydomain.org,58100)
>>> 14/07/14 18:11:41 INFO network.ConnectionManager: Removing
>>> SendingConnection to ConnectionManagerId(dc18.mydomain.org,58100)
>>>
>>> The full log is uploaded at
>>> https://dl.dropboxusercontent.com/u/13123103/driver.log
>>>
>>> --------------------------------------------
>>> [The error log of a worker]
>>> 14/07/14 18:11:38 INFO worker.Worker: Executor
>>> app-20140714180032-0010/8 finished with state EXITED message Command
>>> exited with code 1 exitStatus 1
>>> 14/07/14 18:11:38 INFO actor.LocalActorRef: Message
>>> [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying]
>>> from Actor[akka://sparkWorker/deadLetters] to
>>> Actor[akka://sparkWorker/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkWorker%4010.0.1.9%3A60601-39#1322474303]
>>> was not delivered. [13] dead letters encountered. This logging can
>>> be turned off or adjusted with configuration settings
>>> 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
>>> 14/07/14 18:11:38 ERROR remote.EndpointWriter: AssociationError
>>> [akka.tcp://sparkwor...@dc09.mydomain.org:39578] ->
>>> [akka.tcp://sparkexecu...@dc09.mydomain.org:33886]: Error
>>> [Association failed with
>>> [akka.tcp://sparkexecu...@dc09.mydomain.org:33886]] [
>>> akka.remote.EndpointAssociationException: Association failed with
>>> [akka.tcp://sparkexecu...@dc09.mydomain.org:33886]
>>> Caused by:
>>> akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
>>> Connection refused: dc09.mydomain.org/10.0.1.9:33886]
>>> 14/07/14 18:11:38 INFO worker.Worker: Asked to launch executor
>>> app-20140714180032-0010/32 for Spark shell
>>> 14/07/14 18:11:38 WARN worker.CommandUtils: SPARK_JAVA_OPTS was set
>>> on the worker. It is deprecated in Spark 1.0.
>>> 14/07/14 18:11:38 WARN worker.CommandUtils: Set SPARK_LOCAL_DIRS for
>>> node-specific storage locations.
>>> 14/07/14 18:11:38 ERROR remote.EndpointWriter: AssociationError
>>> [akka.tcp://sparkwor...@dc09.mydomain.org:39578] ->
>>> [akka.tcp://sparkexecu...@dc09.mydomain.org:33886]: Error
>>> [Association failed with
>>> [akka.tcp://sparkexecu...@dc09.mydomain.org:33886]] [
>>> akka.remote.EndpointAssociationException: Association failed with
>>> [akka.tcp://sparkexecu...@dc09.mydomain.org:33886]
>>> Caused by:
>>> akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
>>> Connection refused: dc09.mydomain.org/10.0.1.9:33886]