Hi Xiangrui,

The issue with aggergating/counting over large feature vectors (as part of
LogisticRegressionWithSGD) continues to exist, but now in another form:
while the execution doesn't freeze (due to SPARK-1112), it now fails at the
second or third gradient descent iteration consistently with an error level
log message, but no stacktrace. I'm running against 1.0.1-rc1, and have
tried setting spark.akka.frameSize as high as 500. When the execution
fails, each of the two executors log the following message (corresponding
to aggregate at GradientDescent.scala:178) :

14/07/02 14:09:09 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
maxBytesInFlight: 50331648, targetRequestSize: 10066329
14/07/02 14:09:09 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
Getting 2 non-empty blocks out of 2 blocks
14/07/02 14:09:09 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
Started 1 remote fetches in 0 ms
14/07/02 14:09:11 INFO Executor: Serialized size of result for 737 is
5959086
14/07/02 14:09:11 INFO Executor: Sending result for 737 directly to driver
14/07/02 14:09:11 INFO Executor: Finished task ID 737
14/07/02 14:09:18 ERROR CoarseGrainedExecutorBackend: Driver Disassociated
[akka.tcp://sparkExecutor@(slave1,slave2):51941] ->
[akka.tcp://spark@master:59487]
disassociated! Shutting down.


There is no separate stacktrace on the driver side.

Each input record is of the form p1, p2, (p1,p2) where p1, p2 & (p1,p2) are
categorical features with large cardinality, and X is the double label with
a continuous value. The categorical variables are converted to binary
variables which results in a feature vector of size 741092 (composed of all
unique categories across p1, p2 and (p1,p2)). Thus, the labeled point for
input record is a sparse vector of size 741092 with only 3 variables set in
the record. The total number of records is 683233 after aggregating the
input data on (p1, p2). When attempting to train on the unaggregated
records (1337907 in number spread across 455 files), the execution fails at
count, GradientDescent.scala:161 with the following log


(Snipped lines corresponding to other input files)
14/07/02 16:02:03 INFO HadoopRDD: Input split:
file:~/part-r-00012:2834590+2834590
14/07/02 16:02:03 INFO HadoopRDD: Input split: file:~/part-r-00005:0+2845559
14/07/02 16:02:03 INFO HadoopRDD: Input split:
file:~/part-r-00005:2845559+2845560
14/07/02 16:02:03 INFO Executor: Serialized size of result for 726 is 615
14/07/02 16:02:03 INFO Executor: Sending result for 726 directly to driver
14/07/02 16:02:03 INFO Executor: Finished task ID 726
14/07/02 16:02:12 ERROR CoarseGrainedExecutorBackend: Driver Disassociated
[akka.tcp://sparkExecutor@slave1:48423] -> [akka.tcp://spark@master:55792]
disassociated! Shutting down.

A count() attempted on the input RDD before beginning training has the
following metrics:

Metric            Min        25th    Median    75th     Max

Result
serialization
time            0 ms    0 ms    0 ms    0 ms    0 ms

Duration        33 s    33 s    35 s    35 s    35 s

Time spent
fetching task
results            0 ms    0 ms    0 ms    0 ms    0 ms

Scheduler
delay            0.1 s    0.1 s    0.3 s    0.3 s    0.3 s

Aggregated Metrics by Executor

ID     Address Task             Time Total Failed Succeeded Shuffle Read
    Shuffle Write     Shuf Spill (Mem)     Shuf Spill (Disk)
0     CANNOT FIND ADDRESS     34 s     1     0         1         0.0 B
        0.0 B             0.0 B                 0.0 B
1     CANNOT FIND ADDRESS     36 s     1     0         1         0.0 B
        0.0 B             0.0 B                 0.0 B

Tasks

Task Index    Task ID    Status    Locality Level    Executor    Launch
Time            Duration    GC Time    Result Ser Time    Errors
0     726     SUCCESS         PROCESS_LOCAL     slave1         2014/07/02
16:01:28 35 s         0.1 s
1     727     SUCCESS         PROCESS_LOCAL     slave2         2014/07/02
16:01:28 33 s         99 ms

Any pointers / diagnosis please?



On Thu, Jun 19, 2014 at 10:03 AM, Bharath Ravi Kumar <reachb...@gmail.com>
wrote:

> Thanks. I'll await the fix to re-run my test.
>
>
> On Thu, Jun 19, 2014 at 8:28 AM, Xiangrui Meng <men...@gmail.com> wrote:
>
>> Hi Bharath,
>>
>> This is related to SPARK-1112, which we already found the root cause.
>> I will let you know when this is fixed.
>>
>> Best,
>> Xiangrui
>>
>> On Tue, Jun 17, 2014 at 7:37 PM, Bharath Ravi Kumar <reachb...@gmail.com>
>> wrote:
>> > Couple more points:
>> > 1)The inexplicable stalling of execution with large feature sets appears
>> > similar to that reported with the news-20 dataset:
>> >
>> http://mail-archives.apache.org/mod_mbox/spark-user/201406.mbox/%3c53a03542.1010...@gmail.com%3E
>> >
>> > 2) The NPE trying to call mapToPair convert an RDD<Long, Long, Integer,
>> > Integer> into a JavaPairRDD<Tuple2<Long,Long>, Tuple2<Integer,Integer>>
>> is
>> > unrelated to mllib.
>> >
>> > Thanks,
>> > Bharath
>> >
>> >
>> >
>> > On Wed, Jun 18, 2014 at 7:14 AM, Bharath Ravi Kumar <
>> reachb...@gmail.com>
>> > wrote:
>> >>
>> >> Hi  Xiangrui ,
>> >>
>> >> I'm using 1.0.0.
>> >>
>> >> Thanks,
>> >> Bharath
>> >>
>> >> On 18-Jun-2014 1:43 am, "Xiangrui Meng" <men...@gmail.com> wrote:
>> >>>
>> >>> Hi Bharath,
>> >>>
>> >>> Thanks for posting the details! Which Spark version are you using?
>> >>>
>> >>> Best,
>> >>> Xiangrui
>> >>>
>> >>> On Tue, Jun 17, 2014 at 6:48 AM, Bharath Ravi Kumar <
>> reachb...@gmail.com>
>> >>> wrote:
>> >>> > Hi,
>> >>> >
>> >>> > (Apologies for the long mail, but it's necessary to provide
>> sufficient
>> >>> > details considering the number of issues faced.)
>> >>> >
>> >>> > I'm running into issues testing LogisticRegressionWithSGD a two node
>> >>> > cluster
>> >>> > (each node with 24 cores and 16G available to slaves out of 24G on
>> the
>> >>> > system). Here's a description of the application:
>> >>> >
>> >>> > The model is being trained based on categorical features x, y, and
>> >>> > (x,y).
>> >>> > The categorical features are mapped to binary features by converting
>> >>> > each
>> >>> > distinct value in the category enum into a binary feature by itself
>> >>> > (i.e
>> >>> > presence of that value in a record implies corresponding feature =
>> 1,
>> >>> > else
>> >>> > feature = 0. So, there'd be as many distinct features as enum
>> values) .
>> >>> > The
>> >>> > training vector is laid out as
>> >>> > [x1,x2...xn,y1,y2....yn,(x1,y1),(x2,y2)...(xn,yn)]. Each record in
>> the
>> >>> > training data has only one combination (Xk,Yk) and a label
>> appearing in
>> >>> > the
>> >>> > record. Thus, the corresponding labeledpoint sparse vector would
>> only
>> >>> > have 3
>> >>> > values Xk, Yk, (Xk,Yk) set for a record. The total length of the
>> vector
>> >>> > (though parse) would be nearly 614000.  The number of records is
>> about
>> >>> > 1.33
>> >>> > million. The records have been coalesced into 20 partitions across
>> two
>> >>> > nodes. The input data has not been cached.
>> >>> > (NOTE: I do realize the records & features may seem large for a two
>> >>> > node
>> >>> > setup, but given the memory & cpu, and the fact that I'm willing to
>> >>> > give up
>> >>> > some turnaround time, I don't see why tasks should inexplicably
>> fail)
>> >>> >
>> >>> > Additional parameters include:
>> >>> >
>> >>> > spark.executor.memory = 14G
>> >>> > spark.default.parallelism = 1
>> >>> > spark.cores.max=20
>> >>> > spark.storage.memoryFraction=0.8 //No cache space required
>> >>> > (Trying to set spark.akka.frameSize to a larger number, say, 20
>> didn't
>> >>> > help
>> >>> > either)
>> >>> >
>> >>> > The model training was initialized as : new
>> >>> > LogisticRegressionWithSGD(1,
>> >>> > maxIterations, 0.0, 0.05)
>> >>> >
>> >>> > However, after 4 iterations of gradient descent, the entire
>> execution
>> >>> > appeared to stall inexplicably. The corresponding executor details
>> and
>> >>> > details of the stalled stage (number 14) are as follows:
>> >>> >
>> >>> > Metric                        Min        25th     Median    75th
>> >>> > Max
>> >>> > Result serialization time    12 ms    13 ms    14 ms    16 ms    18
>> ms
>> >>> > Duration                    4 s        4 s        5 s        5 s
>> >>> > 5 s
>> >>> > Time spent fetching task     0 ms    0 ms    0 ms    0 ms    0 ms
>> >>> > results
>> >>> > Scheduler delay                6 s        6 s        6 s        6 s
>> >>> > 12 s
>> >>> >
>> >>> >
>> >>> > Stage Id
>> >>> > 14 aggregate at GradientDescent.scala:178
>> >>> >
>> >>> > Task Index    Task ID    Status    Locality Level     Executor
>> >>> > Launch Time                Duration    GC     Result Ser Time
>>  Errors
>> >>> >
>> >>> > Time
>> >>> >
>> >>> > 0     600     RUNNING     PROCESS_LOCAL
>> serious.dataone.foo.bar.com
>> >>> > 2014/06/17 10:32:27     1.1 h
>> >>> > 1     601     RUNNING     PROCESS_LOCAL
>> casual.dataone.foo.bar.com
>> >>> > 2014/06/17 10:32:27         1.1 h
>> >>> > 2     602     RUNNING     PROCESS_LOCAL
>> serious.dataone.foo.bar.com
>> >>> > 2014/06/17 10:32:27     1.1 h
>> >>> > 3     603     RUNNING     PROCESS_LOCAL
>> casual.dataone.foo.bar.com
>> >>> > 2014/06/17 10:32:27         1.1 h
>> >>> > 4     604     RUNNING     PROCESS_LOCAL
>> serious.dataone.foo.bar.com
>> >>> > 2014/06/17 10:32:27     1.1 h
>> >>> > 5     605     SUCCESS     PROCESS_LOCAL
>> casual.dataone.foo.bar.com
>> >>> > 2014/06/17 10:32:27 4 s     2 s     12 ms
>> >>> > 6     606     SUCCESS     PROCESS_LOCAL
>> serious.dataone.foo.bar.com
>> >>> > 2014/06/17 10:32:27     4 s     1 s     14 ms
>> >>> > 7     607     SUCCESS     PROCESS_LOCAL
>> casual.dataone.foo.bar.com
>> >>> > 2014/06/17 10:32:27 4 s     2 s     12 ms
>> >>> > 8     608     SUCCESS     PROCESS_LOCAL
>> serious.dataone.foo.bar.com
>> >>> > 2014/06/17 10:32:27     5 s     1 s     15 ms
>> >>> > 9     609     SUCCESS     PROCESS_LOCAL
>> casual.dataone.foo.bar.com
>> >>> > 2014/06/17 10:32:27 5 s     1 s     14 ms
>> >>> > 10     610     SUCCESS     PROCESS_LOCAL
>> >>> > serious.dataone.foo.bar.com
>> >>> > 2014/06/17 10:32:27     5 s     1 s     15 ms
>> >>> > 11     611     SUCCESS     PROCESS_LOCAL
>> casual.dataone.foo.bar.com
>> >>> > 2014/06/17 10:32:27 4 s     1 s     13 ms
>> >>> > 12     612     SUCCESS     PROCESS_LOCAL
>> >>> > serious.dataone.foo.bar.com
>> >>> > 2014/06/17 10:32:27     5 s     1 s     18 ms
>> >>> > 13     613     SUCCESS     PROCESS_LOCAL
>> casual.dataone.foo.bar.com
>> >>> > 2014/06/17 10:32:27 5 s     1 s     13 ms
>> >>> > 14     614     SUCCESS     PROCESS_LOCAL
>> >>> > serious.dataone.foo.bar.com
>> >>> > 2014/06/17 10:32:27     4 s     1 s     14 ms
>> >>> > 15     615     SUCCESS     PROCESS_LOCAL
>> casual.dataone.foo.bar.com
>> >>> > 2014/06/17 10:32:27 4 s     1 s     12 ms
>> >>> > 16     616     SUCCESS     PROCESS_LOCAL
>> >>> > serious.dataone.foo.bar.com
>> >>> > 2014/06/17 10:32:27     5 s     1 s     15 ms
>> >>> > 17     617     SUCCESS     PROCESS_LOCAL
>> casual.dataone.foo.bar.com
>> >>> > 2014/06/17 10:32:27 5 s     1 s     18 ms
>> >>> > 18     618     SUCCESS     PROCESS_LOCAL
>> >>> > serious.dataone.foo.bar.com
>> >>> > 2014/06/17 10:32:27     5 s     1 s     16 ms
>> >>> > 19     619     SUCCESS     PROCESS_LOCAL
>> casual.dataone.foo.bar.com
>> >>> > 2014/06/17 10:32:27 4 s     1 s     18 ms
>> >>> >
>> >>> > Executor stats:
>> >>> >
>> >>> > RDD Blocks    Memory Used    Disk Used    Active Tasks    Failed
>> Tasks
>> >>> > Complete Tasks    Total Tasks    Task Time    Shuffle Read
>>  Shuffle
>> >>> > Write
>> >>> > 0     0.0 B / 6.7 GB         0.0 B         2                 0
>> >>> > 307         309         23.2 m         0.0 B             0.0 B
>> >>> > 0     0.0 B / 6.7 GB         0.0 B         3                 0
>> >>> > 308         311         22.4 m         0.0 B             0.0 B
>> >>> >
>> >>> >
>> >>> > Executor jmap output:
>> >>> >
>> >>> > Server compiler detected.
>> >>> > JVM version is 24.55-b03
>> >>> >
>> >>> > using thread-local object allocation.
>> >>> > Parallel GC with 18 thread(s)
>> >>> >
>> >>> > Heap Configuration:
>> >>> >    MinHeapFreeRatio = 40
>> >>> >    MaxHeapFreeRatio = 70
>> >>> >    MaxHeapSize      = 10737418240 (10240.0MB)
>> >>> >    NewSize          = 1310720 (1.25MB)
>> >>> >    MaxNewSize       = 17592186044415 MB
>> >>> >    OldSize          = 5439488 (5.1875MB)
>> >>> >    NewRatio         = 2
>> >>> >    SurvivorRatio    = 8
>> >>> >    PermSize         = 21757952 (20.75MB)
>> >>> >    MaxPermSize      = 134217728 (128.0MB)
>> >>> >    G1HeapRegionSize = 0 (0.0MB)
>> >>> >
>> >>> > Heap Usage:
>> >>> > PS Young Generation
>> >>> > Eden Space:
>> >>> >    capacity = 2783969280 (2655.0MB)
>> >>> >    used     = 192583816 (183.66223907470703MB)
>> >>> >    free     = 2591385464 (2471.337760925293MB)
>> >>> >    6.917598458557704% used
>> >>> > From Space:
>> >>> >    capacity = 409993216 (391.0MB)
>> >>> >    used     = 1179808 (1.125152587890625MB)
>> >>> >    free     = 408813408 (389.8748474121094MB)
>> >>> >    0.2877628102022059% used
>> >>> > To Space:
>> >>> >    capacity = 385351680 (367.5MB)
>> >>> >    used     = 0 (0.0MB)
>> >>> >    free     = 385351680 (367.5MB)
>> >>> >    0.0% used
>> >>> > PS Old Generation
>> >>> >    capacity = 7158628352 (6827.0MB)
>> >>> >    used     = 4455093024 (4248.707794189453MB)
>> >>> >    free     = 2703535328 (2578.292205810547MB)
>> >>> >    62.2338918146983% used
>> >>> > PS Perm Generation
>> >>> >    capacity = 90701824 (86.5MB)
>> >>> >    used     = 45348832 (43.248016357421875MB)
>> >>> >    free     = 45352992 (43.251983642578125MB)
>> >>> >    49.99770677158598% used
>> >>> >
>> >>> > 8432 interned Strings occupying 714672 bytes.
>> >>> >
>> >>> >
>> >>> > Executor GC log snippet:
>> >>> >
>> >>> > 168.778: [GC [PSYoungGen: 2702831K->578545K(2916864K)]
>> >>> > 9302453K->7460857K(9907712K), 0.3193550 secs] [Times: user=5.13
>> >>> > sys=0.39,
>> >>> > real=0.32 secs]
>> >>> > 169.097: [Full GC [PSYoungGen: 578545K->0K(2916864K)] [ParOldGen:
>> >>> > 6882312K->1073297K(6990848K)] 7460857K->1073297K(9907712K)
>> [PSPermGen:
>> >>> > 44248K->44201K(88576K)], 4.5521090 secs] [Times: user=24.22
>> sys=0.18,
>> >>> > real=4.55 secs]
>> >>> > 174.207: [GC [PSYoungGen: 2338304K->81315K(2544128K)]
>> >>> > 3411653K->1154665K(9534976K), 0.0966280 secs] [Times: user=1.66
>> >>> > sys=0.00,
>> >>> > real=0.09 secs]
>> >>> >
>> >>> > I tried to map partitions to cores on the nodes. Increasing the
>> number
>> >>> > of
>> >>> > partitions (say to 80 or 100) would result in progress till the 6th
>> >>> > iteration or so, but the next stage would stall as before with
>> apparent
>> >>> > root
>> >>> > cause / logs. With increased partitions, the last stage that
>> completed
>> >>> > had
>> >>> > the following task times:
>> >>> >
>> >>> > Metric                        Min        25th     Median    75th
>>  Max
>> >>> > Result serialization time    11 ms    12 ms    13 ms    15 ms
>>  0.4 s
>> >>> > Duration                    0.5 s    0.9 s    1 s        3 s
>>  7 s
>> >>> > Time spent fetching            0 ms    0 ms    0 ms    0 ms    0 ms
>> >>> > task results
>> >>> > Scheduler delay                5 s        6 s        6 s        7 s
>> >>> > 12 s
>> >>> >
>> >>> > My hypothesis is that as the coefficient array becomes less sparse
>> >>> > (with
>> >>> > successive iterations), the cost of the aggregate goes up to the
>> point
>> >>> > that
>> >>> > it stalls (which I failed to explain). Reducing the batch fraction
>> to a
>> >>> > very
>> >>> > low number like 0.01 saw the iterations progress further, but the
>> model
>> >>> > failed to converge in that case after a small number of iterations.
>> >>> >
>> >>> >
>> >>> > I also tried reducing the number of records by aggregating on (x,y)
>> as
>> >>> > the
>> >>> > key (i.e. using aggregations instead of training on every raw
>> record),
>> >>> > but
>> >>> > encountered by the following exception:
>> >>> >
>> >>> > Loss was due to java.lang.NullPointerException
>> >>> > java.lang.NullPointerException
>> >>> >         at
>> >>> >
>> >>> >
>> org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:750)
>> >>> >         at
>> >>> >
>> >>> >
>> org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:750)
>> >>> >         at
>> scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>> >>> >         at
>> >>> > org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:59)
>> >>> >         at
>> >>> >
>> >>> >
>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:96)
>> >>> >         at
>> >>> >
>> >>> >
>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:95)
>> >>> >         at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582)
>> >>> >         at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582)
>> >>> >         at
>> >>> >
>> >>> >
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>> >>> >         at
>> >>> > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>> >>> >         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>> >>> >         at
>> >>> >
>> >>> >
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
>> >>> >         at
>> >>> >
>> >>> >
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>> >>> >         at org.apache.spark.scheduler.Task.run(Task.scala:51)
>> >>> >         at
>> >>> >
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
>> >>> >         at
>> >>> >
>> >>> >
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> >>> >         at
>> >>> >
>> >>> >
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> >>> >         at java.lang.Thread.run(Thread.java:745)
>> >>> >
>> >>> >
>> >>> > I'd appreciate any insights/comments about what may be causing the
>> >>> > execution
>> >>> > to stall.
>> >>> >
>> >>> > If logs/tables appear poorly indented in the email, here's a gist
>> with
>> >>> > relevant details:
>> >>> > https://gist.github.com/reachbach/a418ab2f01b639b624c1
>> >>> >
>> >>> > Thanks,
>> >>> > Bharath
>> >
>> >
>>
>
>

Reply via email to