Re: Spark KMeans hangs at reduceByKey / collectAsMap

2014-10-15 Thread Ray
Hi Xiangrui,

I am using yarn-cluster mode. The current Hadoop cluster is configured to
accept only yarn-cluster mode and does not allow yarn-client mode; I have no
privilege to change that.

Without k-means|| initialization, the job finished in 10 minutes. With
k-means||, it just hangs there for almost an hour.

I guess I can only go with random initialization in KMeans.
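
(A minimal sketch, assuming data is an already-loaded and cached RDD[Vector], of
how the random initialization mode is selected; the k/iteration/run values mirror
the ones used in this thread:)

import org.apache.spark.mllib.clustering.KMeans

val model = new KMeans()
  .setK(100)                              // k_value used in this thread
  .setMaxIterations(2)
  .setRuns(1)
  .setInitializationMode(KMeans.RANDOM)   // skip the expensive k-means|| step
  .run(data)                              // data: RDD[Vector], cached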

Thanks again for your help.

Ray







Spark KMeans hangs at reduceByKey / collectAsMap

2014-10-14 Thread Ray
Hi guys,

I am new to Spark. When I run Spark KMeans
(org.apache.spark.mllib.clustering.KMeans) on a small dataset, it works
great. However, when using a large dataset with 1.5 million vectors, it just
hangs at some reduceByKey/collectAsMap stages (the attached image shows
the corresponding UI).

http://apache-spark-user-list.1001560.n3.nabble.com/file/n16413/spark.png 



In the log file, I can see the errors below:

14/10/14 13:04:30 ERROR ConnectionManager: Corresponding SendingConnectionManagerId not found
14/10/14 13:04:30 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(server_name_here,32936)
14/10/14 13:04:30 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(server_name_here,32936)
14/10/14 13:04:30 INFO ConnectionManager: key already cancelled ? sun.nio.ch.SelectionKeyImpl@4aeed0e6
java.nio.channels.CancelledKeyException
        at sun.nio.ch.SelectionKeyImpl.ensureValid(SelectionKeyImpl.java:73)
        at sun.nio.ch.SelectionKeyImpl.readyOps(SelectionKeyImpl.java:87)
        at java.nio.channels.SelectionKey.isConnectable(SelectionKey.java:336)
        at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:352)
        at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:116)
14/10/14 13:04:30 ERROR SendingConnection: Exception while reading SendingConnection to ConnectionManagerId(server_name_here,32936)
java.nio.channels.ClosedChannelException
        at sun.nio.ch.SocketChannelImpl.ensureReadOpen(SocketChannelImpl.java:252)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:295)
        at org.apache.spark.network.SendingConnection.read(Connection.scala:397)
        at org.apache.spark.network.ConnectionManager$$anon$6.run(ConnectionManager.scala:176)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
14/10/14 13:04:30 INFO ConnectionManager: Key not valid ? sun.nio.ch.SelectionKeyImpl@2d584a4e
14/10/14 13:04:30 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(server_name_here,37767)
14/10/14 13:04:30 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(server_name_here,37767)
14/10/14 13:04:30 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(server_name_here,37767)
14/10/14 13:04:30 INFO ConnectionManager: key already cancelled ? sun.nio.ch.SelectionKeyImpl@2d584a4e
java.nio.channels.CancelledKeyException
        at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:363)
        at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:116)
14/10/14 13:04:30 INFO ConnectionManager: Handling connection error on connection to ConnectionManagerId(server_name_here,32936)
14/10/14 13:04:30 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(server_name_here,32936)
14/10/14 13:04:30 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(server_name_here,32936)
14/10/14 13:04:30 ERROR CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkExecutor@server_name_here:44765] - [akka.tcp://spark@server_name_here:46406] disassociated! Shutting down.




Regarding the above errors, I searched online and tried increasing the
following confs, but it still did not work.

spark.worker.timeout=3 

spark.akka.timeout=3 
spark.akka.retry.wait=3 
spark.akka.frameSize=1

spark.storage.blockManagerHeartBeatMs=3  

--driver-memory 2g
--executor-memory 2g
--num-executors 100
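
(As a minimal sketch with placeholder values — not the values used in this run —
the same properties can also be set programmatically on a SparkConf; driver/executor
memory and executor count stay as spark-submit flags:)

import org.apache.spark.{SparkConf, SparkContext}

// Placeholder values for illustration only; tune for the actual cluster.
val conf = new SparkConf()
  .setAppName("KMeansOnYarn")
  .set("spark.akka.frameSize", "100")                      // MB
  .set("spark.akka.timeout", "300")                        // seconds
  .set("spark.storage.blockManagerHeartBeatMs", "300000")  // milliseconds
val sc = new SparkContext(conf)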



I am running spark-submit on YARN. The Spark version is 1.1.0, and Hadoop
is 2.4.1.

Could you please provide some comments/insights?

Thanks a lot.

Ray







Re: Spark KMeans hangs at reduceByKey / collectAsMap

2014-10-14 Thread Ray
Hi guys,

An interesting thing: for the input dataset with 1.5 million vectors, if I set
KMeans's k_value = 100 or k_value = 50, it hangs as described above. However,
if I decrease k_value to 10, the same errors still appear in the log, but the
application finishes successfully, without observable hanging.

Hopefully this provides more information.

Thanks.

Ray






Re: Spark KMeans hangs at reduceByKey / collectAsMap

2014-10-14 Thread Xiangrui Meng
What is the feature dimension? I saw you used 100 partitions. How many
cores does your cluster have? -Xiangrui




Re: Spark KMeans hangs at reduceByKey / collectAsMap

2014-10-14 Thread Ray
Hi Xiangrui,

The input dataset has 1.5 million sparse vectors. Each sparse vector has a
dimension (cardinality) of 9153 and fewer than 15 nonzero elements.
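
(For reference, a single row of that shape would look roughly like the sketch
below in MLlib; the indices and values are made up:)

import org.apache.spark.mllib.linalg.Vectors

// Hypothetical row: dimension 9153, fewer than 15 nonzero entries.
val row = Vectors.sparse(9153, Array(12, 873, 4410, 9000), Array(1.0, 2.0, 0.5, 3.0))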


Yes, if I set num-executors = 200, I can see from the Hadoop cluster scheduler
that the application got 201 vCores. From the Spark UI, I can see it got 201
executors (as shown below).

http://apache-spark-user-list.1001560.n3.nabble.com/file/n16428/spark_core.png
  

http://apache-spark-user-list.1001560.n3.nabble.com/file/n16428/spark_executor.png
 



Thanks.

Ray







Re: Spark KMeans hangs at reduceByKey / collectAsMap

2014-10-14 Thread Burak Yavuz
Hi Ray,

The reduceByKey / collectAsMap step does a lot of calculation. Therefore it can
take a very long time if:
1) the number of runs parameter is set very high,
2) k is set high (you have observed this already), or
3) the data is not properly repartitioned.
It seems that it is hanging, but there is a lot of calculation going on.

Did you use a different value for the number of runs?
If you look at the storage tab, does the data look balanced among executors?

Best,
Burak
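
(A minimal sketch of points 1 and 3 above — keeping the number of runs low and
repartitioning the input before training — assuming data is an already-loaded
RDD[Vector]:)

import org.apache.spark.mllib.clustering.KMeans

val balanced = data.repartition(200).cache()   // spread the 1.5M vectors evenly across executors
val model = new KMeans()
  .setK(100)
  .setMaxIterations(2)
  .setRuns(1)                                  // keep the number of runs low
  .run(balanced)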



Re: Spark KMeans hangs at reduceByKey / collectAsMap

2014-10-14 Thread Ray
Hi Burak,

In KMeans, I used k_value = 100, num_iteration = 2, and num_run = 1.

In the current test, I increased num-executors to 200. In the storage info
(as shown below), 11 executors are used (I think the data is reasonably
balanced among them) and the others show zero memory usage.
http://apache-spark-user-list.1001560.n3.nabble.com/file/n16438/spark_storage.png
 


Currently, there is no active stage running, just as in the first image I
posted. You mentioned "It seems that it is hanging, but there is a lot of
calculation going on." I thought that if some calculation were going on, there
would be an active stage with an incomplete progress bar in the UI. Am I wrong?


Thanks, Burak!

Ray






Re: Spark KMeans hangs at reduceByKey / collectAsMap

2014-10-14 Thread DB Tsai
I saw a similar bottleneck in the reduceByKey operation. Maybe we can
implement a treeReduceByKey to reduce the pressure on the single executor
reducing a particular key.

Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai
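
(treeReduceByKey is not an existing Spark API; as a rough sketch of the idea, a
two-level "salted" reduction can spread a hot key over several reducers before
the final merge. The helper below is hypothetical:)

import scala.reflect.ClassTag
import scala.util.Random
import org.apache.spark.SparkContext._   // pair-RDD implicits in Spark 1.x
import org.apache.spark.rdd.RDD

// Hypothetical helper: partial reduction on salted sub-keys, then a final
// reduceByKey per original key.
def saltedReduceByKey[K: ClassTag, V: ClassTag](
    rdd: RDD[(K, V)], func: (V, V) => V, fanout: Int = 16): RDD[(K, V)] = {
  rdd
    .map { case (k, v) => ((k, Random.nextInt(fanout)), v) }  // spread each key over fanout sub-keys
    .reduceByKey(func)                                        // first-level partial reduction
    .map { case ((k, _), v) => (k, v) }                       // drop the salt
    .reduceByKey(func)                                        // final merge per original key
}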




Re: Spark KMeans hangs at reduceByKey / collectAsMap

2014-10-14 Thread Xiangrui Meng
Just ran a test on mnist8m (8m x 784) with k = 100 and numIter = 50.
It worked fine. Ray, the error log you posted is from after cluster
termination, which is not the root cause. Could you search your log
and find the real cause? On the executor tab screenshot, I saw that only
200MB is used. Did you cache the input data? If yes, could you check
the storage tab of the Spark WebUI and see how the data is distributed
across executors? -Xiangrui
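
(A minimal sketch — with a hypothetical input path and parsing — of caching the
input and materializing it before training, so the storage tab shows how the
partitions are spread across executors:)

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.storage.StorageLevel

val data = sc.textFile("hdfs:///path/to/vectors")   // hypothetical path, one row of features per line
  .map(line => Vectors.dense(line.split(' ').map(_.toDouble)))
  .persist(StorageLevel.MEMORY_ONLY)

data.count()   // materialize the cache; the storage tab now lists the RDD and its distribution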




Re: Spark KMeans hangs at reduceByKey / collectAsMap

2014-10-14 Thread Ray
Hi Xiangrui,

Thanks for the guidance. I read the log carefully and found the root cause. 

KMeans, by default, uses k-means|| (a parallel variant of k-means++) as the
initialization mode. According to the log file, the 70-minute hang is actually
the computing time of this initialization, as pasted below:

14/10/14 14:48:18 INFO DAGScheduler: Stage 20 (collectAsMap at KMeans.scala:293) finished in 2.233 s
14/10/14 14:48:18 INFO SparkContext: Job finished: collectAsMap at KMeans.scala:293, took 85.590020124 s
14/10/14 14:48:18 INFO ShuffleBlockManager: Could not find files for shuffle 5 for deleting
14/10/14 14:48:18 INFO ContextCleaner: Cleaned shuffle 5
14/10/14 15:50:41 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
14/10/14 15:50:41 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
14/10/14 15:54:36 INFO LocalKMeans: Local KMeans++ converged in 11 iterations.
14/10/14 15:54:36 INFO KMeans: Initialization with k-means|| took 4426.913 seconds.
14/10/14 15:54:37 INFO SparkContext: Starting job: collectAsMap at KMeans.scala:190
14/10/14 15:54:37 INFO DAGScheduler: Registering RDD 38 (reduceByKey at KMeans.scala:190)
14/10/14 15:54:37 INFO DAGScheduler: Got job 16 (collectAsMap at KMeans.scala:190) with 100 output partitions (allowLocal=false)
14/10/14 15:54:37 INFO DAGScheduler: Final stage: Stage 22 (collectAsMap at KMeans.scala:190)



I now use random as the KMeans initialization mode, and the other confs remain
the same. This time, the job finished quickly.

In your test on mnist8m, did you use the default k-means|| initialization? How
long did it take?

Thanks again for your help.

Ray










Re: Spark KMeans hangs at reduceByKey / collectAsMap

2014-10-14 Thread Xiangrui Meng
I used k-means||, which is the default, and it took less than 1 minute to
finish. 50 iterations took less than 25 minutes on a cluster of 9 m3.2xlarge
EC2 nodes. Which deploy mode did you use? Is it yarn-client? -Xiangrui
