Re: Spark KMeans hangs at reduceByKey / collectAsMap
Hi Xiangrui,

I am using yarn-cluster mode. The current Hadoop cluster is configured to accept only yarn-cluster mode, not yarn-client mode, and I have no privilege to change that.

Without k-means|| initialization, the job finished in 10 minutes. With k-means||, it just hangs there for almost 1 hour. I guess I can only go with random initialization in KMeans.

Thanks again for your help.

Ray

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-KMeans-hangs-at-reduceByKey-collectAsMap-tp16413p16530.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
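The "random" mode Ray settles on above is cheap by construction: it is a single sampling step rather than repeated passes over the data. A minimal pure-Python sketch of the idea (illustrative only, not MLlib's actual code; the function name and toy data are made up):

```python
import random

def random_init(points, k, seed=42):
    # Pick k distinct points uniformly at random as the initial centers.
    # One cheap sampling step, versus the repeated full passes over the
    # data that k-means|| / k-means++ seeding performs.
    rng = random.Random(seed)
    return rng.sample(points, k)

# Toy data standing in for the 1.5M-vector dataset discussed below.
points = [(float(i), float(i % 7)) for i in range(1000)]
centers = random_init(points, 10)
```

The tradeoff, of course, is seeding quality: random centers can start Lloyd's iterations from a worse position than k-means++-style seeding.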
Spark KMeans hangs at reduceByKey / collectAsMap
Hi guys,

I am new to Spark. When I run Spark KMeans (org.apache.spark.mllib.clustering.KMeans) on a small dataset, it works great. However, on a large dataset with 1.5 million vectors, it just hangs at some reduceByKey/collectAsMap stages (the attached image shows the corresponding UI):

http://apache-spark-user-list.1001560.n3.nabble.com/file/n16413/spark.png

In the log file, I can see the errors below:

14/10/14 13:04:30 ERROR ConnectionManager: Corresponding SendingConnectionManagerId not found
14/10/14 13:04:30 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(server_name_here,32936)
14/10/14 13:04:30 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(server_name_here,32936)
14/10/14 13:04:30 INFO ConnectionManager: key already cancelled ? sun.nio.ch.SelectionKeyImpl@4aeed0e6
java.nio.channels.CancelledKeyException
        at sun.nio.ch.SelectionKeyImpl.ensureValid(SelectionKeyImpl.java:73)
        at sun.nio.ch.SelectionKeyImpl.readyOps(SelectionKeyImpl.java:87)
        at java.nio.channels.SelectionKey.isConnectable(SelectionKey.java:336)
        at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:352)
        at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:116)
14/10/14 13:04:30 ERROR SendingConnection: Exception while reading SendingConnection to ConnectionManagerId(server_name_here,32936)
java.nio.channels.ClosedChannelException
        at sun.nio.ch.SocketChannelImpl.ensureReadOpen(SocketChannelImpl.java:252)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:295)
        at org.apache.spark.network.SendingConnection.read(Connection.scala:397)
        at org.apache.spark.network.ConnectionManager$$anon$6.run(ConnectionManager.scala:176)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
14/10/14 13:04:30 INFO ConnectionManager: Key not valid ? sun.nio.ch.SelectionKeyImpl@2d584a4e
14/10/14 13:04:30 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(server_name_here,37767)
14/10/14 13:04:30 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(server_name_here,37767)
14/10/14 13:04:30 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(server_name_here,37767)
14/10/14 13:04:30 INFO ConnectionManager: key already cancelled ? sun.nio.ch.SelectionKeyImpl@2d584a4e
java.nio.channels.CancelledKeyException
        at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:363)
        at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:116)
14/10/14 13:04:30 INFO ConnectionManager: Handling connection error on connection to ConnectionManagerId(server_name_here,32936)
14/10/14 13:04:30 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(server_name_here,32936)
14/10/14 13:04:30 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(server_name_here,32936)
14/10/14 13:04:30 ERROR CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkExecutor@server_name_here:44765] -> [akka.tcp://spark@server_name_here:46406] disassociated! Shutting down.

Regarding the above errors, I searched online and tried increasing the following confs, but it still did not work:

spark.worker.timeout=3
spark.akka.timeout=3
spark.akka.retry.wait=3
spark.akka.frameSize=1
spark.storage.blockManagerHeartBeatMs=3

--driver-memory 2g --executor-memory 2g --num-executors 100

I am running spark-submit on YARN. The Spark version is 1.1.0, and Hadoop is 2.4.1. Could you please share some comments/insights? Thanks a lot.

Ray
Re: Spark KMeans hangs at reduceByKey / collectAsMap
Hi guys,

An interesting thing: for the input dataset, which has 1.5 million vectors, if I set KMeans's k_value = 100 or k_value = 50, it hangs as mentioned above. However, if I decrease it to k_value = 10, the same error still appears in the log, but the application finishes successfully, without observable hanging.

Hopefully this provides more information. Thanks.

Ray
Re: Spark KMeans hangs at reduceByKey / collectAsMap
What is the feature dimension? I saw you used 100 partitions. How many cores does your cluster have?

-Xiangrui

On Tue, Oct 14, 2014 at 1:51 PM, Ray ray-w...@outlook.com wrote:
> Hi guys, An interesting thing, for the input dataset which has 1.5 million
> vectors, if set the KMeans's k_value = 100 or k_value = 50, it hangs as
> mentioned above. [...]
Re: Spark KMeans hangs at reduceByKey / collectAsMap
Hi Xiangrui,

The input dataset has 1.5 million sparse vectors. Each sparse vector has a dimension (cardinality) of 9153 and fewer than 15 nonzero elements.

Yes: if I set num-executors = 200, from the Hadoop cluster scheduler I can see the application got 201 vCores. From the Spark UI, I can see it got 201 executors (as shown below):

http://apache-spark-user-list.1001560.n3.nabble.com/file/n16428/spark_core.png
http://apache-spark-user-list.1001560.n3.nabble.com/file/n16428/spark_executor.png

Thanks.

Ray
Re: Spark KMeans hangs at reduceByKey / collectAsMap
Hi Ray,

The reduceByKey / collectAsMap stages do a lot of calculation. They can therefore take a very long time if:
1) the number-of-runs parameter is set very high,
2) k is set high (you have observed this already), or
3) the data is not properly repartitioned.

It may look like it is hanging while there is in fact a lot of calculation going on. Did you use a different value for the number of runs? If you look at the Storage tab, does the data look balanced among executors?

Best,
Burak

----- Original Message -----
From: Ray ray-w...@outlook.com
To: u...@spark.incubator.apache.org
Sent: Tuesday, October 14, 2014 2:58:03 PM
Subject: Re: Spark KMeans hangs at reduceByKey / collectAsMap

> Hi Xiangrui, The input dataset has 1.5 million sparse vectors. Each sparse
> vector has a dimension(cardinality) of 9153 and has less than 15 nonzero
> elements. [...]
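Burak's three factors multiply together. A back-of-the-envelope cost model (my own rough sketch, not MLlib's actual accounting) shows why k = 100 can appear to hang where k = 10 finishes:

```python
def kmeans_iter_cost(num_points, k, runs, nnz_per_point):
    # Rough map-side work per Lloyd iteration: every point is compared
    # against k centers for each run, and each sparse distance
    # computation touches about nnz nonzero entries.
    return num_points * k * runs * nnz_per_point

# Ray's numbers: 1.5M sparse vectors, < 15 nonzeros each, 1 run.
work_k100 = kmeans_iter_cost(1_500_000, 100, 1, 15)
work_k10 = kmeans_iter_cost(1_500_000, 10, 1, 15)
ratio = work_k100 // work_k10  # k = 100 does 10x the distance work of k = 10
```

The model ignores shuffle and network costs, but it matches the qualitative observation in the thread: the same job with k = 10 completes while k = 100 seems stuck.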
Re: Spark KMeans hangs at reduceByKey / collectAsMap
Hi Burak,

In KMeans, I used k_value = 100, num_iteration = 2, and num_run = 1. In the current test, I increased num-executors to 200. In the storage info (as shown below), 11 executors are used (I think the data is reasonably balanced among them) and the others have zero memory usage:

http://apache-spark-user-list.1001560.n3.nabble.com/file/n16438/spark_storage.png

Currently, there is no active stage running, just as in the first image I posted. You mentioned "It seems that it is hanging, but there is a lot of calculation going on." I thought that if some calculation were going on, there would be an active stage with an incomplete progress bar in the UI. Am I wrong?

Thanks, Burak!

Ray
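The skew Ray describes can be quantified from the per-executor block counts visible on the Storage tab. A tiny sketch (the numbers below are hypothetical, eyeballed from the screenshot rather than read from any API):

```python
def storage_balance(blocks_per_executor):
    # Fraction of executors that actually hold cached blocks; a value
    # near 1.0 means the cached RDD is spread across the cluster,
    # a small value means most executors sit idle during shuffles.
    used = sum(1 for n in blocks_per_executor if n > 0)
    return used / len(blocks_per_executor)

# Hypothetical distribution resembling Ray's screenshot: 11 of 200
# executors hold all the cached partitions, the rest hold none.
blocks = [9] * 11 + [0] * 189
frac = storage_balance(blocks)  # 0.055: heavily skewed
```

A fraction this low suggests repartitioning the input (or caching after a repartition) so the reduceByKey work is not concentrated on a handful of executors.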
Re: Spark KMeans hangs at reduceByKey / collectAsMap
I saw a similar bottleneck in the reduceByKey operation. Maybe we can implement a treeReduceByKey to reduce the pressure on the single executor reducing a particular key.

Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai

On Wed, Oct 15, 2014 at 12:16 AM, Burak Yavuz bya...@stanford.edu wrote:
> Hi Ray, The reduceByKey / collectAsMap does a lot of calculations.
> Therefore it can take a very long time if: 1) The parameter number of runs
> is set very high 2) k is set high (you have observed this already) 3) data
> is not properly repartitioned [...]
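treeReduceByKey is DB's proposal here, not an existing Spark API at the time. The idea, combining partial results pairwise in rounds instead of funneling them all into one reducer, can be sketched in plain Python:

```python
def tree_reduce(values, combine):
    # Combine values pairwise in rounds, so each round roughly halves
    # the number of partial results and no single step merges them all.
    while len(values) > 1:
        nxt = [combine(values[i], values[i + 1])
               for i in range(0, len(values) - 1, 2)]
        if len(values) % 2:  # odd leftover carries into the next round
            nxt.append(values[-1])
        values = nxt
    return values[0]

# 16 per-partition partial sums merged in 4 rounds, instead of one
# 16-way merge on a single reducer.
total = tree_reduce(list(range(16)), lambda a, b: a + b)  # 120
```

MLlib later adopted the same principle in its tree aggregation primitives; the sketch above is only meant to convey the shape of the computation, not any real Spark implementation.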
Re: Spark KMeans hangs at reduceByKey / collectAsMap
Just ran a test on mnist8m (8m x 784) with k = 100 and numIter = 50. It worked fine.

Ray, the error log you posted is from after cluster termination, which is not the root cause. Could you search your log and find the real cause? On the executor tab screenshot, I saw only 200MB is used. Did you cache the input data? If yes, could you check the Storage tab of the Spark WebUI and see how the data is distributed across executors?

-Xiangrui

On Tue, Oct 14, 2014 at 4:26 PM, DB Tsai dbt...@dbtsai.com wrote:
> I saw similar bottleneck in reduceByKey operation. Maybe we can implement
> treeReduceByKey to reduce the pressure on single executor reducing the
> particular key. [...]
Re: Spark KMeans hangs at reduceByKey / collectAsMap
Hi Xiangrui,

Thanks for the guidance. I read the log carefully and found the root cause. KMeans, by default, uses k-means|| as the initialization mode. According to the log file, the 70-minute "hanging" is actually the computing time of the k-means|| initialization (which ends with a local k-means++ step), as pasted below:

14/10/14 14:48:18 INFO DAGScheduler: Stage 20 (collectAsMap at KMeans.scala:293) finished in 2.233 s
14/10/14 14:48:18 INFO SparkContext: Job finished: collectAsMap at KMeans.scala:293, took 85.590020124 s
14/10/14 14:48:18 INFO ShuffleBlockManager: Could not find files for shuffle 5 for deleting
14/10/14 14:48:18 INFO ContextCleaner: Cleaned shuffle 5
14/10/14 15:50:41 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
14/10/14 15:50:41 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
14/10/14 15:54:36 INFO LocalKMeans: Local KMeans++ converged in 11 iterations.
14/10/14 15:54:36 INFO KMeans: Initialization with k-means|| took 4426.913 seconds.
14/10/14 15:54:37 INFO SparkContext: Starting job: collectAsMap at KMeans.scala:190
14/10/14 15:54:37 INFO DAGScheduler: Registering RDD 38 (reduceByKey at KMeans.scala:190)
14/10/14 15:54:37 INFO DAGScheduler: Got job 16 (collectAsMap at KMeans.scala:190) with 100 output partitions (allowLocal=false)
14/10/14 15:54:37 INFO DAGScheduler: Final stage: Stage 22(collectAsMap at KMeans.scala:190)

Note the gap between 14:48:18 and 15:50:41, and the line "Initialization with k-means|| took 4426.913 seconds."

I now use "random" as the KMeans initialization mode, with the other confs unchanged. This time, it finished quickly.

In your test on mnist8m, did you use k-means|| as the initialization mode? How long did it take?

Thanks again for your help.

Ray
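For context on why initialization dominated: as the log shows, LocalKMeans runs k-means++ on the driver after the distributed k-means|| passes, and k-means++ seeding rescans all its points once per chosen center. A pure-Python sketch of that seeding loop (illustrative only; MLlib's LocalKMeans differs in detail, and the toy data here is made up):

```python
import random

def kmeans_pp_seed(points, k, seed=0):
    # k-means++ seeding: each new center is sampled with probability
    # proportional to squared distance from the nearest chosen center.
    # Each of the k rounds scans every point, so seeding alone costs
    # O(n * k * d) distance work; with k = 100 and d = 9153 that adds up.
    rng = random.Random(seed)
    centers = [rng.choice(points)]
    while len(centers) < k:
        # Squared distance from each point to its nearest chosen center.
        d2 = [min(sum((a - b) ** 2 for a, b in zip(p, c)) for c in centers)
              for p in points]
        r = rng.uniform(0.0, sum(d2))
        acc = 0.0
        chosen = points[-1]  # fallback for floating-point round-off
        for p, w in zip(points, d2):
            acc += w
            if acc >= r:
                chosen = p
                break
        centers.append(chosen)
    return centers

pts = [(float(i), 0.0) for i in range(200)]
seeds = kmeans_pp_seed(pts, 5)
```

Random initialization skips all of this seeding work, which is consistent with Ray's observation that the job then finishes in minutes.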
Re: Spark KMeans hangs at reduceByKey / collectAsMap
I used k-means||, which is the default, and it took less than 1 minute to finish. 50 iterations took less than 25 minutes on a cluster of 9 m3.2xlarge EC2 nodes. Which deploy mode did you use? Is it yarn-client?

-Xiangrui

On Tue, Oct 14, 2014 at 6:03 PM, Ray ray-w...@outlook.com wrote:
> Hi Xiangrui, Thanks for the guidance. I read the log carefully and found
> the root cause. KMeans, by default, uses KMeans++ as the initialization
> mode. According to the log file, the 70-minute hanging is actually the
> computing time of Kmeans++ [...]