This looks to me like a setup issue. I just tested news20.binary (1355191 features) on a 2-node EC2 cluster and it worked well. I added one line to conf/spark-env.sh:
export SPARK_JAVA_OPTS=" -Dspark.akka.frameSize=20 " and launched spark-shell with "--driver-memory 20g". Could you re-try with an EC2 setup? If it still doesn't work, please attach all your code and logs. Best, Xiangrui On Sun, Jul 6, 2014 at 1:35 AM, Bharath Ravi Kumar <reachb...@gmail.com> wrote: > Hi Xiangrui, > > 1) Yes, I deployed the same build (compiled locally from source) to the host > that has (master, slave1) and to the second host with slave2. > > 2) The execution was successful when run in local mode with a reduced number > of partitions. Does this imply issues communicating/coordinating across > processes (i.e. driver, master and workers)? > > Thanks, > Bharath > > > > On Sun, Jul 6, 2014 at 11:37 AM, Xiangrui Meng <men...@gmail.com> wrote: >> >> Hi Bharath, >> >> 1) Did you sync the spark jar and conf to the worker nodes after the build? >> 2) Since the dataset is not large, could you try local mode first >> using `spark-submit --driver-memory 12g --master local[*]`? >> 3) Try using a smaller number of partitions, say 5. >> >> If the problem is still there, please attach the full master/worker log >> files. >> >> Best, >> Xiangrui >> >> On Fri, Jul 4, 2014 at 12:16 AM, Bharath Ravi Kumar <reachb...@gmail.com> >> wrote: >> > Xiangrui, >> > >> > Leaving the frameSize unspecified led to an error message (and failure) >> > stating that the task size (~11M) was larger than the frame size. I hence set it to an >> > arbitrarily large value (I realize 500 was unrealistic & unnecessary in >> > this case). I've now set the size to 20M and repeated the runs. The >> > earlier >> > runs were on an uncached RDD. Caching the RDD (and setting >> > spark.storage.memoryFraction=0.5) resulted in a marginal speed-up of >> > execution, but the end result remained the same. 
The cached RDD size is >> > as >> > follows: >> > >> > RDD Name Storage Level Cached Partitions >> > Fraction Cached Size in Memory Size in Tachyon Size on Disk >> > 1084 Memory Deserialized 1x Replicated 80 >> > 100% 165.9 MB 0.0 B 0.0 B >> > >> > >> > >> > The corresponding master logs were: >> > >> > 14/07/04 06:29:34 INFO Master: Removing executor >> > app-20140704062238-0033/1 >> > because it is EXITED >> > 14/07/04 06:29:34 INFO Master: Launching executor >> > app-20140704062238-0033/2 >> > on worker worker-20140630124441-slave1-40182 >> > 14/07/04 06:29:34 INFO Master: Removing executor >> > app-20140704062238-0033/0 >> > because it is EXITED >> > 14/07/04 06:29:34 INFO Master: Launching executor >> > app-20140704062238-0033/3 >> > on worker worker-20140630102913-slave2-44735 >> > 14/07/04 06:29:37 INFO Master: Removing executor >> > app-20140704062238-0033/2 >> > because it is EXITED >> > 14/07/04 06:29:37 INFO Master: Launching executor >> > app-20140704062238-0033/4 >> > on worker worker-20140630124441-slave1-40182 >> > 14/07/04 06:29:37 INFO Master: Removing executor >> > app-20140704062238-0033/3 >> > because it is EXITED >> > 14/07/04 06:29:37 INFO Master: Launching executor >> > app-20140704062238-0033/5 >> > on worker worker-20140630102913-slave2-44735 >> > 14/07/04 06:29:39 INFO Master: akka.tcp://spark@slave2:45172 got >> > disassociated, removing it. >> > 14/07/04 06:29:39 INFO Master: Removing app app-20140704062238-0033 >> > 14/07/04 06:29:39 INFO LocalActorRef: Message >> > [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] >> > from >> > Actor[akka://sparkMaster/deadLetters] to >> > >> > Actor[akka://sparkMaster/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkMaster%4010.3.1.135%3A33061-123#1986674260] >> > was not delivered. [39] dead letters encountered. 
This logging can be >> > turned >> > off or adjusted with configuration settings 'akka.log-dead-letters' and >> > 'akka.log-dead-letters-during-shutdown'. >> > 14/07/04 06:29:39 INFO Master: akka.tcp://spark@slave2:45172 got >> > disassociated, removing it. >> > 14/07/04 06:29:39 INFO Master: akka.tcp://spark@slave2:45172 got >> > disassociated, removing it. >> > 14/07/04 06:29:39 ERROR EndpointWriter: AssociationError >> > [akka.tcp://sparkMaster@master:7077] -> [akka.tcp://spark@slave2:45172]: >> > Error [Association failed with [akka.tcp://spark@slave2:45172]] [ >> > akka.remote.EndpointAssociationException: Association failed with >> > [akka.tcp://spark@slave2:45172] >> > Caused by: >> > akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: >> > Connection refused: slave2/10.3.1.135:45172 >> > ] >> > 14/07/04 06:29:39 INFO Master: akka.tcp://spark@slave2:45172 got >> > disassociated, removing it. >> > 14/07/04 06:29:39 ERROR EndpointWriter: AssociationError >> > [akka.tcp://sparkMaster@master:7077] -> [akka.tcp://spark@slave2:45172]: >> > Error [Association failed with [akka.tcp://spark@slave2:45172]] [ >> > akka.remote.EndpointAssociationException: Association failed with >> > [akka.tcp://spark@slave2:45172] >> > Caused by: >> > akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: >> > Connection refused: slave2/10.3.1.135:45172 >> > ] >> > 14/07/04 06:29:39 ERROR EndpointWriter: AssociationError >> > [akka.tcp://sparkMaster@master:7077] -> [akka.tcp://spark@slave2:45172]: >> > Error [Association failed with [akka.tcp://spark@slave2:45172]] [ >> > akka.remote.EndpointAssociationException: Association failed with >> > [akka.tcp://spark@slave2:45172] >> > Caused by: >> > akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: >> > Connection refused: slave2/10.3.1.135:45172 >> > ] >> > 14/07/04 06:29:39 INFO Master: akka.tcp://spark@slave2:45172 got >> > disassociated, removing it. 
>> > 14/07/04 06:29:40 WARN Master: Got status update for unknown executor >> > app-20140704062238-0033/5 >> > 14/07/04 06:29:40 WARN Master: Got status update for unknown executor >> > app-20140704062238-0033/4 >> > >> > >> > Coincidentally, after the initial executor failed, each following >> > executor >> > that was re-spawned failed with the following logs: >> > (e.g the following was from >> > slave1:~/spark-1.0.1-rc1/work/app-20140704062238-0033/2/stderr) >> > >> > log4j:WARN No appenders could be found for logger >> > (org.apache.hadoop.conf.Configuration). >> > log4j:WARN Please initialize the log4j system properly. >> > log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for >> > more info. >> > 14/07/04 06:29:35 INFO SparkHadoopUtil: Using Spark's default log4j >> > profile: >> > org/apache/spark/log4j-defaults.properties >> > 14/07/04 06:29:35 INFO SecurityManager: Changing view acls to: user1 >> > 14/07/04 06:29:35 INFO SecurityManager: SecurityManager: authentication >> > disabled; ui acls disabled; users with view permissions: Set(user1) >> > 14/07/04 06:29:35 INFO Slf4jLogger: Slf4jLogger started >> > 14/07/04 06:29:35 INFO Remoting: Starting remoting >> > 14/07/04 06:29:36 INFO Remoting: Remoting started; listening on >> > addresses >> > :[akka.tcp://sparkExecutor@slave1:54782] >> > 14/07/04 06:29:36 INFO Remoting: Remoting now listens on addresses: >> > [akka.tcp://sparkExecutor@slave1:54782] >> > 14/07/04 06:29:36 INFO CoarseGrainedExecutorBackend: Connecting to >> > driver: >> > akka.tcp://spark@master:45172/user/CoarseGrainedScheduler >> > 14/07/04 06:29:36 INFO WorkerWatcher: Connecting to worker >> > akka.tcp://sparkWorker@slave1:40182/user/Worker >> > 14/07/04 06:29:36 INFO WorkerWatcher: Successfully connected to >> > akka.tcp://sparkWorker@slave1:40182/user/Worker >> > 14/07/04 06:29:36 INFO CoarseGrainedExecutorBackend: Successfully >> > registered >> > with driver >> > 14/07/04 06:29:36 INFO SecurityManager: Changing view 
acls to: user1 >> > 14/07/04 06:29:36 INFO SecurityManager: SecurityManager: authentication >> > disabled; ui acls disabled; users with view permissions: Set(user1) >> > 14/07/04 06:29:36 INFO Slf4jLogger: Slf4jLogger started >> > 14/07/04 06:29:36 INFO Remoting: Starting remoting >> > 14/07/04 06:29:36 INFO Remoting: Remoting started; listening on >> > addresses >> > :[akka.tcp://spark@slave1:39753] >> > 14/07/04 06:29:36 INFO SparkEnv: Connecting to MapOutputTracker: >> > akka.tcp://spark@master:45172/user/MapOutputTracker >> > 14/07/04 06:29:36 INFO SparkEnv: Connecting to BlockManagerMaster: >> > akka.tcp://spark@master:45172/user/BlockManagerMaster >> > 14/07/04 06:29:36 INFO DiskBlockManager: Created local directory at >> > /tmp/spark-local-20140704062936-6123 >> > 14/07/04 06:29:36 INFO MemoryStore: MemoryStore started with capacity >> > 6.7 >> > GB. >> > 14/07/04 06:29:36 INFO ConnectionManager: Bound socket to port 50960 >> > with id >> > = ConnectionManagerId(slave1,50960) >> > 14/07/04 06:29:36 INFO BlockManagerMaster: Trying to register >> > BlockManager >> > 14/07/04 06:29:36 INFO BlockManagerMaster: Registered BlockManager >> > 14/07/04 06:29:36 INFO HttpFileServer: HTTP File server directory is >> > /tmp/spark-42c2782f-60f8-45a7-9e11-c789fc87fe2e >> > 14/07/04 06:29:36 INFO HttpServer: Starting HTTP Server >> > 14/07/04 06:29:36 ERROR CoarseGrainedExecutorBackend: Driver >> > Disassociated >> > [akka.tcp://sparkExecutor@slave1:54782] -> >> > [akka.tcp://spark@master:45172] >> > disassociated! Shutting down. >> > >> > In case of the initial executor that successfully started, the >> > corresponding >> > log messages (from >> > spark-1.0.1-rc1/work/app-20140704062238-0033/1/stderr) on >> > the executor were: >> > log4j:WARN No appenders could be found for logger >> > (org.apache.hadoop.conf.Configuration). >> > log4j:WARN Please initialize the log4j system properly. 
>> > log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for >> > more info. >> > 14/07/04 06:22:39 INFO SparkHadoopUtil: Using Spark's default log4j >> > profile: >> > org/apache/spark/log4j-defaults.properties >> > 14/07/04 06:22:39 INFO SecurityManager: Changing view acls to: user1 >> > 14/07/04 06:22:39 INFO SecurityManager: SecurityManager: authentication >> > disabled; ui acls disabled; users with view permissions: Set(user1) >> > 14/07/04 06:22:39 INFO Slf4jLogger: Slf4jLogger started >> > 14/07/04 06:22:39 INFO Remoting: Starting remoting >> > 14/07/04 06:22:39 INFO Remoting: Remoting started; listening on >> > addresses >> > :[akka.tcp://sparkExecutor@slave1:50806] >> > 14/07/04 06:22:39 INFO Remoting: Remoting now listens on addresses: >> > [akka.tcp://sparkExecutor@slave1:50806] >> > 14/07/04 06:22:39 INFO CoarseGrainedExecutorBackend: Connecting to >> > driver: >> > akka.tcp://spark@master:45172/user/CoarseGrainedScheduler >> > 14/07/04 06:22:39 INFO WorkerWatcher: Connecting to worker >> > akka.tcp://sparkWorker@slave1:40182/user/Worker >> > 14/07/04 06:22:39 INFO WorkerWatcher: Successfully connected to >> > akka.tcp://sparkWorker@slave1:40182/user/Worker >> > 14/07/04 06:22:40 INFO CoarseGrainedExecutorBackend: Successfully >> > registered >> > with driver >> > 14/07/04 06:22:40 INFO SecurityManager: Changing view acls to: user1 >> > 14/07/04 06:22:40 INFO SecurityManager: SecurityManager: authentication >> > disabled; ui acls disabled; users with view permissions: Set(user1) >> > 14/07/04 06:22:40 INFO Slf4jLogger: Slf4jLogger started >> > 14/07/04 06:22:40 INFO Remoting: Starting remoting >> > 14/07/04 06:22:40 INFO Remoting: Remoting started; listening on >> > addresses >> > :[akka.tcp://spark@slave1:38558] >> > 14/07/04 06:22:40 INFO SparkEnv: Connecting to MapOutputTracker: >> > akka.tcp://spark@master:45172/user/MapOutputTracker >> > 14/07/04 06:22:40 INFO SparkEnv: Connecting to BlockManagerMaster: >> > 
akka.tcp://spark@master:45172/user/BlockManagerMaster >> > 14/07/04 06:22:40 INFO DiskBlockManager: Created local directory at >> > /tmp/spark-local-20140704062240-6a65 >> > 14/07/04 06:22:40 INFO MemoryStore: MemoryStore started with capacity >> > 6.7 >> > GB. >> > 14/07/04 06:22:40 INFO ConnectionManager: Bound socket to port 46901 >> > with id >> > = ConnectionManagerId(slave1,46901) >> > 14/07/04 06:22:40 INFO BlockManagerMaster: Trying to register >> > BlockManager >> > 14/07/04 06:22:40 INFO BlockManagerMaster: Registered BlockManager >> > 14/07/04 06:22:40 INFO HttpFileServer: HTTP File server directory is >> > /tmp/spark-9eba78f9-8ae9-477c-9338-7222ae6fe306 >> > 14/07/04 06:22:40 INFO HttpServer: Starting HTTP Server >> > 14/07/04 06:22:42 INFO CoarseGrainedExecutorBackend: Got assigned task 0 >> > 14/07/04 06:22:42 INFO Executor: Running task ID 0 >> > 14/07/04 06:22:42 INFO CoarseGrainedExecutorBackend: Got assigned task 2 >> > 14/07/04 06:22:42 INFO Executor: Running task ID 2 >> > ... >> > >> > >> > >> > On Fri, Jul 4, 2014 at 5:52 AM, Xiangrui Meng <men...@gmail.com> wrote: >> >> >> >> The feature dimension is small. You don't need a big akka.frameSize. >> >> The default one (10M) should be sufficient. Did you cache the data >> >> before calling LRWithSGD? -Xiangrui >> >> >> >> On Thu, Jul 3, 2014 at 10:02 AM, Bharath Ravi Kumar >> >> <reachb...@gmail.com> >> >> wrote: >> >> > I tried another run after setting the driver memory to 8G (and >> >> > spark.akka.frameSize = 500 on the executors and the driver). In >> >> > addition, I >> >> > also tried to reduce the amount of data that a single task processes, >> >> > by >> >> > increasing the number of partitions (of the labeled points) to 120 >> >> > (instead >> >> > of 2 used earlier), and then setting max cores to 2. 
That made no >> >> > difference >> >> > since, at the end of 120 tasks, the familiar error message appeared >> >> > on a >> >> > slave: >> >> > >> >> > <snipped earlier logs> >> >> > 14/07/03 16:18:48 INFO CoarseGrainedExecutorBackend: Got assigned >> >> > task >> >> > 1436 >> >> > 14/07/03 16:18:48 INFO Executor: Running task ID 1436 >> >> > 14/07/03 16:18:53 INFO HadoopRDD: Input split: >> >> > file:~//2014-05-24-02/part-r-00014:0+2215337 >> >> > 14/07/03 16:18:54 INFO HadoopRDD: Input split: >> >> > file:~//2014-05-24-02/part-r-00014:2215337+2215338 >> >> > 14/07/03 16:18:54 INFO HadoopRDD: Input split: >> >> > file:~//2014-05-24-02/part-r-00003:0+2196429 >> >> > 14/07/03 16:18:54 INFO HadoopRDD: Input split: >> >> > file:~//2014-05-24-02/part-r-00003:2196429+2196430 >> >> > 14/07/03 16:18:54 INFO HadoopRDD: Input split: >> >> > file:~//2014-05-24-02/part-r-00010:0+2186751 >> >> > 14/07/03 16:18:54 INFO HadoopRDD: Input split: >> >> > file:~//2014-05-24-02/part-r-00010:2186751+2186751 >> >> > 14/07/03 16:18:54 INFO Executor: Serialized size of result for 1436 >> >> > is >> >> > 5958822 >> >> > 14/07/03 16:18:54 INFO Executor: Sending result for 1436 directly to >> >> > driver >> >> > 14/07/03 16:18:54 INFO Executor: Finished task ID 1436 >> >> > 14/07/03 16:18:54 INFO CoarseGrainedExecutorBackend: Got assigned >> >> > task >> >> > 1438 >> >> > 14/07/03 16:18:54 INFO Executor: Running task ID 1438 >> >> > 14/07/03 16:19:00 INFO HadoopRDD: Input split: >> >> > file:~//2014-05-24-02/part-r-00004:0+2209615 >> >> > 14/07/03 16:19:00 INFO HadoopRDD: Input split: >> >> > file:~//2014-05-24-02/part-r-00004:2209615+2209616 >> >> > 14/07/03 16:19:00 INFO HadoopRDD: Input split: >> >> > file:~//2014-05-24-02/part-r-00011:0+2202240 >> >> > 14/07/03 16:19:00 INFO HadoopRDD: Input split: >> >> > file:~//2014-05-24-02/part-r-00011:2202240+2202240 >> >> > 14/07/03 16:19:00 INFO HadoopRDD: Input split: >> >> > file:~//2014-05-24-02/part-r-00009:0+2194423 >> >> > 14/07/03 
16:19:00 INFO HadoopRDD: Input split: >> >> > file:~//2014-05-24-02/part-r-00009:2194423+2194424 >> >> > 14/07/03 16:19:00 INFO Executor: Serialized size of result for 1438 >> >> > is >> >> > 5958822 >> >> > 14/07/03 16:19:00 INFO Executor: Sending result for 1438 directly to >> >> > driver >> >> > 14/07/03 16:19:00 INFO Executor: Finished task ID 1438 >> >> > 14/07/03 16:19:14 ERROR CoarseGrainedExecutorBackend: Driver >> >> > Disassociated >> >> > [akka.tcp://sparkExecutor@slave1:51099] -> >> >> > [akka.tcp://spark@master:58272] >> >> > disassociated! Shutting down. >> >> > >> >> > >> >> > The corresponding master logs were: >> >> > >> >> > 4/07/03 16:02:14 INFO Master: Registering app LogRegExp >> >> > 14/07/03 16:02:14 INFO Master: Registered app LogRegExp with ID >> >> > app-20140703160214-0028 >> >> > 14/07/03 16:02:14 INFO Master: Launching executor >> >> > app-20140703160214-0028/1 >> >> > on worker worker-20140630124441-slave1-40182 >> >> > 14/07/03 16:19:15 INFO Master: Removing executor >> >> > app-20140703160214-0028/1 >> >> > because it is EXITED >> >> > 14/07/03 16:19:15 INFO Master: Launching executor >> >> > app-20140703160214-0028/2 >> >> > on worker worker-20140630124441-slave1-40182 >> >> > 14/07/03 16:19:15 INFO Master: Removing executor >> >> > app-20140703160214-0028/0 >> >> > because it is EXITED >> >> > 14/07/03 16:19:15 INFO Master: Launching executor >> >> > app-20140703160214-0028/3 >> >> > on worker worker-20140630102913-slave2-44735 >> >> > 14/07/03 16:19:18 INFO Master: Removing executor >> >> > app-20140703160214-0028/2 >> >> > because it is EXITED >> >> > 14/07/03 16:19:18 INFO Master: Launching executor >> >> > app-20140703160214-0028/4 >> >> > on worker worker-20140630124441-slave1-40182 >> >> > 14/07/03 16:19:18 INFO Master: Removing executor >> >> > app-20140703160214-0028/3 >> >> > because it is EXITED >> >> > 14/07/03 16:19:18 INFO Master: Launching executor >> >> > app-20140703160214-0028/5 >> >> > on worker 
worker-20140630102913-slave2-44735 >> >> > 14/07/03 16:19:20 INFO Master: akka.tcp://spark@master:58272 got >> >> > disassociated, removing it. >> >> > 14/07/03 16:19:20 INFO Master: Removing app app-20140703160214-0028 >> >> > 14/07/03 16:19:20 INFO Master: akka.tcp://spark@master:58272 got >> >> > disassociated, removing it. >> >> > >> >> > >> >> > Throughout the execution, I confirmed in the UI that driver memory >> >> > used >> >> > was >> >> > 0.0 B / 6.9 GB and each executor's memory showed 0.0 B / 12.1 GB even >> >> > when >> >> > aggregate was being executed. On a related note, I noticed in the >> >> > executors >> >> > tab that just before the entire job terminated, executors on slave1, >> >> > slave2 >> >> > and the driver "disappeared" momentarily from the active executors >> >> > list. >> >> > The >> >> > replacement executors on slave1 and slave2 were re-spawned a couple >> >> > of >> >> > times and appeared on the executors list again before they too died >> >> > and >> >> > the >> >> > job failed. >> >> > So it appears that no matter what the task input-result size, the >> >> > execution >> >> > fails at the end of the stage corresponding to >> >> > GradientDescent.aggregate >> >> > (and the preceding count() in GradientDescent goes through fine). Let >> >> > me >> >> > know if you need any additional information. >> >> > >> >> > >> >> > On Thu, Jul 3, 2014 at 12:27 PM, Xiangrui Meng <men...@gmail.com> >> >> > wrote: >> >> >> >> >> >> Could you check the driver memory in the executor tab of the Spark >> >> >> UI >> >> >> when the job is running? If it is too small, please set >> >> >> --driver-memory with spark-submit, e.g. 10g. Could you also attach >> >> >> the >> >> >> master log under spark/logs as well? 
-Xiangrui >> >> >> >> >> >> On Wed, Jul 2, 2014 at 9:34 AM, Bharath Ravi Kumar >> >> >> <reachb...@gmail.com> >> >> >> wrote: >> >> >> > Hi Xiangrui, >> >> >> > >> >> >> > The issue with aggergating/counting over large feature vectors (as >> >> >> > part >> >> >> > of >> >> >> > LogisticRegressionWithSGD) continues to exist, but now in another >> >> >> > form: >> >> >> > while the execution doesn't freeze (due to SPARK-1112), it now >> >> >> > fails >> >> >> > at >> >> >> > the >> >> >> > second or third gradient descent iteration consistently with an >> >> >> > error >> >> >> > level >> >> >> > log message, but no stacktrace. I'm running against 1.0.1-rc1, and >> >> >> > have >> >> >> > tried setting spark.akka.frameSize as high as 500. When the >> >> >> > execution >> >> >> > fails, >> >> >> > each of the two executors log the following message (corresponding >> >> >> > to >> >> >> > aggregate at GradientDescent.scala:178) : >> >> >> > >> >> >> > 14/07/02 14:09:09 INFO >> >> >> > BlockFetcherIterator$BasicBlockFetcherIterator: >> >> >> > maxBytesInFlight: 50331648, targetRequestSize: 10066329 >> >> >> > 14/07/02 14:09:09 INFO >> >> >> > BlockFetcherIterator$BasicBlockFetcherIterator: >> >> >> > Getting 2 non-empty blocks out of 2 blocks >> >> >> > 14/07/02 14:09:09 INFO >> >> >> > BlockFetcherIterator$BasicBlockFetcherIterator: >> >> >> > Started 1 remote fetches in 0 ms >> >> >> > 14/07/02 14:09:11 INFO Executor: Serialized size of result for 737 >> >> >> > is >> >> >> > 5959086 >> >> >> > 14/07/02 14:09:11 INFO Executor: Sending result for 737 directly >> >> >> > to >> >> >> > driver >> >> >> > 14/07/02 14:09:11 INFO Executor: Finished task ID 737 >> >> >> > 14/07/02 14:09:18 ERROR CoarseGrainedExecutorBackend: Driver >> >> >> > Disassociated >> >> >> > [akka.tcp://sparkExecutor@(slave1,slave2):51941] -> >> >> >> > [akka.tcp://spark@master:59487] disassociated! Shutting down. >> >> >> > >> >> >> > >> >> >> > There is no separate stacktrace on the driver side. 
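[Editorial note: the "Serialized size of result for 737 is 5959086" lines above are consistent with each task shipping a dense gradient vector of 64-bit doubles back to the driver. A minimal back-of-envelope sketch, assuming that is what dominates the result (an assumption, not something confirmed in the thread):

```python
# Rough arithmetic only; assumes the task result is dominated by a dense
# gradient vector of 64-bit doubles, one entry per feature.
FEATURES = 741092          # feature-vector length reported in this thread
BYTES_PER_DOUBLE = 8

gradient_bytes = FEATURES * BYTES_PER_DOUBLE   # 5928736 bytes, close to the
                                               # 5959086 "Serialized size of
                                               # result" seen in the logs

# spark.akka.frameSize is given in MB; 10 was the default in Spark 1.0.
def fits_in_frame(payload_bytes, frame_size_mb=10):
    """Rough check: does one serialized task result fit in an Akka frame?"""
    return payload_bytes < frame_size_mb * 1024 * 1024

print(gradient_bytes)                    # 5928736
print(fits_in_frame(gradient_bytes))     # True: one result fits under 10 MB
print(fits_in_frame(2 * gradient_bytes)) # False: ~11.9 MB exceeds the default
```

This would also be consistent with the "~11M" task-size error reported earlier when frameSize was left unspecified, though that is speculation.]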
>> >> >> > >> >> >> > Each input record is of the form p1, p2, (p1,p2) where p1, p2 & >> >> >> > (p1,p2) >> >> >> > are >> >> >> > categorical features with large cardinality, and X is the double >> >> >> > label >> >> >> > with >> >> >> > a continuous value. The categorical variables are converted to >> >> >> > binary >> >> >> > variables which results in a feature vector of size 741092 >> >> >> > (composed >> >> >> > of >> >> >> > all >> >> >> > unique categories across p1, p2 and (p1,p2)). Thus, the labeled >> >> >> > point >> >> >> > for >> >> >> > input record is a sparse vector of size 741092 with only 3 >> >> >> > variables >> >> >> > set >> >> >> > in >> >> >> > the record. The total number of records is 683233 after >> >> >> > aggregating >> >> >> > the >> >> >> > input data on (p1, p2). When attempting to train on the >> >> >> > unaggregated >> >> >> > records >> >> >> > (1337907 in number spread across 455 files), the execution fails >> >> >> > at >> >> >> > count, >> >> >> > GradientDescent.scala:161 with the following log >> >> >> > >> >> >> > >> >> >> > (Snipped lines corresponding to other input files) >> >> >> > 14/07/02 16:02:03 INFO HadoopRDD: Input split: >> >> >> > file:~/part-r-00012:2834590+2834590 >> >> >> > 14/07/02 16:02:03 INFO HadoopRDD: Input split: >> >> >> > file:~/part-r-00005:0+2845559 >> >> >> > 14/07/02 16:02:03 INFO HadoopRDD: Input split: >> >> >> > file:~/part-r-00005:2845559+2845560 >> >> >> > 14/07/02 16:02:03 INFO Executor: Serialized size of result for 726 >> >> >> > is >> >> >> > 615 >> >> >> > 14/07/02 16:02:03 INFO Executor: Sending result for 726 directly >> >> >> > to >> >> >> > driver >> >> >> > 14/07/02 16:02:03 INFO Executor: Finished task ID 726 >> >> >> > 14/07/02 16:02:12 ERROR CoarseGrainedExecutorBackend: Driver >> >> >> > Disassociated >> >> >> > [akka.tcp://sparkExecutor@slave1:48423] -> >> >> >> > [akka.tcp://spark@master:55792] >> >> >> > disassociated! Shutting down. 
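[Editorial note: the encoding described above (each record setting one binary feature for p1, one for p2, and one for the (p1, p2) pair, in a single concatenated index space) can be sketched as follows. All names here are illustrative, not taken from the original code:

```python
# Hypothetical sketch of the one-hot encoding described in the thread:
# each record contributes exactly three active indices in a vector whose
# length is (#p1 values + #p2 values + #pairs).

def build_index(values):
    """Map each distinct categorical value to a dense integer index."""
    return {v: i for i, v in enumerate(sorted(values))}

def encode(record, p1_idx, p2_idx, pair_idx):
    """Return the sparse indices of the three active binary features."""
    p1, p2 = record
    n1, n2 = len(p1_idx), len(p2_idx)
    return [
        p1_idx[p1],                    # block 1: p1 features
        n1 + p2_idx[p2],               # block 2: p2 features
        n1 + n2 + pair_idx[(p1, p2)],  # block 3: (p1, p2) interaction
    ]

records = [("a", "x"), ("b", "y"), ("a", "y")]
p1_idx = build_index({r[0] for r in records})
p2_idx = build_index({r[1] for r in records})
pair_idx = build_index(set(records))

print(encode(("a", "y"), p1_idx, p2_idx, pair_idx))  # [0, 3, 5]
```

In the thread's data the concatenated space has 741092 columns, but each labeled point still carries only three indices, exactly as described above.]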
>> >> >> > >> >> >> > A count() attempted on the input RDD before beginning training has >> >> >> > the >> >> >> > following metrics: >> >> >> > >> >> >> > >> >> >> > Metric Min 25th Median 75th Max >> >> >> > >> >> >> > Result >> >> >> > serialization >> >> >> > time 0 ms 0 ms 0 ms 0 ms 0 ms >> >> >> > >> >> >> > Duration 33 s 33 s 35 s 35 s 35 s >> >> >> > >> >> >> > Time spent >> >> >> > fetching task >> >> >> > results 0 ms 0 ms 0 ms 0 ms 0 ms >> >> >> > >> >> >> > Scheduler >> >> >> > delay 0.1 s 0.1 s 0.3 s 0.3 s 0.3 s >> >> >> > >> >> >> > Aggregated Metrics by Executor >> >> >> > >> >> >> > ID Address Task Time Total Failed Succeeded >> >> >> > Shuffle >> >> >> > Read >> >> >> > Shuffle Write Shuf Spill (Mem) Shuf Spill (Disk) >> >> >> > 0 CANNOT FIND ADDRESS 34 s 1 0 1 >> >> >> > 0.0 >> >> >> > B >> >> >> > 0.0 B 0.0 B 0.0 B >> >> >> > 1 CANNOT FIND ADDRESS 36 s 1 0 1 >> >> >> > 0.0 >> >> >> > B >> >> >> > 0.0 B 0.0 B 0.0 B >> >> >> > >> >> >> > Tasks >> >> >> > >> >> >> > Task Index Task ID Status Locality Level Executor >> >> >> > Launch >> >> >> > Time >> >> >> > Duration GC Time Result Ser Time Errors >> >> >> > 0 726 SUCCESS PROCESS_LOCAL slave1 >> >> >> > 2014/07/02 >> >> >> > 16:01:28 35 s 0.1 s >> >> >> > 1 727 SUCCESS PROCESS_LOCAL slave2 >> >> >> > 2014/07/02 >> >> >> > 16:01:28 33 s 99 ms >> >> >> > >> >> >> > Any pointers / diagnosis please? >> >> >> > >> >> >> > >> >> >> > >> >> >> > >> >> >> > On Thu, Jun 19, 2014 at 10:03 AM, Bharath Ravi Kumar >> >> >> > <reachb...@gmail.com> >> >> >> > wrote: >> >> >> >> >> >> >> >> Thanks. I'll await the fix to re-run my test. >> >> >> >> >> >> >> >> >> >> >> >> On Thu, Jun 19, 2014 at 8:28 AM, Xiangrui Meng <men...@gmail.com> >> >> >> >> wrote: >> >> >> >>> >> >> >> >>> Hi Bharath, >> >> >> >>> >> >> >> >>> This is related to SPARK-1112, which we already found the root >> >> >> >>> cause. >> >> >> >>> I will let you know when this is fixed. 
>> >> >> >>> >> >> >> >>> Best, >> >> >> >>> Xiangrui >> >> >> >>> >> >> >> >>> On Tue, Jun 17, 2014 at 7:37 PM, Bharath Ravi Kumar >> >> >> >>> <reachb...@gmail.com> >> >> >> >>> wrote: >> >> >> >>> > Couple more points: >> >> >> >>> > 1)The inexplicable stalling of execution with large feature >> >> >> >>> > sets >> >> >> >>> > appears >> >> >> >>> > similar to that reported with the news-20 dataset: >> >> >> >>> > >> >> >> >>> > >> >> >> >>> > >> >> >> >>> > >> >> >> >>> > http://mail-archives.apache.org/mod_mbox/spark-user/201406.mbox/%3c53a03542.1010...@gmail.com%3E >> >> >> >>> > >> >> >> >>> > 2) The NPE trying to call mapToPair convert an RDD<Long, Long, >> >> >> >>> > Integer, >> >> >> >>> > Integer> into a JavaPairRDD<Tuple2<Long,Long>, >> >> >> >>> > Tuple2<Integer,Integer>> >> >> >> >>> > is >> >> >> >>> > unrelated to mllib. >> >> >> >>> > >> >> >> >>> > Thanks, >> >> >> >>> > Bharath >> >> >> >>> > >> >> >> >>> > >> >> >> >>> > >> >> >> >>> > On Wed, Jun 18, 2014 at 7:14 AM, Bharath Ravi Kumar >> >> >> >>> > <reachb...@gmail.com> >> >> >> >>> > wrote: >> >> >> >>> >> >> >> >> >>> >> Hi Xiangrui , >> >> >> >>> >> >> >> >> >>> >> I'm using 1.0.0. >> >> >> >>> >> >> >> >> >>> >> Thanks, >> >> >> >>> >> Bharath >> >> >> >>> >> >> >> >> >>> >> On 18-Jun-2014 1:43 am, "Xiangrui Meng" <men...@gmail.com> >> >> >> >>> >> wrote: >> >> >> >>> >>> >> >> >> >>> >>> Hi Bharath, >> >> >> >>> >>> >> >> >> >>> >>> Thanks for posting the details! Which Spark version are you >> >> >> >>> >>> using? >> >> >> >>> >>> >> >> >> >>> >>> Best, >> >> >> >>> >>> Xiangrui >> >> >> >>> >>> >> >> >> >>> >>> On Tue, Jun 17, 2014 at 6:48 AM, Bharath Ravi Kumar >> >> >> >>> >>> <reachb...@gmail.com> >> >> >> >>> >>> wrote: >> >> >> >>> >>> > Hi, >> >> >> >>> >>> > >> >> >> >>> >>> > (Apologies for the long mail, but it's necessary to >> >> >> >>> >>> > provide >> >> >> >>> >>> > sufficient >> >> >> >>> >>> > details considering the number of issues faced.) 
>> >> >> >>> >>> > >> >> >> >>> >>> > I'm running into issues testing LogisticRegressionWithSGD >> >> >> >>> >>> > a >> >> >> >>> >>> > two >> >> >> >>> >>> > node >> >> >> >>> >>> > cluster >> >> >> >>> >>> > (each node with 24 cores and 16G available to slaves out >> >> >> >>> >>> > of >> >> >> >>> >>> > 24G >> >> >> >>> >>> > on >> >> >> >>> >>> > the >> >> >> >>> >>> > system). Here's a description of the application: >> >> >> >>> >>> > >> >> >> >>> >>> > The model is being trained based on categorical features >> >> >> >>> >>> > x, >> >> >> >>> >>> > y, >> >> >> >>> >>> > and >> >> >> >>> >>> > (x,y). >> >> >> >>> >>> > The categorical features are mapped to binary features by >> >> >> >>> >>> > converting >> >> >> >>> >>> > each >> >> >> >>> >>> > distinct value in the category enum into a binary feature >> >> >> >>> >>> > by >> >> >> >>> >>> > itself >> >> >> >>> >>> > (i.e >> >> >> >>> >>> > presence of that value in a record implies corresponding >> >> >> >>> >>> > feature >> >> >> >>> >>> > = >> >> >> >>> >>> > 1, >> >> >> >>> >>> > else >> >> >> >>> >>> > feature = 0. So, there'd be as many distinct features as >> >> >> >>> >>> > enum >> >> >> >>> >>> > values) . >> >> >> >>> >>> > The >> >> >> >>> >>> > training vector is laid out as >> >> >> >>> >>> > [x1,x2...xn,y1,y2....yn,(x1,y1),(x2,y2)...(xn,yn)]. Each >> >> >> >>> >>> > record >> >> >> >>> >>> > in >> >> >> >>> >>> > the >> >> >> >>> >>> > training data has only one combination (Xk,Yk) and a label >> >> >> >>> >>> > appearing in >> >> >> >>> >>> > the >> >> >> >>> >>> > record. Thus, the corresponding labeledpoint sparse vector >> >> >> >>> >>> > would >> >> >> >>> >>> > only >> >> >> >>> >>> > have 3 >> >> >> >>> >>> > values Xk, Yk, (Xk,Yk) set for a record. The total length >> >> >> >>> >>> > of >> >> >> >>> >>> > the >> >> >> >>> >>> > vector >> >> >> >>> >>> > (though parse) would be nearly 614000. 
The number of >> >> >> >>> >>> > records >> >> >> >>> >>> > is >> >> >> >>> >>> > about >> >> >> >>> >>> > 1.33 >> >> >> >>> >>> > million. The records have been coalesced into 20 >> >> >> >>> >>> > partitions >> >> >> >>> >>> > across >> >> >> >>> >>> > two >> >> >> >>> >>> > nodes. The input data has not been cached. >> >> >> >>> >>> > (NOTE: I do realize the records & features may seem large >> >> >> >>> >>> > for >> >> >> >>> >>> > a >> >> >> >>> >>> > two >> >> >> >>> >>> > node >> >> >> >>> >>> > setup, but given the memory & cpu, and the fact that I'm >> >> >> >>> >>> > willing >> >> >> >>> >>> > to >> >> >> >>> >>> > give up >> >> >> >>> >>> > some turnaround time, I don't see why tasks should >> >> >> >>> >>> > inexplicably >> >> >> >>> >>> > fail) >> >> >> >>> >>> > >> >> >> >>> >>> > Additional parameters include: >> >> >> >>> >>> > >> >> >> >>> >>> > spark.executor.memory = 14G >> >> >> >>> >>> > spark.default.parallelism = 1 >> >> >> >>> >>> > spark.cores.max=20 >> >> >> >>> >>> > spark.storage.memoryFraction=0.8 //No cache space required >> >> >> >>> >>> > (Trying to set spark.akka.frameSize to a larger number, >> >> >> >>> >>> > say, >> >> >> >>> >>> > 20 >> >> >> >>> >>> > didn't >> >> >> >>> >>> > help >> >> >> >>> >>> > either) >> >> >> >>> >>> > >> >> >> >>> >>> > The model training was initialized as : new >> >> >> >>> >>> > LogisticRegressionWithSGD(1, >> >> >> >>> >>> > maxIterations, 0.0, 0.05) >> >> >> >>> >>> > >> >> >> >>> >>> > However, after 4 iterations of gradient descent, the >> >> >> >>> >>> > entire >> >> >> >>> >>> > execution >> >> >> >>> >>> > appeared to stall inexplicably. 
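[Editorial note: one non-obvious point in the setup above is that sparse inputs do not imply small task results. A minimal pure-Python sketch (illustrative only, not MLlib's actual GradientDescent code) of why each partition's logistic-gradient contribution is dense over all features:

```python
# Illustrative sketch, not MLlib's code: even when every labeled point has
# only 3 active binary features, the summed logistic-loss gradient for a
# partition is a dense vector over all features, so each task result scales
# with the feature dimension, not with input sparsity.
import math

def partial_gradient(points, weights, dim):
    """Sum per-point logistic-loss gradients over one partition."""
    grad = [0.0] * dim                  # dense, regardless of input sparsity
    for label, indices in points:       # each point: (label, active indices)
        margin = sum(weights[i] for i in indices)  # w.x with binary features
        mult = 1.0 / (1.0 + math.exp(-margin)) - label
        for i in indices:
            grad[i] += mult             # only 3 updates per point...
    return grad                         # ...but the result is still length dim

dim = 10
points = [(1.0, [0, 3, 5]), (0.0, [1, 3, 6])]
grad = partial_gradient(points, [0.0] * dim, dim)
print(len(grad))  # 10: the result size is set by dim, not by the sparse inputs
```

With dim near 614000 (or 741092 in the later runs), each such result is several megabytes, which is what the aggregate step must ship to the driver on every iteration.]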
> The corresponding executor details and details of the stalled stage (number 14) are as follows:
>
> Metric                             Min     25th    Median  75th    Max
> Result serialization time          12 ms   13 ms   14 ms   16 ms   18 ms
> Duration                           4 s     4 s     5 s     5 s     5 s
> Time spent fetching task results   0 ms    0 ms    0 ms    0 ms    0 ms
> Scheduler delay                    6 s     6 s     6 s     6 s     12 s
>
> Stage Id: 14  aggregate at GradientDescent.scala:178
>
> Task Index  Task ID  Status   Locality Level  Executor                     Launch Time          Duration  GC Time  Result Ser Time  Errors
> 0           600      RUNNING  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  1.1 h
> 1           601      RUNNING  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  1.1 h
> 2           602      RUNNING  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  1.1 h
> 3           603      RUNNING  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  1.1 h
> 4           604      RUNNING  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  1.1 h
> 5           605      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  4 s       2 s      12 ms
> 6           606      SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  4 s       1 s      14 ms
> 7           607      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  4 s       2 s      12 ms
> 8           608      SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  5 s       1 s      15 ms
> 9           609      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  5 s       1 s      14 ms
> 10          610      SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  5 s       1 s      15 ms
> 11          611      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  4 s       1 s      13 ms
> 12          612      SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  5 s       1 s      18 ms
> 13          613      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  5 s       1 s      13 ms
> 14          614      SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  4 s       1 s      14 ms
> 15          615      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  4 s       1 s      12 ms
> 16          616      SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  5 s       1 s      15 ms
> 17          617      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  5 s       1 s      18 ms
> 18          618      SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  5 s       1 s      16 ms
> 19          619      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  4 s       1 s      18 ms
>
> Executor stats:
>
> RDD Blocks  Memory Used     Disk Used  Active Tasks  Failed Tasks  Complete Tasks  Total Tasks  Task Time  Shuffle Read  Shuffle Write
> 0           0.0 B / 6.7 GB  0.0 B      2             0             307             309          23.2 m     0.0 B         0.0 B
> 0           0.0 B / 6.7 GB  0.0 B      3             0             308             311          22.4 m     0.0 B         0.0 B
>
> Executor jmap output:
>
> Server compiler detected.
> JVM version is 24.55-b03
>
> using thread-local object allocation.
> Parallel GC with 18 thread(s)
>
> Heap Configuration:
>    MinHeapFreeRatio = 40
>    MaxHeapFreeRatio = 70
>    MaxHeapSize      = 10737418240 (10240.0MB)
>    NewSize          = 1310720 (1.25MB)
>    MaxNewSize       = 17592186044415 MB
>    OldSize          = 5439488 (5.1875MB)
>    NewRatio         = 2
>    SurvivorRatio    = 8
>    PermSize         = 21757952 (20.75MB)
>    MaxPermSize      = 134217728 (128.0MB)
>    G1HeapRegionSize = 0 (0.0MB)
>
> Heap Usage:
> PS Young Generation
> Eden Space:
>    capacity = 2783969280 (2655.0MB)
>    used     = 192583816 (183.66223907470703MB)
>    free     = 2591385464 (2471.337760925293MB)
>    6.917598458557704% used
> From Space:
>    capacity = 409993216 (391.0MB)
>    used     = 1179808 (1.125152587890625MB)
>    free     = 408813408 (389.8748474121094MB)
>    0.2877628102022059% used
> To Space:
>    capacity = 385351680 (367.5MB)
>    used     = 0 (0.0MB)
>    free     = 385351680 (367.5MB)
>    0.0% used
> PS Old Generation
>    capacity = 7158628352 (6827.0MB)
>    used     = 4455093024 (4248.707794189453MB)
>    free     = 2703535328 (2578.292205810547MB)
>    62.2338918146983% used
> PS Perm Generation
>    capacity = 90701824 (86.5MB)
>    used     = 45348832 (43.248016357421875MB)
>    free     = 45352992 (43.251983642578125MB)
>    49.99770677158598% used
>
> 8432 interned Strings occupying 714672 bytes.
>
> Executor GC log snippet:
>
> 168.778: [GC [PSYoungGen: 2702831K->578545K(2916864K)] 9302453K->7460857K(9907712K), 0.3193550 secs] [Times: user=5.13 sys=0.39, real=0.32 secs]
> 169.097: [Full GC [PSYoungGen: 578545K->0K(2916864K)] [ParOldGen: 6882312K->1073297K(6990848K)] 7460857K->1073297K(9907712K) [PSPermGen: 44248K->44201K(88576K)], 4.5521090 secs] [Times: user=24.22 sys=0.18, real=4.55 secs]
> 174.207: [GC [PSYoungGen: 2338304K->81315K(2544128K)] 3411653K->1154665K(9534976K), 0.0966280 secs] [Times: user=1.66 sys=0.00, real=0.09 secs]
>
> I tried to map partitions to cores on the nodes.
> Increasing the number of partitions (say to 80 or 100) would result in progress till the 6th iteration or so, but the next stage would stall as before, with no apparent root cause in the logs. With increased partitions, the last stage that completed had the following task times:
>
> Metric                             Min     25th    Median  75th    Max
> Result serialization time          11 ms   12 ms   13 ms   15 ms   0.4 s
> Duration                           0.5 s   0.9 s   1 s     3 s     7 s
> Time spent fetching task results   0 ms    0 ms    0 ms    0 ms    0 ms
> Scheduler delay                    5 s     6 s     6 s     7 s     12 s
>
> My hypothesis is that as the coefficient array becomes less sparse (with successive iterations), the cost of the aggregate goes up to the point that it stalls (which I failed to explain). Reducing the batch fraction to a very low number like 0.01 saw the iterations progress further, but the model then failed to converge after a small number of iterations.
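[Editor's note] If that hypothesis is right, the cost should grow with both the feature count and the partition count, since a plain `aggregate` ships one dense gradient vector per partition back to the driver for combining. A rough back-of-envelope sketch (not Spark code; the feature and partition counts below are illustrative assumptions, not values from this thread):

```python
# Estimate the per-iteration bytes the driver must receive and combine when
# a plain `aggregate` is used over dense gradient vectors: one dense vector
# of doubles per partition. Once the weight vector densifies, this cost no
# longer depends on the sparsity of the input data.

def driver_aggregate_bytes(num_features, num_partitions, bytes_per_double=8):
    """Each partition ships one dense gradient vector to the driver."""
    return num_features * bytes_per_double * num_partitions

# Illustrative numbers: 1M features, 80 partitions.
per_iter = driver_aggregate_bytes(num_features=1_000_000, num_partitions=80)
print(per_iter / 1e6, "MB per iteration shipped to the driver")
```

This also suggests why adding partitions can make the stall worse rather than better: the driver-side combine work scales linearly with partition count.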
> I also tried reducing the number of records by aggregating on (x, y) as the key (i.e. using aggregations instead of training on every raw record), but encountered the following exception:
>
> Loss was due to java.lang.NullPointerException
> java.lang.NullPointerException
>     at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:750)
>     at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:750)
>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>     at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:59)
>     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:96)
>     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:95)
>     at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582)
>     at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>     at org.apache.spark.scheduler.Task.run(Task.scala:51)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
>
> I'd appreciate any insights/comments about what may be causing the execution to stall.
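[Editor's note] The record-reduction idea described above, independent of the NPE in the Java pair-function path, is simply to collapse duplicate (features, label) records into one weighted example so the optimizer trains on counts rather than every raw record. A plain-Python sketch of the intent (not the Spark code that hit the exception):

```python
# Collapse duplicate (features, label) records into weighted examples.
# Features are tuples so the pair is hashable and can serve as a key,
# mirroring a reduceByKey-style aggregation.
from collections import Counter

def aggregate_records(records):
    """records: iterable of (features_tuple, label) pairs.
    Returns a list of (features_tuple, label, weight) triples."""
    counts = Counter(records)
    return [(x, y, w) for (x, y), w in counts.items()]

data = [((1.0, 0.0), 1), ((1.0, 0.0), 1), ((0.0, 1.0), 0)]
print(aggregate_records(data))
```

One caveat for such an approach: the loss and gradient computation must actually honor the weight, which the MLlib gradient-descent API of that era did not expose directly.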
> If logs/tables appear poorly indented in the email, here's a gist with relevant details:
> https://gist.github.com/reachbach/a418ab2f01b639b624c1
>
> Thanks,
> Bharath