Encountering 'Could not find or load main class' error when submitting a Spark job on Kubernetes

2018-05-22 Thread Makoto Hashimoto
Hi,

I am trying to run a Spark job on Kubernetes. Running the job locally works
fine, as follows:
$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi
--master local[4] examples/jars/spark-examples_2.11-2.3.0.jar 100
..
2018-05-20 21:49:02 INFO  DAGScheduler:54 - Job 0 finished: reduce at
SparkPi.scala:38, took 2.459637 s
Pi is roughly 3.1418607141860715
2018-05-20 21:49:02 INFO  AbstractConnector:318 - Stopped
Spark@41bb8c78{HTTP/1.1,[http/1.1]}{localhost:4040}
2018-05-20 21:49:02 INFO  SparkUI:54 - Stopped Spark web UI at
http://localhost:4040
2018-05-20 21:49:02 INFO  MapOutputTrackerMasterEndpoint:54 -
MapOutputTrackerMasterEndpoint stopped!
2018-05-20 21:49:02 INFO  MemoryStore:54 - MemoryStore cleared
2018-05-20 21:49:02 INFO  BlockManager:54 - BlockManager stopped
2018-05-20 21:49:02 INFO  BlockManagerMaster:54 - BlockManagerMaster stopped
2018-05-20 21:49:02 INFO
OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:54 -
OutputCommitCoordinator stopped!
2018-05-20 21:49:02 INFO  SparkContext:54 - Successfully stopped SparkContext
2018-05-20 21:49:02 INFO  ShutdownHookManager:54 - Shutdown hook called
2018-05-20 21:49:02 INFO  ShutdownHookManager:54 - Deleting directory
/tmp/spark-ad68e56c-7991-4c6c-b3c5-99ab481a1449
2018-05-20 21:49:02 INFO  ShutdownHookManager:54 - Deleting directory
/tmp/spark-bbcce77f-70a4-4ec1-ad05-e8819fd3ba7a

When I submitted the Spark job on Kubernetes, it ended with an error.

$ bin/spark-submit --master k8s://https://192.168.99.100:8443
--deploy-mode cluster --name spark-pi --class
org.apache.spark.examples.SparkPi --conf spark.executor.instances=5
 --conf spark.kubernetes.container.image=tokoma1/spark:1.0 --conf
spark.kubernetes.driver.pod.name=spark-pi-driver
local:///usr/local/oss/spark-2.3.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.3.0.jar
100
...
 Container name: spark-kubernetes-driver
 Container image: tokoma1/spark:1.0
 Container state: Terminated
 Exit code: 1
2018-05-20 21:59:02 INFO  Client:54 - Application spark-pi finished.
2018-05-20 21:59:02 INFO  ShutdownHookManager:54 - Shutdown hook called
2018-05-20 21:59:02 INFO  ShutdownHookManager:54 - Deleting directory
/tmp/spark-485f73a5-7416-4caa-acb2-49b0bde5eb80

I checked the status of the pod as follows:

$ kubectl get pods

NAME              READY   STATUS   RESTARTS   AGE
spark-pi-driver   0/1     Error    0          1m

This means it ended with an error.

I checked the log.

$ kubectl -n=default logs -f spark-pi-driver
++ id -u
+ myuid=0
++ id -g
+ mygid=0
++ getent passwd 0
+ uidentry=root:x:0:0:root:/root:/bin/ash
+ '[' -z root:x:0:0:root:/root:/bin/ash ']'
+ SPARK_K8S_CMD=driver
+ '[' -z driver ']'
+ shift 1
+ SPARK_CLASSPATH=':/opt/spark/jars/*'
+ env
+ grep SPARK_JAVA_OPT_
+ sed 's/[^=]*=\(.*\)/\1/g'
+ readarray -t SPARK_JAVA_OPTS
+ '[' -n 
/usr/local/oss/spark-2.3.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.3.0.jar:/usr/local/oss/spark-2.3.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.3.0.jar
']'
+ 
SPARK_CLASSPATH=':/opt/spark/jars/*:/usr/local/oss/spark-2.3.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.3.0.jar:/usr/local/oss/spark-2.3.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.3.0.jar'
+ '[' -n '' ']'
+ case "$SPARK_K8S_CMD" in
+ CMD=(${JAVA_HOME}/bin/java "${SPARK_JAVA_OPTS[@]}" -cp
"$SPARK_CLASSPATH" -Xms$SPARK_DRIVER_MEMORY -Xmx$SPARK_DRIVER_MEMORY
-Dspark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS
$SPARK_DRIVER_CLASS $SPARK_DRIVER_ARGS)
+ exec /sbin/tini -s -- /usr/lib/jvm/java-1.8-openjdk/bin/java
-Dspark.driver.port=7078
-Dspark.master=k8s://https://192.168.99.100:8443
-Dspark.kubernetes.driver.pod.name=spark-pi-driver
-Dspark.driver.blockManager.port=7079
-Dspark.kubernetes.container.image=tokoma1/spark:1.0
-Dspark.jars=/usr/local/oss/spark-2.3.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.3.0.jar,/usr/local/oss/spark-2.3.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.3.0.jar
-Dspark.app.name=spark-pi
-Dspark.app.id=spark-9762ba052680404a9220f451d99ba818
-Dspark.submit.deployMode=cluster -Dspark.executor.instances=5
-Dspark.kubernetes.executor.podNamePrefix=spark-pi-01f873a813323a4a85eb7a2464949141
-Dspark.driver.host=spark-pi-01f873a813323a4a85eb7a2464949141-driver-svc.default.svc
-cp 
':/opt/spark/jars/*:/usr/local/oss/spark-2.3.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.3.0.jar:/usr/local/oss/spark-2.3.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.3.0.jar'
-Xms1g -Xmx1g -Dspark.driver.bindAddress=172.17.0.4
org.apache.spark.examples.SparkPi 100
Error: Could not find or load main class org.apache.spark.examples.SparkPi

Has anybody encountered the same error, and do you know the
resolution?

Thanks,
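
For reference, the SPARK_CLASSPATH printed above contains the jar path from the
submitting host. With the local:// scheme, the path is resolved inside the driver
container, so the jar has to exist at that exact path in the image. A minimal
sketch of a corrected submit command, assuming an image built from the Spark
2.3.0 distribution's bundled Dockerfile, which copies the examples to
/opt/spark/examples inside the image:

$ bin/spark-submit --master k8s://https://192.168.99.100:8443 \
  --deploy-mode cluster --name spark-pi \
  --class org.apache.spark.examples.SparkPi \
  --conf spark.executor.instances=5 \
  --conf spark.kubernetes.container.image=tokoma1/spark:1.0 \
  --conf spark.kubernetes.driver.pod.name=spark-pi-driver \
  local:///opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar 100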


Re: learning Spark

2017-12-05 Thread makoto
This gitbook explains Spark components in detail.

'Mastering Apache Spark 2'

https://www.gitbook.com/book/jaceklaskowski/mastering-apache-spark/details




2017-12-04 12:48 GMT+09:00 Manuel Sopena Ballesteros <
manuel...@garvan.org.au>:

> Dear Spark community,
>
>
>
> Is there any resource (books, online courses, etc.) available that you know
> of to learn about Spark? I am interested in the sys admin side of it: the
> different parts inside Spark, how Spark works internally, the best ways to
> install/deploy/monitor it, and how to get the best performance possible.
>
>
>
> Any suggestion?
>
>
>
> Thank you very much
>
>
>
> *Manuel Sopena Ballesteros *| Systems Engineer
> *Garvan Institute of Medical Research *
> The Kinghorn Cancer Centre, 370 Victoria Street, Darlinghurst, NSW 2010
> 
> *T:* +61 (0)2 9355 5760 | *F:* +61 (0)2 9295 8507 | *E:* manuel...@garvan.org.au
>
>
>


Re: pyspark configuration with Juyter

2017-11-04 Thread makoto
I set up environment variables in my ~/.bashrc as follows:

export PYSPARK_PYTHON=/usr/local/oss/anaconda3/bin/python3.6
export PYTHONPATH=$(ls -a
${SPARK_HOME}/python/lib/py4j-*-src.zip):${SPARK_HOME}/python:$PYTHONPATH
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS='notebook'


2017-11-03 20:56 GMT+09:00 Jeff Zhang :

>
> You are setting PYSPARK_DRIVER_PYTHON to jupyter; please set it to the python
> executable file instead.
>
>
> anudeep 于2017年11月3日周五 下午7:31写道:
>
>> Hello experts,
>>
>> I installed Jupyter notebook through Anaconda and set the PySpark driver to use
>> Jupyter notebook.
>>
>> I see the issue below when I try to open pyspark.
>>
>> [anudeepg@datanode2 spark-2.1.0]$ ./bin/pyspark
>> [I 07:29:53.184 NotebookApp] The port  is already in use, trying
>> another port.
>> [I 07:29:53.211 NotebookApp] JupyterLab alpha preview extension loaded
>> from /home/anudeepg/anaconda2/lib/python2.7/site-packages/jupyterlab
>> JupyterLab v0.27.0
>> Known labextensions:
>> [I 07:29:53.212 NotebookApp] Running the core application with no
>> additional extensions or settings
>> [I 07:29:53.214 NotebookApp] Serving notebooks from local directory:
>> /opt/mapr/spark/spark-2.1.0
>> [I 07:29:53.214 NotebookApp] 0 active kernels
>> [I 07:29:53.214 NotebookApp] The Jupyter Notebook is running at:
>> http://localhost:8889/?token=9aa5dc87cb5a6d987237f68e2f0b7e
>> 9c70a7f2e8c9a7cf2e
>> [I 07:29:53.214 NotebookApp] Use Control-C to stop this server and shut
>> down all kernels (twice to skip confirmation).
>> [W 07:29:53.214 NotebookApp] No web browser found: could not locate
>> runnable browser.
>> [C 07:29:53.214 NotebookApp]
>>
>> Copy/paste this URL into your browser when you connect for the first
>> time,
>> to login with a token:
>> http://localhost:8889/?token=9aa5dc87cb5a6d987237f68e2f0b7e
>> 9c70a7f2e8c9a7cf2e
>>
>>
>> Can someone please help me here.
>>
>> Thanks!
>> Anudeep
>>
>>


Re: Fwd: Dose pyspark supports python3.6?

2017-11-01 Thread makoto
I'm not sure whether PySpark officially supports Python 3.6, but PySpark with
Python 3.6 is working in my environment.

I found the following issue, and it seems to have already been resolved.

https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-19019

2017/11/02 11:54 AM "Jun Shi" :



Dear Spark developers:
   It's so exciting to send this email to you.
   I have run into the question of whether PySpark supports Python 3.6.
(Some answers I found online say no.) Can you tell me which Python
versions PySpark supports?
   I'm looking forward to your answer. Thank you very much!

Best,
Jun


count exceed int.MaxValue

2017-08-08 Thread makoto
Hello,
I'd like to count more than Int.MaxValue elements, but I encountered the
following error.

scala> val rdd = sc.parallelize(1L to Int.MaxValue*2.toLong)
rdd: org.apache.spark.rdd.RDD[Long] = ParallelCollectionRDD[28] at
parallelize at <console>:24

scala> rdd.count
java.lang.IllegalArgumentException: More than Int.MaxValue elements.
  at
scala.collection.immutable.NumericRange$.check$1(NumericRange.scala:304)
  at scala.collection.immutable.NumericRange$.count(NumericRange.scala:314)
  at
scala.collection.immutable.NumericRange.numRangeElements$lzycompute(NumericRange.scala:52)
  at
scala.collection.immutable.NumericRange.numRangeElements(NumericRange.scala:51)
  at scala.collection.immutable.NumericRange.length(NumericRange.scala:54)
  at
org.apache.spark.rdd.ParallelCollectionRDD$.slice(ParallelCollectionRDD.scala:145)
  at
org.apache.spark.rdd.ParallelCollectionRDD.getPartitions(ParallelCollectionRDD.scala:97)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
  at org.apache.spark.rdd.RDD.count(RDD.scala:1158)
  ... 48 elided

How can I avoid the error?
A similar problem occurs with reduce:
scala> rdd.reduce((a,b)=> (a + b))
java.lang.IllegalArgumentException: More than Int.MaxValue elements.
  at
scala.collection.immutable.NumericRange$.check$1(NumericRange.scala:304)
  at scala.collection.immutable.NumericRange$.count(NumericRange.scala:314)
  at
scala.collection.immutable.NumericRange.numRangeElements$lzycompute(NumericRange.scala:52)
  at
scala.collection.immutable.NumericRange.numRangeElements(NumericRange.scala:51)
  at scala.collection.immutable.NumericRange.length(NumericRange.scala:54)
  at
org.apache.spark.rdd.ParallelCollectionRDD$.slice(ParallelCollectionRDD.scala:145)
  at
org.apache.spark.rdd.ParallelCollectionRDD.getPartitions(ParallelCollectionRDD.scala:97)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2119)
  at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1026)
  at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.RDD.reduce(RDD.scala:1008)
  ... 48 elided
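
A workaround sketch, assuming Spark 1.5 or later where SparkContext.range is
available: sc.range generates each partition's values lazily instead of
materializing the whole NumericRange on the driver, so the Int.MaxValue check
hit in ParallelCollectionRDD.slice is avoided (the numSlices value below is
arbitrary):

scala> val rdd = sc.range(1L, 2L * Int.MaxValue + 1L, step = 1L, numSlices = 1000)
scala> rdd.count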


Re: akka disassociated on GC

2014-07-22 Thread Makoto Yui

Hi Xiangrui,

By using your treeAggregate and broadcast patches, the evaluation completed
successfully.


I hope these patches will be merged in the next major release
(v1.1?). Without them, it would be hard to use MLlib on a large dataset.


Thanks,
Makoto

(2014/07/16 15:05), Xiangrui Meng wrote:

Hi Makoto,

I don't remember writing that, but thanks for bringing this issue up!
There are two important settings to check: 1) driver memory (you can
see it from the executor tab), 2) number of partitions (try to use a
small number of partitions). I put up two PRs to fix the problem:

1) use broadcast in task closure: https://github.com/apache/spark/pull/1427
2) use treeAggregate to get the result:
https://github.com/apache/spark/pull/1110

They are still under review. Once merged, the problem should be fixed.
I will test the KDDB dataset and report back. Thanks!

Best,
Xiangrui

On Tue, Jul 15, 2014 at 10:48 PM, Makoto Yui yuin...@gmail.com wrote:

Hello,

(2014/06/19 23:43), Xiangrui Meng wrote:


The execution was slow for the larger KDD Cup 2012, Track 2 dataset
(235M+ records with 16.7M+ (2^24) sparse features, about 33.6GB) due to the
sequential aggregation of dense vectors on a single driver node.

Aggregation took about 7.6 minutes per iteration.



When running the above test, I got another error at the beginning of the 2nd
iteration when running multiple iterations.

It works fine for the first iteration, but the 2nd iteration always fails.

It seems that akka connections are suddenly disassociated when GC happens on
the driver node. Two possible causes can be considered:
1) The driver is under a heavy load because of GC, so executors cannot
connect to the driver. Changing the akka timeout settings did not resolve the
issue.
2) akka oddly released valid connections on GC.

I'm using Spark 1.0.1, and the following akka timeout settings did not resolve
the problem.

[spark-defaults.conf]
spark.akka.frameSize 50
spark.akka.timeout   120
spark.akka.askTimeout    120
spark.akka.lookupTimeout 120
spark.akka.heartbeat.pauses 600

It seems this issue is related to one previously discussed in
http://markmail.org/message/p2i34frtf4iusdfn

Are there any preferred configurations or workaround for this issue?

Thanks,
Makoto


[The error log of the driver]

14/07/14 18:11:32 INFO scheduler.TaskSetManager: Serialized task 4.0:117 as
25300254 bytes in 35 ms
666.108: [GC [PSYoungGen: 6540914K->975362K(7046784K)]
12419091K->7792529K(23824000K), 5.2157830 secs] [Times: user=0.00 sys=68.43,
real=5.22 secs]
14/07/14 18:11:38 INFO network.ConnectionManager: Removing SendingConnection
to ConnectionManagerId(dc09.mydomain.org,34565)
14/07/14 18:11:38 INFO network.ConnectionManager: Removing
ReceivingConnection to ConnectionManagerId(dc09.mydomain.org,34565)
14/07/14 18:11:38 INFO client.AppClient$ClientActor: Executor updated:
app-20140714180032-0010/8 is now EXITED (Command exited with code 1)
14/07/14 18:11:38 ERROR network.ConnectionManager: Corresponding
SendingConnectionManagerId not found
14/07/14 18:11:38 INFO cluster.SparkDeploySchedulerBackend: Executor
app-20140714180032-0010/8 removed: Command exited with code 1
14/07/14 18:11:38 INFO network.ConnectionManager: Removing SendingConnection
to ConnectionManagerId(dc30.mydomain.org,59016)
14/07/14 18:11:38 INFO network.ConnectionManager: Removing
ReceivingConnection to ConnectionManagerId(dc30.mydomain.org,59016)
14/07/14 18:11:38 ERROR network.ConnectionManager: Corresponding
SendingConnectionManagerId not found
672.596: [GC [PSYoungGen: 6642785K->359202K(6059072K)]
13459952K->8065935K(22836288K), 2.8260220 secs] [Times: user=2.83 sys=33.72,
real=2.83 secs]
14/07/14 18:11:41 INFO network.ConnectionManager: Removing
ReceivingConnection to ConnectionManagerId(dc03.mydomain.org,43278)
14/07/14 18:11:41 INFO network.ConnectionManager: Removing SendingConnection
to ConnectionManagerId(dc03.mydomain.org,43278)
14/07/14 18:11:41 INFO network.ConnectionManager: Removing SendingConnection
to ConnectionManagerId(dc02.mydomain.org,54538)
14/07/14 18:11:41 INFO network.ConnectionManager: Removing
ReceivingConnection to ConnectionManagerId(dc18.mydomain.org,58100)
14/07/14 18:11:41 INFO network.ConnectionManager: Removing SendingConnection
to ConnectionManagerId(dc18.mydomain.org,58100)
14/07/14 18:11:41 INFO network.ConnectionManager: Removing SendingConnection
to ConnectionManagerId(dc18.mydomain.org,58100)

The full log is uploaded on
https://dl.dropboxusercontent.com/u/13123103/driver.log


[The error log of a worker]
14/07/14 18:11:38 INFO worker.Worker: Executor app-20140714180032-0010/8
finished with state EXITED message Command exited with code 1 exitStatus 1
14/07/14 18:11:38 INFO actor.LocalActorRef: Message
[akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from
Actor[akka://sparkWorker/deadLetters] to
Actor[akka://sparkWorker/system/transports

Re: akka disassociated on GC

2014-07-16 Thread Makoto Yui

Hi Xiangrui,

(2014/07/16 15:05), Xiangrui Meng wrote:

I don't remember writing that, but thanks for bringing this issue up!
There are two important settings to check: 1) driver memory (you can
see it from the executor tab), 2) number of partitions (try to use a
small number of partitions). I put up two PRs to fix the problem:


For the driver memory, I used 16GB/24GB, and it was enough for the
execution (full GC did not happen). I checked this using the jmap and top
commands.


BTW, I found that the required driver memory was oddly
proportional to the number of tasks/executors. When I used 8GB for the driver
memory, I got an OOM during task serialization. This could be a
possible memory leak in task serialization, to be addressed in the
future.


Each task is about 24MB in size, and the number of tasks/executors is 280.
The size of each task result was about 120MB or so.

 1) use broadcast in task closure: 
https://github.com/apache/spark/pull/1427


Does this PR reduce the required memory for the driver?

Is there a big difference between explicitly broadcasting the feature weights
and implicitly serializing them into the task closure?


 2) use treeAggregate to get the result:
 https://github.com/apache/spark/pull/1110

treeAggregate would certainly reduce the aggregation time and the required
driver memory. I will test it.


However, the problem I am facing now is an akka connection issue during GC,
or under heavy load. So I think the problem will still be lurking
even if treeAggregate reduces the amount of memory consumed.


Best,
Makoto


akka disassociated on GC

2014-07-15 Thread Makoto Yui

Hello,

(2014/06/19 23:43), Xiangrui Meng wrote:

The execution was slow for the larger KDD Cup 2012, Track 2 dataset (235M+
records with 16.7M+ (2^24) sparse features, about 33.6GB) due to the sequential
aggregation of dense vectors on a single driver node.

Aggregation took about 7.6 minutes per iteration.


When running the above test, I got another error at the beginning of the
2nd iteration when running multiple iterations.


It works fine for the first iteration, but the 2nd iteration always fails.

It seems that akka connections are suddenly disassociated when GC 
happens on the driver node. Two possible causes can be considered:
1) The driver is under a heavy load because of GC, so executors cannot
connect to the driver. Changing the akka timeout settings did not resolve the
issue.

2) akka oddly released valid connections on GC.

I'm using Spark 1.0.1, and the following akka timeout settings did not
resolve the problem.


[spark-defaults.conf]
spark.akka.frameSize 50
spark.akka.timeout   120
spark.akka.askTimeout    120
spark.akka.lookupTimeout 120
spark.akka.heartbeat.pauses 600

It seems this issue is related to one previously discussed in
http://markmail.org/message/p2i34frtf4iusdfn

Are there any preferred configurations or workaround for this issue?

Thanks,
Makoto


[The error log of the driver]

14/07/14 18:11:32 INFO scheduler.TaskSetManager: Serialized task 4.0:117 
as 25300254 bytes in 35 ms
666.108: [GC [PSYoungGen: 6540914K->975362K(7046784K)]
12419091K->7792529K(23824000K), 5.2157830 secs] [Times: user=0.00
sys=68.43, real=5.22 secs]
14/07/14 18:11:38 INFO network.ConnectionManager: Removing 
SendingConnection to ConnectionManagerId(dc09.mydomain.org,34565)
14/07/14 18:11:38 INFO network.ConnectionManager: Removing 
ReceivingConnection to ConnectionManagerId(dc09.mydomain.org,34565)
14/07/14 18:11:38 INFO client.AppClient$ClientActor: Executor updated: 
app-20140714180032-0010/8 is now EXITED (Command exited with code 1)
14/07/14 18:11:38 ERROR network.ConnectionManager: Corresponding 
SendingConnectionManagerId not found
14/07/14 18:11:38 INFO cluster.SparkDeploySchedulerBackend: Executor 
app-20140714180032-0010/8 removed: Command exited with code 1
14/07/14 18:11:38 INFO network.ConnectionManager: Removing 
SendingConnection to ConnectionManagerId(dc30.mydomain.org,59016)
14/07/14 18:11:38 INFO network.ConnectionManager: Removing 
ReceivingConnection to ConnectionManagerId(dc30.mydomain.org,59016)
14/07/14 18:11:38 ERROR network.ConnectionManager: Corresponding 
SendingConnectionManagerId not found
672.596: [GC [PSYoungGen: 6642785K->359202K(6059072K)]
13459952K->8065935K(22836288K), 2.8260220 secs] [Times: user=2.83
sys=33.72, real=2.83 secs]
14/07/14 18:11:41 INFO network.ConnectionManager: Removing 
ReceivingConnection to ConnectionManagerId(dc03.mydomain.org,43278)
14/07/14 18:11:41 INFO network.ConnectionManager: Removing 
SendingConnection to ConnectionManagerId(dc03.mydomain.org,43278)
14/07/14 18:11:41 INFO network.ConnectionManager: Removing 
SendingConnection to ConnectionManagerId(dc02.mydomain.org,54538)
14/07/14 18:11:41 INFO network.ConnectionManager: Removing 
ReceivingConnection to ConnectionManagerId(dc18.mydomain.org,58100)
14/07/14 18:11:41 INFO network.ConnectionManager: Removing 
SendingConnection to ConnectionManagerId(dc18.mydomain.org,58100)
14/07/14 18:11:41 INFO network.ConnectionManager: Removing 
SendingConnection to ConnectionManagerId(dc18.mydomain.org,58100)


The full log is uploaded on
https://dl.dropboxusercontent.com/u/13123103/driver.log


[The error log of a worker]
14/07/14 18:11:38 INFO worker.Worker: Executor app-20140714180032-0010/8 
finished with state EXITED message Command exited with code 1 exitStatus 1
14/07/14 18:11:38 INFO actor.LocalActorRef: Message 
[akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] 
from Actor[akka://sparkWorker/deadLetters] to 
Actor[akka://sparkWorker/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkWorker%4010.0.1.9%3A60601-39#1322474303] 
was not delivered. [13] dead letters encountered. This logging can be 
turned off or adjusted with configuration settings 
'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
14/07/14 18:11:38 ERROR remote.EndpointWriter: AssociationError 
[akka.tcp://sparkwor...@dc09.mydomain.org:39578] ->
[akka.tcp://sparkexecu...@dc09.mydomain.org:33886]: Error [Association 
failed with [akka.tcp://sparkexecu...@dc09.mydomain.org:33886]] [
akka.remote.EndpointAssociationException: Association failed with 
[akka.tcp://sparkexecu...@dc09.mydomain.org:33886]
Caused by: 
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: 
Connection refused: dc09.mydomain.org/10.0.1.9:33886]
14/07/14 18:11:38 INFO worker.Worker: Asked to launch executor 
app-20140714180032-0010/32 for Spark shell
14/07/14 18:11:38 WARN

Re: news20-binary classification with LogisticRegressionWithSGD

2014-06-19 Thread Makoto Yui

Xiangrui and Debasish,

(2014/06/18 6:33), Debasish Das wrote:

I did run pretty big sparse dataset (20M rows, 3M sparse features) and I
got 100 iterations of SGD running in 200 seconds...10 executors each
with 16 GB memory...


I figured out what the problem was: spark.akka.frameSize was too
large. Setting spark.akka.frameSize=10 made it work for the news20 dataset.
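
(A sketch of the entry, assuming it is applied through spark-defaults.conf as in
the configuration shown earlier; the value is in MB:)

[spark-defaults.conf]
spark.akka.frameSize 10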


The execution was slow for the larger KDD Cup 2012, Track 2 dataset
(235M+ records with 16.7M+ (2^24) sparse features, about 33.6GB) due to
the sequential aggregation of dense vectors on a single driver node.


Aggregation took about 7.6 minutes per iteration.

Thanks,
Makoto


Re: news20-binary classification with LogisticRegressionWithSGD

2014-06-19 Thread Makoto Yui

Xiangrui,

(2014/06/19 23:43), Xiangrui Meng wrote:

It is because the frame size is not set correctly in the executor backend; see
SPARK-1112. We are going to fix it in v1.0.1. Did you try treeAggregate?


Not yet. I will wait for the v1.0.1 release.

Thanks,
Makoto


news20-binary classification with LogisticRegressionWithSGD

2014-06-17 Thread Makoto Yui
Hello,

I have been evaluating LogisticRegressionWithSGD from Spark 1.0 MLlib on
Hadoop 0.20.2-cdh3u6, but it does not work for a sparse dataset even though
the number of training examples used in the evaluation is just 1,000.

It works fine for the dataset *news20.binary.1000*, which has 178,560
features. However, it does not work for *news20.random.1000*, where the number
of features is large (1,354,731 features), even though we used a sparse vector
through MLUtils.loadLibSVMFile().

The execution does not seem to progress, and no error is reported in the
spark-shell or in the stdout/stderr of the executors.

We used 32 executors, each allocating 7GB (2GB of which is for RDDs) of
working memory.

Any suggestions? Your help is really appreciated.

==
Executed code
==
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD

//val training = MLUtils.loadLibSVMFile(sc,
//  "hdfs://host:8020/dataset/news20-binary/news20.binary.1000",
//  multiclass=false)
val training = MLUtils.loadLibSVMFile(sc,
  "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
  multiclass=false)

val numFeatures = training.take(1)(0).features.size
//numFeatures: Int = 178560 for news20.binary.1000
//numFeatures: Int = 1354731 for news20.random.1000
val model = LogisticRegressionWithSGD.train(training, numIterations=1)

==
The dataset used in the evaluation
==

http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary.html#news20.binary

$ head -1000 news20.binary | sed 's/+1/1/g' | sed 's/-1/0/g' > news20.binary.1000
$ sort -R news20.binary > news20.random
$ head -1000 news20.random | sed 's/+1/1/g' | sed 's/-1/0/g' > news20.random.1000

You can find the dataset in
https://dl.dropboxusercontent.com/u/13123103/news20.random.1000
https://dl.dropboxusercontent.com/u/13123103/news20.binary.1000


Thanks,
Makoto


Re: news20-binary classification with LogisticRegressionWithSGD

2014-06-17 Thread Makoto Yui
Here is follow-up to the previous evaluation.

aggregate at GradientDescent.scala:178 never finishes at
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala#L178

We confirmed, by -verbose:gc, that GC is not happening during the aggregate
and the cumulative CPU time for the task is increasing little by little.
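
(How -verbose:gc was enabled here is not stated; one way, as a sketch, is to pass
GC-logging flags to the executor JVMs via spark.executor.extraJavaOptions:)

[spark-defaults.conf]
spark.executor.extraJavaOptions -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps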

LBFGS also does not work for large # of features (news20.random.1000)
though it works fine for small # of features (news20.binary.1000).

aggregate at LBFGS.scala:201 also never finishes at
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala#L201

---
[Evaluated code for LBFGS]

import org.apache.spark.SparkContext
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.classification.LogisticRegressionModel
import org.apache.spark.mllib.optimization._

val data = MLUtils.loadLibSVMFile(sc,
  "hdfs://dm01:8020/dataset/news20-binary/news20.random.1000",
  multiclass=false)
val numFeatures = data.take(1)(0).features.size

val training = data.map(x => (x.label, MLUtils.appendBias(x.features))).cache()

// Run training algorithm to build the model
val numCorrections = 10
val convergenceTol = 1e-4
val maxNumIterations = 20
val regParam = 0.1
val initialWeightsWithIntercept = Vectors.dense(new
Array[Double](numFeatures + 1))

val (weightsWithIntercept, loss) = LBFGS.runLBFGS(
  training,
  new LogisticGradient(),
  new SquaredL2Updater(),
  numCorrections,
  convergenceTol,
  maxNumIterations,
  regParam,
  initialWeightsWithIntercept)
---


Thanks,
Makoto

2014-06-17 21:32 GMT+09:00 Makoto Yui yuin...@gmail.com:
 Hello,

 I have been evaluating LogisticRegressionWithSGD of Spark 1.0 MLlib on
 Hadoop 0.20.2-cdh3u6 but it does not work for a sparse dataset though
 the number of training examples used in the evaluation is just 1,000.

 It works fine for the dataset *news20.binary.1000* that has 178,560
 features. However, it does not work for *news20.random.1000* where # of
 features is large  (1,354,731 features) though we used a sparse vector
 through MLUtils.loadLibSVMFile().

 The execution seems not progressing while no error is reported in the
 spark-shell as well as in the stdout/stderr of executors.

 We used 32 executors with each allocating 7GB (2GB is for RDD) for
 working memory.

 Any suggesions? Your help is really appreciated.

 ==
 Executed code
 ==
 import org.apache.spark.mllib.util.MLUtils
 import org.apache.spark.mllib.classification.LogisticRegressionWithSGD

 //val training = MLUtils.loadLibSVMFile(sc,
 //  "hdfs://host:8020/dataset/news20-binary/news20.binary.1000",
 //  multiclass=false)
 val training = MLUtils.loadLibSVMFile(sc,
   "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
   multiclass=false)

 val numFeatures = training.take(1)(0).features.size
 //numFeatures: Int = 178560 for news20.binary.1000
 //numFeatures: Int = 1354731 for news20.random.1000
 val model = LogisticRegressionWithSGD.train(training, numIterations=1)

 ==
 The dataset used in the evaluation
 ==

 http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary.html#news20.binary

 $ head -1000 news20.binary | sed 's/+1/1/g' | sed 's/-1/0/g' > news20.binary.1000
 $ sort -R news20.binary > news20.random
 $ head -1000 news20.random | sed 's/+1/1/g' | sed 's/-1/0/g' > news20.random.1000

 You can find the dataset in
 https://dl.dropboxusercontent.com/u/13123103/news20.random.1000
 https://dl.dropboxusercontent.com/u/13123103/news20.binary.1000


 Thanks,
 Makoto


Re: news20-binary classification with LogisticRegressionWithSGD

2014-06-17 Thread Makoto Yui

Hi Xiangrui,

(2014/06/18 4:58), Xiangrui Meng wrote:

How many partitions did you set? If there are too many partitions,
please do a coalesce before calling ML algorithms.


The training data news20.random.1000 is small, and thus only 2
partitions are used by default.


val training = MLUtils.loadLibSVMFile(sc,
  "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
  multiclass=false)


We also tried 32 partitions as follows, but the aggregate never finishes.

val training = MLUtils.loadLibSVMFile(sc,
  "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
  multiclass=false, numFeatures = 1354731, minPartitions = 32)



Btw, could you try the tree branch in my repo?
https://github.com/mengxr/spark/tree/tree

I used tree aggregate in this branch. It should help with the scalability.


Is treeAggregate itself available on Spark 1.0?

I wonder: could I test your modification just by running the following
code in the REPL?


---
val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + i)
  .treeAggregate((BDV.zeros[Double](weights.size), 0.0))(
    seqOp = (c, v) => (c, v) match { case ((grad, loss), (label, features)) =>
      val l = gradient.compute(features, label, weights, Vectors.fromBreeze(grad))
      (grad, loss + l)
    },
    combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) =>
      (grad1 += grad2, loss1 + loss2)
    }, 2)
---

Rebuilding Spark just to run the evaluation is quite a lot of work.

Thanks,
Makoto


Re: news20-binary classification with LogisticRegressionWithSGD

2014-06-17 Thread Makoto Yui

Hi Xiangrui,

(2014/06/18 6:03), Xiangrui Meng wrote:

Are you using Spark 1.0 or 0.9? Could you go to the executor tab of
the web UI and check the driver's memory?


I am using Spark 1.0.

588.8 MB is allocated for driver RDDs.
I am setting SPARK_DRIVER_MEMORY=2g in the conf/spark-env.sh.

The value allocated for driver RDDs in the web UI did not change when I did
the following:

$ SPARK_DRIVER_MEMORY=6g bin/spark-shell

I set -verbose:gc, but a full GC (or continuous GCs) does not happen
during the aggregate on the driver.


Thanks,
Makoto


Re: news20-binary classification with LogisticRegressionWithSGD

2014-06-17 Thread Makoto Yui

Hi Xiangrui,

(2014/06/18 8:49), Xiangrui Meng wrote:

Makoto, dense vectors are used in aggregation. If you have 32
partitions and each one sends a dense vector of size 1,354,731 to the
master, then the driver needs 300MB+. That may be the problem.


It seems that this could cause problems for convex optimization on
large training data, and a merging tree, like allreduce, would help to
reduce the memory requirements (though the time for aggregation might increase).



Which deploy mode are you using, standalone or local?


Standalone.

Setting --driver-memory 8G did not solve the aggregate problem.
Aggregation never finishes.

`ps aux | grep spark` on master is as follows:

myui  7049 79.3  1.1 8768868 592348 pts/2  Sl+  11:10   0:14 
/usr/java/jdk1.7/bin/java -cp 
::/opt/spark-1.0.0/conf:/opt/spark-1.0.0/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop0.20.2-cdh3u6.jar:/usr/lib/hadoop-0.20/conf 
-XX:MaxPermSize=128m -verbose:gc -XX:+PrintGCDetails 
-XX:+PrintGCTimeStamps -Djava.library.path= -Xms2g -Xmx2g 
org.apache.spark.deploy.SparkSubmit spark-shell --driver-memory 8G 
--class org.apache.spark.repl.Main


myui  5694  2.5  0.5 6868296 292572 pts/2  Sl   10:59   0:17 
/usr/java/jdk1.7/bin/java -cp 
::/opt/spark-1.0.0/conf:/opt/spark-1.0.0/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop0.20.2-cdh3u6.jar:/usr/lib/hadoop-0.20/conf 
-XX:MaxPermSize=128m -Dspark.akka.logLifecycleEvents=true -Xms512m 
-Xmx512m org.apache.spark.deploy.master.Master --ip 10.0.0.1 --port 7077 
--webui-port 8081



Exporting SPARK_DAEMON_MEMORY=4g in spark-env.sh did not take effect for 
the evaluation.


`ps aux | grep spark`
/usr/java/jdk1.7/bin/java -cp 
::/opt/spark-1.0.0/conf:/opt/spark-1.0.0/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop0.20.2-cdh3u6.jar:/usr/lib/hadoop-0.20/conf 
-XX:MaxPermSize=128m -Dspark.akka.logLifecycleEvents=true -Xms4g -Xmx4g 
org.apache.spark.deploy.master.Master --ip 10.0.0.1 --port 7077 
--webui-port 8081

...


Thanks,
Makoto