Hi Xiangrui,

The issue with aggregating/counting over large feature vectors (as part of LogisticRegressionWithSGD) continues to exist, but now in another form: while the execution no longer freezes (the problem tracked as SPARK-1112), it now fails consistently at the second or third gradient descent iteration with an error-level log message but no stacktrace. I'm running against 1.0.1-rc1 and have tried setting spark.akka.frameSize as high as 500. When the execution fails, each of the two executors logs the following message (corresponding to the aggregate at GradientDescent.scala:178):
14/07/02 14:09:09 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329
14/07/02 14:09:09 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
14/07/02 14:09:09 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 1 remote fetches in 0 ms
14/07/02 14:09:11 INFO Executor: Serialized size of result for 737 is 5959086
14/07/02 14:09:11 INFO Executor: Sending result for 737 directly to driver
14/07/02 14:09:11 INFO Executor: Finished task ID 737
14/07/02 14:09:18 ERROR CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkExecutor@(slave1,slave2):51941] -> [akka.tcp://spark@master:59487] disassociated! Shutting down.

There is no separate stacktrace on the driver side.

Each input record consists of a continuous double label and the categorical features p1, p2 and (p1,p2), each with large cardinality. The categorical variables are converted to binary variables, which results in a feature vector of size 741092 (composed of all unique categories across p1, p2 and (p1,p2)). Thus, the labeled point for an input record is a sparse vector of size 741092 with only 3 variables set per record. The total number of records is 683233 after aggregating the input data on (p1, p2).
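To make the encoding concrete, here is a minimal self-contained sketch of the index assignment described above (class and method names are illustrative, not the actual job code; in the real job the resulting indices would feed a sparse vector of length 741092 with all three values set to 1.0):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the one-hot encoding described above: every distinct value of
// p1, p2 and the pair (p1,p2) gets its own binary feature index, so each
// record's sparse vector has exactly three non-zero entries.
public class OneHotSketch {
    // Shared dictionary assigning the next free index to each unseen category.
    private final Map<String, Integer> index = new LinkedHashMap<>();

    private int indexOf(String category) {
        // Reuses an existing index, or assigns the next free one.
        return index.computeIfAbsent(category, k -> index.size());
    }

    /** Returns the three non-zero indices for a record (p1, p2, (p1,p2)). */
    public int[] encode(String p1, String p2) {
        return new int[] { indexOf("p1=" + p1),
                           indexOf("p2=" + p2),
                           indexOf("pair=" + p1 + "," + p2) };
    }

    /** Number of distinct binary features seen so far (741092 in the real data). */
    public int featureCount() { return index.size(); }

    public static void main(String[] args) {
        OneHotSketch enc = new OneHotSketch();
        int[] a = enc.encode("u17", "item42");
        int[] b = enc.encode("u17", "item99");  // shares only the p1 feature
        System.out.println(a.length);           // prints 3: non-zeros per record
        System.out.println(a[0] == b[0]);       // prints true: p1 index reused
        System.out.println(enc.featureCount()); // prints 5: distinct features so far
    }
}
```

Incidentally, while each record is sparse, the gradient aggregated back to the driver is dense: a double[] of length 741092 serializes to roughly 741092 * 8 bytes, i.e. about 5.9 MB, which lines up with the "Serialized size of result for 737 is 5959086" line in the executor log above.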
When attempting to train on the unaggregated records (1337907 in number, spread across 455 files), the execution fails at the count at GradientDescent.scala:161 with the following log (lines corresponding to other input files snipped):

14/07/02 16:02:03 INFO HadoopRDD: Input split: file:~/part-r-00012:2834590+2834590
14/07/02 16:02:03 INFO HadoopRDD: Input split: file:~/part-r-00005:0+2845559
14/07/02 16:02:03 INFO HadoopRDD: Input split: file:~/part-r-00005:2845559+2845560
14/07/02 16:02:03 INFO Executor: Serialized size of result for 726 is 615
14/07/02 16:02:03 INFO Executor: Sending result for 726 directly to driver
14/07/02 16:02:03 INFO Executor: Finished task ID 726
14/07/02 16:02:12 ERROR CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkExecutor@slave1:48423] -> [akka.tcp://spark@master:55792] disassociated! Shutting down.

A count() attempted on the input RDD before beginning training has the following metrics:

Metric                            Min     25th    Median  75th    Max
Result serialization time         0 ms    0 ms    0 ms    0 ms    0 ms
Duration                          33 s    33 s    35 s    35 s    35 s
Time spent fetching task results  0 ms    0 ms    0 ms    0 ms    0 ms
Scheduler delay                   0.1 s   0.1 s   0.3 s   0.3 s   0.3 s

Aggregated Metrics by Executor:
Executor ID  Address              Task Time  Total  Failed  Succeeded  Shuffle Read  Shuffle Write  Shuffle Spill (Mem)  Shuffle Spill (Disk)
0            CANNOT FIND ADDRESS  34 s       1      0       1          0.0 B         0.0 B          0.0 B                0.0 B
1            CANNOT FIND ADDRESS  36 s       1      0       1          0.0 B         0.0 B          0.0 B                0.0 B

Tasks:
Task Index  Task ID  Status   Locality Level  Executor  Launch Time          Duration  GC Time
0           726      SUCCESS  PROCESS_LOCAL   slave1    2014/07/02 16:01:28  35 s      0.1 s
1           727      SUCCESS  PROCESS_LOCAL   slave2    2014/07/02 16:01:28  33 s      99 ms

Any pointers / diagnosis, please?

On Thu, Jun 19, 2014 at 10:03 AM, Bharath Ravi Kumar <reachb...@gmail.com> wrote:
> Thanks. I'll await the fix to re-run my test.
>
> On Thu, Jun 19, 2014 at 8:28 AM, Xiangrui Meng <men...@gmail.com> wrote:
>> Hi Bharath,
>>
>> This is related to SPARK-1112, for which we have already found the root cause.
>> I will let you know when this is fixed.
>>
>> Best,
>> Xiangrui
>>
>> On Tue, Jun 17, 2014 at 7:37 PM, Bharath Ravi Kumar <reachb...@gmail.com> wrote:
>> > Couple more points:
>> > 1) The inexplicable stalling of execution with large feature sets appears similar to that reported with the news-20 dataset:
>> > http://mail-archives.apache.org/mod_mbox/spark-user/201406.mbox/%3c53a03542.1010...@gmail.com%3E
>> >
>> > 2) The NPE raised when calling mapToPair to convert an RDD<Long, Long, Integer, Integer> into a JavaPairRDD<Tuple2<Long,Long>, Tuple2<Integer,Integer>> is unrelated to mllib.
>> >
>> > Thanks,
>> > Bharath
>> >
>> > On Wed, Jun 18, 2014 at 7:14 AM, Bharath Ravi Kumar <reachb...@gmail.com> wrote:
>> >> Hi Xiangrui,
>> >>
>> >> I'm using 1.0.0.
>> >>
>> >> Thanks,
>> >> Bharath
>> >>
>> >> On 18-Jun-2014 1:43 am, "Xiangrui Meng" <men...@gmail.com> wrote:
>> >>> Hi Bharath,
>> >>>
>> >>> Thanks for posting the details! Which Spark version are you using?
>> >>>
>> >>> Best,
>> >>> Xiangrui
>> >>>
>> >>> On Tue, Jun 17, 2014 at 6:48 AM, Bharath Ravi Kumar <reachb...@gmail.com> wrote:
>> >>> > Hi,
>> >>> >
>> >>> > (Apologies for the long mail, but it's necessary to provide sufficient details considering the number of issues faced.)
>> >>> >
>> >>> > I'm running into issues testing LogisticRegressionWithSGD on a two node cluster (each node with 24 cores and 16G available to slaves out of 24G on the system). Here's a description of the application:
>> >>> >
>> >>> > The model is being trained based on categorical features x, y, and (x,y). The categorical features are mapped to binary features by converting each distinct value in the category enum into a binary feature of its own (i.e. presence of that value in a record implies the corresponding feature = 1, else feature = 0.
>> >>> > So, there'd be as many distinct features as enum values.)
>> >>> > The training vector is laid out as [x1,x2,...,xn,y1,y2,...,yn,(x1,y1),(x2,y2),...,(xn,yn)]. Each record in the training data has only one combination (Xk,Yk) and a label appearing in the record. Thus, the corresponding labeled point sparse vector would only have the 3 values Xk, Yk, (Xk,Yk) set for a record. The total length of the vector (though sparse) would be nearly 614000. The number of records is about 1.33 million. The records have been coalesced into 20 partitions across two nodes. The input data has not been cached.
>> >>> > (NOTE: I do realize the records & features may seem large for a two node setup, but given the memory & cpu, and the fact that I'm willing to give up some turnaround time, I don't see why tasks should inexplicably fail.)
>> >>> >
>> >>> > Additional parameters include:
>> >>> >
>> >>> > spark.executor.memory = 14G
>> >>> > spark.default.parallelism = 1
>> >>> > spark.cores.max = 20
>> >>> > spark.storage.memoryFraction = 0.8 // No cache space required
>> >>> > (Trying to set spark.akka.frameSize to a larger number, say 20, didn't help either.)
>> >>> >
>> >>> > The model training was initialized as: new LogisticRegressionWithSGD(1, maxIterations, 0.0, 0.05)
>> >>> >
>> >>> > However, after 4 iterations of gradient descent, the entire execution appeared to stall inexplicably.
>> >>> > The corresponding executor details and the details of the stalled stage (number 14) are as follows:
>> >>> >
>> >>> > Metric                            Min     25th    Median  75th    Max
>> >>> > Result serialization time         12 ms   13 ms   14 ms   16 ms   18 ms
>> >>> > Duration                          4 s     4 s     5 s     5 s     5 s
>> >>> > Time spent fetching task results  0 ms    0 ms    0 ms    0 ms    0 ms
>> >>> > Scheduler delay                   6 s     6 s     6 s     6 s     12 s
>> >>> >
>> >>> > Stage Id: 14, aggregate at GradientDescent.scala:178
>> >>> >
>> >>> > Task Index  Task ID  Status   Locality Level  Executor                     Launch Time          Duration  GC Time  Result Ser Time
>> >>> > 0           600      RUNNING  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  1.1 h
>> >>> > 1           601      RUNNING  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  1.1 h
>> >>> > 2           602      RUNNING  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  1.1 h
>> >>> > 3           603      RUNNING  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  1.1 h
>> >>> > 4           604      RUNNING  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  1.1 h
>> >>> > 5           605      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  4 s       2 s      12 ms
>> >>> > 6           606      SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  4 s       1 s      14 ms
>> >>> > 7           607      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  4 s       2 s      12 ms
>> >>> > 8           608      SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  5 s       1 s      15 ms
>> >>> > 9           609      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  5 s       1 s      14 ms
>> >>> > 10          610      SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  5 s       1 s      15 ms
>> >>> > 11          611      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  4 s       1 s      13 ms
>> >>> > 12          612      SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  5 s       1 s      18 ms
>> >>> > 13          613      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  5 s       1 s      13 ms
>> >>> > 14          614      SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  4 s       1 s      14 ms
>> >>> > 15          615      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  4 s       1 s      12 ms
>> >>> > 16          616      SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  5 s       1 s      15 ms
>> >>> > 17          617      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  5 s       1 s      18 ms
>> >>> > 18          618      SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  5 s       1 s      16 ms
>> >>> > 19          619      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  4 s       1 s      18 ms
>> >>> >
>> >>> > Executor stats:
>> >>> >
>> >>> > RDD Blocks  Memory Used     Disk Used  Active Tasks  Failed Tasks  Complete Tasks  Total Tasks  Task Time  Shuffle Read  Shuffle Write
>> >>> > 0           0.0 B / 6.7 GB  0.0 B      2             0             307             309          23.2 m     0.0 B         0.0 B
>> >>> > 0           0.0 B / 6.7 GB  0.0 B      3             0             308             311          22.4 m     0.0 B         0.0 B
>> >>> >
>> >>> > Executor jmap output:
>> >>> >
>> >>> > Server compiler detected.
>> >>> > JVM version is 24.55-b03
>> >>> >
>> >>> > using thread-local object allocation.
>> >>> > Parallel GC with 18 thread(s)
>> >>> >
>> >>> > Heap Configuration:
>> >>> >    MinHeapFreeRatio = 40
>> >>> >    MaxHeapFreeRatio = 70
>> >>> >    MaxHeapSize      = 10737418240 (10240.0MB)
>> >>> >    NewSize          = 1310720 (1.25MB)
>> >>> >    MaxNewSize       = 17592186044415 MB
>> >>> >    OldSize          = 5439488 (5.1875MB)
>> >>> >    NewRatio         = 2
>> >>> >    SurvivorRatio    = 8
>> >>> >    PermSize         = 21757952 (20.75MB)
>> >>> >    MaxPermSize      = 134217728 (128.0MB)
>> >>> >    G1HeapRegionSize = 0 (0.0MB)
>> >>> >
>> >>> > Heap Usage:
>> >>> > PS Young Generation
>> >>> > Eden Space:
>> >>> >    capacity = 2783969280 (2655.0MB)
>> >>> >    used     = 192583816 (183.66223907470703MB)
>> >>> >    free     = 2591385464 (2471.337760925293MB)
>> >>> >    6.917598458557704% used
>> >>> > From Space:
>> >>> >    capacity = 409993216 (391.0MB)
>> >>> >    used     = 1179808 (1.125152587890625MB)
>> >>> >    free     = 408813408 (389.8748474121094MB)
>> >>> >    0.2877628102022059% used
>> >>> > To Space:
>> >>> >    capacity = 385351680 (367.5MB)
>> >>> >    used     = 0 (0.0MB)
>> >>> >    free     = 385351680 (367.5MB)
>> >>> >    0.0% used
>> >>> > PS Old Generation
>> >>> >    capacity = 7158628352 (6827.0MB)
>> >>> >    used     = 4455093024 (4248.707794189453MB)
>> >>> >    free     = 2703535328 (2578.292205810547MB)
>> >>> >    62.2338918146983% used
>> >>> > PS Perm Generation
>> >>> >    capacity = 90701824 (86.5MB)
>> >>> >    used     = 45348832 (43.248016357421875MB)
>> >>> >    free     = 45352992 (43.251983642578125MB)
>> >>> >    49.99770677158598% used
>> >>> >
>> >>> > 8432 interned Strings occupying 714672 bytes.
>> >>> >
>> >>> > Executor GC log snippet:
>> >>> >
>> >>> > 168.778: [GC [PSYoungGen: 2702831K->578545K(2916864K)] 9302453K->7460857K(9907712K), 0.3193550 secs] [Times: user=5.13 sys=0.39, real=0.32 secs]
>> >>> > 169.097: [Full GC [PSYoungGen: 578545K->0K(2916864K)] [ParOldGen: 6882312K->1073297K(6990848K)] 7460857K->1073297K(9907712K) [PSPermGen: 44248K->44201K(88576K)], 4.5521090 secs] [Times: user=24.22 sys=0.18, real=4.55 secs]
>> >>> > 174.207: [GC [PSYoungGen: 2338304K->81315K(2544128K)] 3411653K->1154665K(9534976K), 0.0966280 secs] [Times: user=1.66 sys=0.00, real=0.09 secs]
>> >>> >
>> >>> > I tried to map partitions to cores on the nodes. Increasing the number of partitions (say, to 80 or 100) would result in progress till the 6th iteration or so, but the next stage would then stall as before, with no apparent root cause in the logs. With increased partitions, the last stage that completed had the following task times:
>> >>> >
>> >>> > Metric                            Min     25th    Median  75th    Max
>> >>> > Result serialization time         11 ms   12 ms   13 ms   15 ms   0.4 s
>> >>> > Duration                          0.5 s   0.9 s   1 s     3 s     7 s
>> >>> > Time spent fetching task results  0 ms    0 ms    0 ms    0 ms    0 ms
>> >>> > Scheduler delay                   5 s     6 s     6 s     7 s     12 s
>> >>> >
>> >>> > My hypothesis is that as the coefficient array becomes less sparse (with successive iterations), the cost of the aggregate goes up to the point that it stalls, though I can't explain why it should stall rather than merely slow down. Reducing the batch fraction to a very low number like 0.01 saw the iterations progress further, but the model then failed to converge after a small number of iterations.
>> >>> >
>> >>> > I also tried reducing the number of records by aggregating on (x,y) as the key (i.e.
>> >>> > using aggregations instead of training on every raw record), but encountered the following exception:
>> >>> >
>> >>> > Loss was due to java.lang.NullPointerException
>> >>> > java.lang.NullPointerException
>> >>> >     at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:750)
>> >>> >     at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:750)
>> >>> >     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>> >>> >     at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:59)
>> >>> >     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:96)
>> >>> >     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:95)
>> >>> >     at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582)
>> >>> >     at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582)
>> >>> >     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>> >>> >     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>> >>> >     at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>> >>> >     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
>> >>> >     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>> >>> >     at org.apache.spark.scheduler.Task.run(Task.scala:51)
>> >>> >     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
>> >>> >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> >>> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> >>> >     at java.lang.Thread.run(Thread.java:745)
>> >>> >
>> >>> > I'd appreciate any insights/comments about what
>> >>> > may be causing the execution to stall.
>> >>> >
>> >>> > If logs/tables appear poorly indented in the email, here's a gist with the relevant details:
>> >>> > https://gist.github.com/reachbach/a418ab2f01b639b624c1
>> >>> >
>> >>> > Thanks,
>> >>> > Bharath