Could you check the driver memory in the Executors tab of the Spark UI while the job is running? If it is too small, please set --driver-memory with spark-submit, e.g. 10g. Could you also attach the master log under spark/logs?

-Xiangrui
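For reference, a minimal submit command along those lines might look like the following (the master URL, class name, and application jar are placeholders, not details from this thread):

    ./bin/spark-submit --master spark://master:7077 \
      --driver-memory 10g \
      --class com.example.TrainLR \
      your-app.jar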
On Wed, Jul 2, 2014 at 9:34 AM, Bharath Ravi Kumar <reachb...@gmail.com> wrote:
> Hi Xiangrui,
>
> The issue with aggregating/counting over large feature vectors (as part of
> LogisticRegressionWithSGD) continues to exist, but now in another form:
> while the execution no longer freezes (the earlier SPARK-1112 issue), it
> now fails consistently at the second or third gradient descent iteration
> with an error-level log message, but no stacktrace. I'm running against
> 1.0.1-rc1, and have tried setting spark.akka.frameSize as high as 500.
> When the execution fails, each of the two executors logs the following
> message (corresponding to aggregate at GradientDescent.scala:178):
>
> 14/07/02 14:09:09 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329
> 14/07/02 14:09:09 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
> 14/07/02 14:09:09 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 1 remote fetches in 0 ms
> 14/07/02 14:09:11 INFO Executor: Serialized size of result for 737 is 5959086
> 14/07/02 14:09:11 INFO Executor: Sending result for 737 directly to driver
> 14/07/02 14:09:11 INFO Executor: Finished task ID 737
> 14/07/02 14:09:18 ERROR CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkExecutor@(slave1,slave2):51941] -> [akka.tcp://spark@master:59487] disassociated! Shutting down.
>
> There is no separate stacktrace on the driver side.
>
> Each input record is of the form p1, p2, (p1,p2), X, where p1, p2 & (p1,p2)
> are categorical features with large cardinality and X is the label, a
> continuous double value. The categorical variables are converted to binary
> variables, which results in a feature vector of size 741092 (composed of
> all unique categories across p1, p2 and (p1,p2)). Thus, the labeled point
> for each input record is a sparse vector of size 741092 with only 3
> variables set in the record. The total number of records is 683233 after
> aggregating the input data on (p1, p2). When attempting to train on the
> unaggregated records (1337907 in number, spread across 455 files), the
> execution fails at count at GradientDescent.scala:161 with the following
> log:
>
> (Snipped lines corresponding to other input files)
> 14/07/02 16:02:03 INFO HadoopRDD: Input split: file:~/part-r-00012:2834590+2834590
> 14/07/02 16:02:03 INFO HadoopRDD: Input split: file:~/part-r-00005:0+2845559
> 14/07/02 16:02:03 INFO HadoopRDD: Input split: file:~/part-r-00005:2845559+2845560
> 14/07/02 16:02:03 INFO Executor: Serialized size of result for 726 is 615
> 14/07/02 16:02:03 INFO Executor: Sending result for 726 directly to driver
> 14/07/02 16:02:03 INFO Executor: Finished task ID 726
> 14/07/02 16:02:12 ERROR CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkExecutor@slave1:48423] -> [akka.tcp://spark@master:55792] disassociated! Shutting down.
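To make the encoding concrete, here is a minimal Scala sketch of a labeled point built the way the paragraph above describes; the lookup indices, names, and the training wrapper are assumptions for illustration, not Bharath's actual code:

    import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.LabeledPoint
    import org.apache.spark.rdd.RDD

    val numFeatures = 741092 // all unique categories across p1, p2 and (p1,p2)

    // p1Idx, p2Idx and pairIdx are the columns assigned to the record's three
    // active categories by (hypothetical) dictionaries built over the data.
    // Vectors.sparse requires strictly increasing indices, hence the sort.
    def toPoint(p1Idx: Int, p2Idx: Int, pairIdx: Int, label: Double): LabeledPoint = {
      val indices = Array(p1Idx, p2Idx, pairIdx).sorted
      LabeledPoint(label, Vectors.sparse(numFeatures, indices, Array(1.0, 1.0, 1.0)))
    }

    // Step size 1.0, regParam 0.0 and miniBatchFraction 0.05, mirroring the
    // "new LogisticRegressionWithSGD(1, maxIterations, 0.0, 0.05)" call
    // quoted later in the thread.
    def trainModel(points: RDD[LabeledPoint], maxIterations: Int) =
      LogisticRegressionWithSGD.train(points, maxIterations, 1.0, 0.05)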
>
> A count() attempted on the input RDD before beginning training has the
> following metrics:
>
> Metric                            Min     25th    Median  75th    Max
> Result serialization time         0 ms    0 ms    0 ms    0 ms    0 ms
> Duration                          33 s    33 s    35 s    35 s    35 s
> Time spent fetching task results  0 ms    0 ms    0 ms    0 ms    0 ms
> Scheduler delay                   0.1 s   0.1 s   0.3 s   0.3 s   0.3 s
>
> Aggregated Metrics by Executor
>
> ID  Address              Task Time  Total  Failed  Succeeded  Shuffle Read  Shuffle Write  Shuf Spill (Mem)  Shuf Spill (Disk)
> 0   CANNOT FIND ADDRESS  34 s       1      0       1          0.0 B         0.0 B          0.0 B             0.0 B
> 1   CANNOT FIND ADDRESS  36 s       1      0       1          0.0 B         0.0 B          0.0 B             0.0 B
>
> Tasks
>
> Task Index  Task ID  Status   Locality Level  Executor  Launch Time          Duration  GC Time  Result Ser Time  Errors
> 0           726      SUCCESS  PROCESS_LOCAL   slave1    2014/07/02 16:01:28  35 s      0.1 s
> 1           727      SUCCESS  PROCESS_LOCAL   slave2    2014/07/02 16:01:28  33 s      99 ms
>
> Any pointers / diagnosis, please?
>
> On Thu, Jun 19, 2014 at 10:03 AM, Bharath Ravi Kumar <reachb...@gmail.com> wrote:
>>
>> Thanks. I'll await the fix to re-run my test.
>>
>> On Thu, Jun 19, 2014 at 8:28 AM, Xiangrui Meng <men...@gmail.com> wrote:
>>>
>>> Hi Bharath,
>>>
>>> This is related to SPARK-1112, for which we have already found the root
>>> cause. I will let you know when this is fixed.
>>>
>>> Best,
>>> Xiangrui
>>>
>>> On Tue, Jun 17, 2014 at 7:37 PM, Bharath Ravi Kumar <reachb...@gmail.com> wrote:
>>> > A couple more points:
>>> > 1) The inexplicable stalling of execution with large feature sets
>>> > appears similar to that reported with the news-20 dataset:
>>> > http://mail-archives.apache.org/mod_mbox/spark-user/201406.mbox/%3c53a03542.1010...@gmail.com%3E
>>> >
>>> > 2) The NPE hit when calling mapToPair to convert an RDD<Long, Long,
>>> > Integer, Integer> into a JavaPairRDD<Tuple2<Long,Long>,
>>> > Tuple2<Integer,Integer>> is unrelated to mllib.
>>> >
>>> > Thanks,
>>> > Bharath
>>> >
>>> > On Wed, Jun 18, 2014 at 7:14 AM, Bharath Ravi Kumar <reachb...@gmail.com> wrote:
>>> >>
>>> >> Hi Xiangrui,
>>> >>
>>> >> I'm using 1.0.0.
>>> >>
>>> >> Thanks,
>>> >> Bharath
>>> >>
>>> >> On 18-Jun-2014 1:43 am, "Xiangrui Meng" <men...@gmail.com> wrote:
>>> >>>
>>> >>> Hi Bharath,
>>> >>>
>>> >>> Thanks for posting the details! Which Spark version are you using?
>>> >>>
>>> >>> Best,
>>> >>> Xiangrui
>>> >>>
>>> >>> On Tue, Jun 17, 2014 at 6:48 AM, Bharath Ravi Kumar <reachb...@gmail.com> wrote:
>>> >>> > Hi,
>>> >>> >
>>> >>> > (Apologies for the long mail, but it's necessary to provide
>>> >>> > sufficient details considering the number of issues faced.)
>>> >>> >
>>> >>> > I'm running into issues testing LogisticRegressionWithSGD on a
>>> >>> > two-node cluster (each node with 24 cores and 16G available to
>>> >>> > slaves out of 24G on the system). Here's a description of the
>>> >>> > application:
>>> >>> >
>>> >>> > The model is being trained based on categorical features x, y,
>>> >>> > and (x,y). The categorical features are mapped to binary features
>>> >>> > by converting each distinct value in the category enum into a
>>> >>> > binary feature by itself (i.e. presence of that value in a record
>>> >>> > implies the corresponding feature = 1, else feature = 0; so
>>> >>> > there'd be as many distinct features as enum values). The
>>> >>> > training vector is laid out as
>>> >>> > [x1,x2...xn,y1,y2....yn,(x1,y1),(x2,y2)...(xn,yn)].
>>> >>> > Each record in the training data has only one combination
>>> >>> > (Xk,Yk) and a label appearing in the record. Thus, the
>>> >>> > corresponding labeled-point sparse vector would only have 3
>>> >>> > values Xk, Yk, (Xk,Yk) set for a record. The total length of the
>>> >>> > vector (though sparse) would be nearly 614000. The number of
>>> >>> > records is about 1.33 million. The records have been coalesced
>>> >>> > into 20 partitions across two nodes. The input data has not been
>>> >>> > cached.
>>> >>> > (NOTE: I do realize the records & features may seem large for a
>>> >>> > two-node setup, but given the memory & CPU, and the fact that I'm
>>> >>> > willing to give up some turnaround time, I don't see why tasks
>>> >>> > should inexplicably fail.)
>>> >>> >
>>> >>> > Additional parameters include:
>>> >>> >
>>> >>> > spark.executor.memory = 14G
>>> >>> > spark.default.parallelism = 1
>>> >>> > spark.cores.max = 20
>>> >>> > spark.storage.memoryFraction = 0.8  // No cache space required
>>> >>> > (Trying to set spark.akka.frameSize to a larger number, say, 20,
>>> >>> > didn't help either.)
>>> >>> >
>>> >>> > The model training was initialized as:
>>> >>> > new LogisticRegressionWithSGD(1, maxIterations, 0.0, 0.05)
>>> >>> >
>>> >>> > However, after 4 iterations of gradient descent, the entire
>>> >>> > execution appeared to stall inexplicably. The corresponding
>>> >>> > executor details and details of the stalled stage (number 14) are
>>> >>> > as follows:
>>> >>> >
>>> >>> > Metric                            Min    25th   Median  75th   Max
>>> >>> > Result serialization time         12 ms  13 ms  14 ms   16 ms  18 ms
>>> >>> > Duration                          4 s    4 s    5 s     5 s    5 s
>>> >>> > Time spent fetching task results  0 ms   0 ms   0 ms    0 ms   0 ms
>>> >>> > Scheduler delay                   6 s    6 s    6 s     6 s    12 s
>>> >>> >
>>> >>> > Stage Id: 14 (aggregate at GradientDescent.scala:178)
>>> >>> >
>>> >>> > Task Index  Task ID  Status   Locality Level  Executor                     Launch Time          Duration  GC Time  Result Ser Time  Errors
>>> >>> > 0           600      RUNNING  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  1.1 h
>>> >>> > 1           601      RUNNING  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  1.1 h
>>> >>> > 2           602      RUNNING  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  1.1 h
>>> >>> > 3           603      RUNNING  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  1.1 h
>>> >>> > 4           604      RUNNING  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  1.1 h
>>> >>> > 5           605      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  4 s       2 s      12 ms
>>> >>> > 6           606      SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  4 s       1 s      14 ms
>>> >>> > 7           607      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  4 s       2 s      12 ms
>>> >>> > 8           608      SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  5 s       1 s      15 ms
>>> >>> > 9           609      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  5 s       1 s      14 ms
>>> >>> > 10          610      SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  5 s       1 s      15 ms
>>> >>> > 11          611      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  4 s       1 s      13 ms
>>> >>> > 12          612      SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  5 s       1 s      18 ms
>>> >>> > 13          613      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  5 s       1 s      13 ms
>>> >>> > 14          614      SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  4 s       1 s      14 ms
>>> >>> > 15          615      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  4 s       1 s      12 ms
>>> >>> > 16          616      SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  5 s       1 s      15 ms
>>> >>> > 17          617      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  5 s       1 s      18 ms
>>> >>> > 18          618      SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  5 s       1 s      16 ms
>>> >>> > 19          619      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  4 s       1 s      18 ms
>>> >>> >
>>> >>> > Executor stats:
>>> >>> >
>>> >>> > RDD Blocks  Memory Used     Disk Used  Active Tasks  Failed Tasks  Complete Tasks  Total Tasks  Task Time  Shuffle Read  Shuffle Write
>>> >>> > 0           0.0 B / 6.7 GB  0.0 B      2             0             307             309          23.2 m     0.0 B         0.0 B
>>> >>> > 0           0.0 B / 6.7 GB  0.0 B      3             0             308             311          22.4 m     0.0 B         0.0 B
>>> >>> >
>>> >>> > Executor jmap output:
>>> >>> >
>>> >>> > Server compiler detected.
>>> >>> > JVM version is 24.55-b03
>>> >>> >
>>> >>> > using thread-local object allocation.
>>> >>> > Parallel GC with 18 thread(s)
>>> >>> >
>>> >>> > Heap Configuration:
>>> >>> >    MinHeapFreeRatio = 40
>>> >>> >    MaxHeapFreeRatio = 70
>>> >>> >    MaxHeapSize      = 10737418240 (10240.0MB)
>>> >>> >    NewSize          = 1310720 (1.25MB)
>>> >>> >    MaxNewSize       = 17592186044415 MB
>>> >>> >    OldSize          = 5439488 (5.1875MB)
>>> >>> >    NewRatio         = 2
>>> >>> >    SurvivorRatio    = 8
>>> >>> >    PermSize         = 21757952 (20.75MB)
>>> >>> >    MaxPermSize      = 134217728 (128.0MB)
>>> >>> >    G1HeapRegionSize = 0 (0.0MB)
>>> >>> >
>>> >>> > Heap Usage:
>>> >>> > PS Young Generation
>>> >>> > Eden Space:
>>> >>> >    capacity = 2783969280 (2655.0MB)
>>> >>> >    used     = 192583816 (183.66223907470703MB)
>>> >>> >    free     = 2591385464 (2471.337760925293MB)
>>> >>> >    6.917598458557704% used
>>> >>> > From Space:
>>> >>> >    capacity = 409993216 (391.0MB)
>>> >>> >    used     = 1179808 (1.125152587890625MB)
>>> >>> >    free     = 408813408 (389.8748474121094MB)
>>> >>> >    0.2877628102022059% used
>>> >>> > To Space:
>>> >>> >    capacity = 385351680 (367.5MB)
>>> >>> >    used     = 0 (0.0MB)
>>> >>> >    free     = 385351680 (367.5MB)
>>> >>> >    0.0% used
>>> >>> > PS Old Generation
>>> >>> >    capacity = 7158628352 (6827.0MB)
>>> >>> >    used     = 4455093024 (4248.707794189453MB)
>>> >>> >    free     = 2703535328 (2578.292205810547MB)
>>> >>> >    62.2338918146983% used
>>> >>> > PS Perm Generation
>>> >>> >    capacity = 90701824 (86.5MB)
>>> >>> >    used     = 45348832 (43.248016357421875MB)
>>> >>> >    free     = 45352992 (43.251983642578125MB)
>>> >>> >    49.99770677158598% used
>>> >>> >
>>> >>> > 8432 interned Strings occupying 714672 bytes.
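Setting the dumps aside for a moment, the parameters listed above can also be expressed as a SparkConf; this is only a sketch (the master URL and app name are placeholders), not the actual driver code:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setMaster("spark://master:7077")
      .setAppName("lr-sgd-test")
      .set("spark.executor.memory", "14g")
      .set("spark.default.parallelism", "1")
      .set("spark.cores.max", "20")
      .set("spark.storage.memoryFraction", "0.8") // no cache space required
      .set("spark.akka.frameSize", "20")          // in MB; 500 was also tried later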
>>> >>> >
>>> >>> > Executor GC log snippet:
>>> >>> >
>>> >>> > 168.778: [GC [PSYoungGen: 2702831K->578545K(2916864K)] 9302453K->7460857K(9907712K), 0.3193550 secs] [Times: user=5.13 sys=0.39, real=0.32 secs]
>>> >>> > 169.097: [Full GC [PSYoungGen: 578545K->0K(2916864K)] [ParOldGen: 6882312K->1073297K(6990848K)] 7460857K->1073297K(9907712K) [PSPermGen: 44248K->44201K(88576K)], 4.5521090 secs] [Times: user=24.22 sys=0.18, real=4.55 secs]
>>> >>> > 174.207: [GC [PSYoungGen: 2338304K->81315K(2544128K)] 3411653K->1154665K(9534976K), 0.0966280 secs] [Times: user=1.66 sys=0.00, real=0.09 secs]
>>> >>> >
>>> >>> > I tried to map partitions to cores on the nodes. Increasing the
>>> >>> > number of partitions (say to 80 or 100) would result in progress
>>> >>> > till the 6th iteration or so, but the next stage would stall as
>>> >>> > before, without any apparent root cause in the logs. With
>>> >>> > increased partitions, the last stage that completed had the
>>> >>> > following task times:
>>> >>> >
>>> >>> > Metric                            Min    25th   Median  75th   Max
>>> >>> > Result serialization time         11 ms  12 ms  13 ms   15 ms  0.4 s
>>> >>> > Duration                          0.5 s  0.9 s  1 s     3 s    7 s
>>> >>> > Time spent fetching task results  0 ms   0 ms   0 ms    0 ms   0 ms
>>> >>> > Scheduler delay                   5 s    6 s    6 s     7 s    12 s
>>> >>> >
>>> >>> > My hypothesis is that as the coefficient array becomes less
>>> >>> > sparse (with successive iterations), the cost of the aggregate
>>> >>> > goes up to the point that it stalls (which I'm unable to
>>> >>> > explain). Reducing the batch fraction to a very low number like
>>> >>> > 0.01 saw the iterations progress further, but the model failed to
>>> >>> > converge in that case after a small number of iterations.
>>> >>> >
>>> >>> > I also tried reducing the number of records by aggregating on
>>> >>> > (x,y) as the key (i.e. using aggregations instead of training on
>>> >>> > every raw record), but encountered the following exception:
>>> >>> >
>>> >>> > Loss was due to java.lang.NullPointerException
>>> >>> > java.lang.NullPointerException
>>> >>> >     at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:750)
>>> >>> >     at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:750)
>>> >>> >     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>> >>> >     at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:59)
>>> >>> >     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:96)
>>> >>> >     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:95)
>>> >>> >     at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582)
>>> >>> >     at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582)
>>> >>> >     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>> >>> >     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>>> >>> >     at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>>> >>> >     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
>>> >>> >     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>>> >>> >     at org.apache.spark.scheduler.Task.run(Task.scala:51)
>>> >>> >     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
>>> >>> >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>> >>> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>> >>> >     at java.lang.Thread.run(Thread.java:745)
>>> >>> >
>>> >>> > I'd appreciate any insights/comments about what may be causing
>>> >>> > the execution to stall.
>>> >>> >
>>> >>> > If logs/tables appear poorly indented in the email, here's a gist
>>> >>> > with relevant details:
>>> >>> > https://gist.github.com/reachbach/a418ab2f01b639b624c1
>>> >>> >
>>> >>> > Thanks,
>>> >>> > Bharath
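A closing note on the NPE above: the top stack frames sit in the thin adapter that converts a Java PairFunction into a Scala function, which suggests a null flowing out of (or into) the user-supplied function rather than a problem in mllib, consistent with Bharath's earlier observation that the NPE is unrelated to mllib. For comparison, the same (x,y) aggregation can be written natively in Scala, sidestepping the Java wrapper; the field types below are assumptions for illustration, not the actual job:

    import org.apache.spark.SparkContext._ // pair-RDD implicits (needed before Spark 1.3)
    import org.apache.spark.rdd.RDD

    // records: (x, y, plus two hypothetical integer fields to merge per key)
    def aggregate(records: RDD[(Long, Long, Int, Int)]): RDD[((Long, Long), (Int, Int))] =
      records
        .map { case (x, y, a, b) => ((x, y), (a, b)) }                   // key by (x, y)
        .reduceByKey { case ((a1, b1), (a2, b2)) => (a1 + a2, b1 + b2) } // merge values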