Re: [EXTERNAL] Re: Re: Re: Stage level scheduling - lower the number of executors when using GPUs
So I'm not sure I completely follow. Are you asking for a way to change the limit without having to do the repartition? And your DL software doesn't care if you got, say, 30 executors instead of 20? Normally I would expect the number of partitions at that point to be 200 (or whatever you set for your shuffle partitions) unless you are using the AQE coalesce-partitions functionality, in which case it could change. Are you using the latter?

> Normally I try to aim for anything between 30s-5m per task (failure-wise),
> depending on the cluster, its stability, etc. But in this case, individual
> tasks can take 30-60 minutes, if not much more. Any failure during this long
> time is pretty expensive.

Are you saying that when you manually do the repartition your DL tasks take 30-60 minutes? So again, you want something like AQE coalesce partitions to kick in to attempt to pick partition sizes for you?

Tom

On Thursday, November 3, 2022 at 03:18:07 PM CDT, Shay Elbaz wrote:

This is exactly what we ended up doing! The only drawback I saw with this approach is that the GPU tasks get pretty big (in terms of data and compute time), and task failures become expensive. That's why I reached out to the mailing list in the first place 🙂 Normally I try to aim for anything between 30s-5m per task (failure-wise), depending on the cluster, its stability, etc. But in this case, individual tasks can take 30-60 minutes, if not much more. Any failure during this long time is pretty expensive.

Shay

From: Tom Graves
Sent: Thursday, November 3, 2022 7:56 PM
To: Artemis User; user@spark.apache.org; Shay Elbaz
Subject: [EXTERNAL] Re: Re: Re: Stage level scheduling - lower the number of executors when using GPUs

| ATTENTION: This email originated from outside of GM. |

Stage level scheduling does not allow you to change configs right now. This is something we thought about as a follow-on but have never implemented. How many tasks on the DL stage are you running?
The typical case is: run some ETL with lots of tasks... do mapPartitions and then run your DL stuff. Before that mapPartitions you could do a repartition if necessary to get to exactly the number of tasks you want (20). That way, even if maxExecutors=500 you will only ever need 20 (or whatever you repartition to) and Spark isn't going to ask for more than that.

Tom

On Thursday, November 3, 2022 at 11:10:31 AM CDT, Shay Elbaz wrote:

Thanks again Artemis, I really appreciate it. I have watched the video but did not find an answer. Please bear with me just one more iteration 🙂 Maybe I'll be more specific: Suppose I start the application with maxExecutors=500, executors.cores=2, because that's the amount of resources needed for the ETL part. But for the DL part I only need 20 GPUs. The SLS API only allows setting the resources per executor/task, so Spark would (try to) allocate up to 500 GPUs, assuming I configure the profile with 1 GPU per executor. So, the question is how do I limit the stage resources to 20 GPUs total?

Thanks again,
Shay

From: Artemis User
Sent: Thursday, November 3, 2022 5:23 PM
To: user@spark.apache.org
Subject: [EXTERNAL] Re: Re: Stage level scheduling - lower the number of executors when using GPUs

Shay, you may find this video helpful (with some API code samples that you are looking for): https://www.youtube.com/watch?v=JNQu-226wUc&t=171s. The issue here isn't how to limit the number of executors but to request the right GPU-enabled executors dynamically. Those executors used in pre-GPU stages should be returned to the resource manager with dynamic resource allocation enabled (and with the right DRA policies). Hope this helps. Unfortunately there aren't a lot of detailed docs for this topic since GPU acceleration is kind of new in Spark (not straightforward like in TF). I wish the Spark doc team could provide more details in the next release...

On 11/3/22 2:37 AM, Shay Elbaz wrote:

Thanks Artemis.
We are not using Rapids, but rather using GPUs through the Stage Level Scheduling feature with ResourceProfile. In Kubernetes you have to turn on shuffle tracking for dynamic allocation anyhow. The question is how we can limit the number of executors when building a new ResourceProfile, directly (API) or indirectly (some advanced workaround).

Thanks,
Shay

From: Artemis User
Sent: Thursday, November 3, 2022 1:16 AM
To: user@spark.apache.org
Subject: [EXTERNAL] Re: Stage level scheduling - lower the number of executors when using GPUs

Are you using Rapids for GPU support in Spark? A couple of options you may want to try: - In addition to dynamic allocation turned on, you may also need to turn on the external shuffle service. - Sounds like you are using Kubernetes.
Re: [EXTERNAL] Re: Re: Stage level scheduling - lower the number of executors when using GPUs
Stage level scheduling does not allow you to change configs right now. This is something we thought about as a follow-on but have never implemented. How many tasks on the DL stage are you running? The typical case is: run some ETL with lots of tasks... do mapPartitions and then run your DL stuff. Before that mapPartitions you could do a repartition if necessary to get to exactly the number of tasks you want (20). That way, even if maxExecutors=500 you will only ever need 20 (or whatever you repartition to) and Spark isn't going to ask for more than that.

Tom

On Thursday, November 3, 2022 at 11:10:31 AM CDT, Shay Elbaz wrote:

Thanks again Artemis, I really appreciate it. I have watched the video but did not find an answer. Please bear with me just one more iteration 🙂 Maybe I'll be more specific: Suppose I start the application with maxExecutors=500, executors.cores=2, because that's the amount of resources needed for the ETL part. But for the DL part I only need 20 GPUs. The SLS API only allows setting the resources per executor/task, so Spark would (try to) allocate up to 500 GPUs, assuming I configure the profile with 1 GPU per executor. So, the question is how do I limit the stage resources to 20 GPUs total?

Thanks again,
Shay

From: Artemis User
Sent: Thursday, November 3, 2022 5:23 PM
To: user@spark.apache.org
Subject: [EXTERNAL] Re: Re: Stage level scheduling - lower the number of executors when using GPUs

Shay, you may find this video helpful (with some API code samples that you are looking for): https://www.youtube.com/watch?v=JNQu-226wUc&t=171s. The issue here isn't how to limit the number of executors but to request the right GPU-enabled executors dynamically. Those executors used in pre-GPU stages should be returned to the resource manager with dynamic resource allocation enabled (and with the right DRA policies). Hope this helps.
Unfortunately there aren't a lot of detailed docs for this topic since GPU acceleration is kind of new in Spark (not straightforward like in TF). I wish the Spark doc team could provide more details in the next release...

On 11/3/22 2:37 AM, Shay Elbaz wrote:

Thanks Artemis. We are not using Rapids, but rather using GPUs through the Stage Level Scheduling feature with ResourceProfile. In Kubernetes you have to turn on shuffle tracking for dynamic allocation anyhow. The question is how we can limit the number of executors when building a new ResourceProfile, directly (API) or indirectly (some advanced workaround).

Thanks,
Shay

From: Artemis User
Sent: Thursday, November 3, 2022 1:16 AM
To: user@spark.apache.org
Subject: [EXTERNAL] Re: Stage level scheduling - lower the number of executors when using GPUs

Are you using Rapids for GPU support in Spark? A couple of options you may want to try: - In addition to dynamic allocation turned on, you may also need to turn on the external shuffle service. - Sounds like you are using Kubernetes. In that case, you may also need to turn on shuffle tracking. - The "stages" are controlled by the APIs. The APIs for dynamic resource requests (change of stage) do exist, but only for RDDs (e.g. TaskResourceRequest and ExecutorResourceRequest).

On 11/2/22 11:30 AM, Shay Elbaz wrote:

Hi, our typical applications need fewer executors for a GPU stage than for a CPU stage. We are using dynamic allocation with stage level scheduling, and Spark tries to maximize the number of executors also during the GPU stage, causing a bit of resource chaos in the cluster. This forces us to use a lower value for 'maxExecutors' in the first place, at the cost of CPU stage performance.
Or we try to solve this at the Kubernetes scheduler level, which is not straightforward and doesn't feel like the right way to go. Is there a way to effectively use fewer executors in Stage Level Scheduling? The API does not seem to include such an option, but maybe there is some more advanced workaround?

Thanks,
Shay
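For reference, the repartition-plus-stage-level-scheduling pattern discussed in this thread could be sketched roughly as below. This is an untested sketch that needs a Spark 3.1+ cluster with GPU resources configured; the profile amounts and `runDLInference` are placeholders, not a recommendation:

```scala
import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

// Stand-in for the real ETL output, which runs under the default (CPU) profile.
val etl = spark.range(0, 1000000).toDF("id")

// Profile for the DL stage: 1 GPU and 1 task per executor.
val execReqs = new ExecutorResourceRequests().cores(1).resource("gpu", 1)
val taskReqs = new TaskResourceRequests().cpus(1).resource("gpu", 1)
val gpuProfile = new ResourceProfileBuilder().require(execReqs).require(taskReqs).build()

// Repartition to exactly the number of GPU tasks wanted (20), so even with
// maxExecutors=500 dynamic allocation only ever needs 20 executors here.
val scored = etl.repartition(20).rdd
  .withResources(gpuProfile)
  .mapPartitions { rows => runDLInference(rows) } // runDLInference is hypothetical
```

As the thread notes, this caps the executor count indirectly (via task count) rather than through a per-profile executor limit, which the API did not expose at the time.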
Re: [Spark Core, PySpark] Separate stage level scheduling for consecutive map functions
As Sean mentioned, it's only available at the stage level, but you said you don't want to shuffle, so splitting into stages doesn't help you. Without more details it seems like you could "hack" this by just requesting an executor with 1 GPU (allowing 2 tasks per GPU) and 2 CPUs, so that one task would use the GPU and the other could just use the CPU. Perhaps that is too simplistic or brittle though.

Tom

On Saturday, July 31, 2021, 03:56:18 AM CDT, Andreas Kunft wrote:

I have a setup with two work-intensive tasks: one map using the GPU followed by a map using only the CPU. Using stage level resource scheduling, I request a GPU node, but would also like to execute the consecutive CPU map on a different executor so that the GPU node is not blocked. However, Spark will always combine the two maps due to the narrow dependency, and thus I can not define two different resource requirements. So the question is: can I force the two map functions onto different executors without shuffling, or even better, is there a plan to enable this by assigning different resource requirements?

Best
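Tom's "hack" of letting two tasks share one GPU executor could be expressed with fractional task GPU amounts, roughly like this (untested sketch against the Spark 3.1+ resource API; needs a cluster with GPU discovery configured, and the RDD/function names are placeholders):

```scala
import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

// Executor: 2 cores and 1 GPU. Task: 1 CPU and 0.5 GPU, so two tasks run
// concurrently on the executor -- one can drive the GPU while the other
// effectively only uses its CPU core.
val execReqs = new ExecutorResourceRequests().cores(2).resource("gpu", 1)
val taskReqs = new TaskResourceRequests().cpus(1).resource("gpu", 0.5)
val sharedProfile = new ResourceProfileBuilder().require(execReqs).require(taskReqs).build()

// inputRdd, gpuMap and cpuMap are hypothetical; both maps stay in one stage
// (narrow dependency), but the stage gets two task slots per executor.
val result = inputRdd
  .withResources(sharedProfile)
  .map(gpuMap)
  .map(cpuMap)
```

This does not truly pin each map to a different executor; it just sizes the executor so the GPU and CPU work can overlap, which is why Tom calls it simplistic and brittle.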
Re: GPU job in Spark 3
Hey Martin, I would encourage you to file issues in the spark-rapids repo for questions with that plugin: https://github.com/NVIDIA/spark-rapids/issues

I'm assuming the query ran and you looked at the SQL UI or the .explain() output and it was on the CPU and not the GPU? I am assuming you have the CUDA 11.0 runtime installed (look in /usr/local). You printed the driver version, which is 11.2, but the runtimes can be different. You are using the CUDA 11.0 version of the cudf library; if that didn't match the runtime, though, it would have failed and not run anything. The easiest way to tell why it didn't run on the GPU is to enable the config spark.rapids.sql.explain=NOT_ON_GPU. It will print out logs to your console as to why different operators don't run on the GPU. Again, feel free to open up a question issue in the spark-rapids repo and we can discuss more there.

Tom

On Friday, April 9, 2021, 11:19:05 AM CDT, Martin Somers wrote:

Hi Everyone!! I'm trying to get an on-premise GPU instance of Spark 3 running on my Ubuntu box, and I am following: https://nvidia.github.io/spark-rapids/docs/get-started/getting-started-on-prem.html#example-join-operation

Anyone with any insight into why a Spark job isn't being run on the GPU? It appears to be all on the CPU. The hadoop binary is installed and appears to be functioning fine:

export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)

Here is my setup on ubuntu 20.10. nvidia-smi reports driver version 460.39, CUDA version 11.2, and a single GeForce RTX 3090 (478MiB / 24265MiB memory in use, 0% utilization).

/opt/sparkRapidsPlugin contains: cudf-0.18.1-cuda11.jar, getGpusResources.sh, rapids-4-spark_2.12-0.4.1.jar

scalac --version reports Scala compiler version 2.13.0. spark-shell --version reports Spark version 3.1.1, using Scala version 2.12.10 and OpenJDK 64-Bit Server VM 11.0.10 (plus warnings about the hostname resolving to a loopback address and illegal reflective access by org.apache.spark.unsafe.Platform).
Here is how I call Spark prior to adding the test job:

$SPARK_HOME/bin/spark-shell \
  --master local \
  --num-executors 1 \
  --conf spark.executor.cores=16 \
  --conf spark.rapids.sql.concurrentGpuTasks=1 \
  --driver-memory 10g \
  --conf spark.executor.extraClassPath=${SPARK_CUDF_JAR}:${SPARK_RAPIDS_PLUGIN_JAR} \
  --conf spark.rapids.memory.pinnedPool.size=16G \
  --conf spark.locality.wait=0s \
  --conf spark.sql.files.maxPartitionBytes=512m \
  --conf spark.sql.shuffle.partitions=10 \
  --conf spark.plugins=com.nvidia.spark.SQLPlugin \
  --files $SPARK_RAPIDS_DIR/getGpusResources.sh \
  --jars ${SPARK_CUDF_JAR},${SPARK_RAPIDS_PLUGIN_JAR}

The test job is from the example join-operation:

val df = sc.makeRDD(1 to 1000, 6).toDF
val df2 = sc.makeRDD(1 to 1000, 6).toDF
df.select($"value" as "a").join(df2.select($"value" as "b"), $"a" === $"b").count

I just noticed that the scala versions are out of sync - that shouldn't affect it? Is there anything else I can try in the -
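The diagnostic config Tom suggests can simply be appended to the same spark-shell invocation; a trimmed sketch (same environment variables as above assumed to be set):

```shell
# Same invocation as above, with the explain config added so the plugin logs
# to the console why each operator stays on the CPU:
$SPARK_HOME/bin/spark-shell \
  --master local \
  --conf spark.plugins=com.nvidia.spark.SQLPlugin \
  --conf spark.rapids.sql.explain=NOT_ON_GPU \
  --files $SPARK_RAPIDS_DIR/getGpusResources.sh \
  --jars ${SPARK_CUDF_JAR},${SPARK_RAPIDS_PLUGIN_JAR}
```

With that set, re-running the join and checking the console output (or `.explain()`) should show the plugin's reasons for CPU fallback.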
Re: Build customized resource manager
I don't know if it all works, but some work was done to make the cluster manager pluggable; see SPARK-13904.

Tom

On Wednesday, November 6, 2019, 07:22:59 PM CST, Klaus Ma wrote:

Any suggestions?

- Klaus

On Mon, Nov 4, 2019 at 5:04 PM Klaus Ma wrote:

Hi team, AFAIK, we built k8s/yarn/mesos as resource managers, but I'd like to do some enhancements to them, e.g. integrate with Volcano in k8s. Is it possible to do that without forking the whole Spark project? For example, enable a customized resource manager via configuration, e.g. replace `org.apache.spark.deploy.k8s.submit.KubernetesClientApplication` with `MyK8SClient`, so I only maintain the resource manager instead of the whole project.

-- Klaus
[ANNOUNCE] Apache Spark 2.2.2
We are happy to announce the availability of Spark 2.2.2! Apache Spark 2.2.2 is a maintenance release, based on the branch-2.2 maintenance branch of Spark. We strongly recommend all 2.2.x users to upgrade to this stable release. The release notes are available at http://spark.apache.org/releases/spark-release-2-2-2.html To download Apache Spark 2.2.2 visit http://spark.apache.org/downloads.html. This version of Spark is also available on Maven and PyPI. We would like to acknowledge all community members for contributing patches to this release.
Re: anyone using netlib-java with sparkR on yarn spark1.6?
Is there anything other than the spark assembly that needs to be in the classpath? I verified the assembly was built right and it's in the classpath (else nothing would work).

Thanks,
Tom

On Tuesday, November 10, 2015 8:29 PM, Shivaram Venkataraman wrote:

I think this is happening in the driver. Could you check the classpath of the JVM that gets started? If you use spark-submit on yarn, the classpath is set up before R gets launched, so it should match the behavior of Scala / Python.

Thanks
Shivaram

On Fri, Nov 6, 2015 at 1:39 PM, Tom Graves wrote:
> I'm trying to use the netlib-java stuff with mllib and sparkR on yarn. I've
> compiled with -Pnetlib-lgpl, see the necessary things in the spark assembly
> jar. The nodes have /usr/lib64/liblapack.so.3, /usr/lib64/libblas.so.3,
> and /usr/lib/libgfortran.so.3.
>
> Running:
> data <- read.df(sqlContext, 'data.csv', 'com.databricks.spark.csv')
> mdl = glm(C2~., data, family="gaussian")
>
> But I get the error:
> 15/11/06 21:17:27 WARN LAPACK: Failed to load implementation from:
> com.github.fommil.netlib.NativeSystemLAPACK
> 15/11/06 21:17:27 WARN LAPACK: Failed to load implementation from:
> com.github.fommil.netlib.NativeRefLAPACK
> 15/11/06 21:17:27 ERROR RBackendHandler: fitRModelFormula on
> org.apache.spark.ml.api.r.SparkRWrappers failed
> Error in invokeJava(isStatic = TRUE, className, methodName, ...) :
> java.lang.AssertionError: assertion failed: lapack.dpotrs returned 18.
> at scala.Predef$.assert(Predef.scala:179)
> at org.apache.spark.mllib.linalg.CholeskyDecomposition$.solve(CholeskyDecomposition.scala:40)
> at org.apache.spark.ml.optim.WeightedLeastSquares.fit(WeightedLeastSquares.scala:114)
> at org.apache.spark.ml.regression.LinearRegression.train(LinearRegression.scala:166)
> at org.apache.spark.ml.regression.LinearRegression.train(LinearRegression.scala:65)
> at org.apache.spark.ml.Predictor.fit(Predictor.scala:90)
> at org.apache.spark.ml.Predictor.fit(Predictor.scala:71)
> at org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:138)
> at org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:134)
>
> Anyone have this working?
>
> Thanks,
> Tom
anyone using netlib-java with sparkR on yarn spark1.6?
I'm trying to use the netlib-java stuff with mllib and sparkR on yarn. I've compiled with -Pnetlib-lgpl, see the necessary things in the spark assembly jar. The nodes have /usr/lib64/liblapack.so.3, /usr/lib64/libblas.so.3, and /usr/lib/libgfortran.so.3.

Running:
data <- read.df(sqlContext, 'data.csv', 'com.databricks.spark.csv')
mdl = glm(C2~., data, family="gaussian")

But I get the error:
15/11/06 21:17:27 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
15/11/06 21:17:27 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
15/11/06 21:17:27 ERROR RBackendHandler: fitRModelFormula on org.apache.spark.ml.api.r.SparkRWrappers failed
Error in invokeJava(isStatic = TRUE, className, methodName, ...) :
java.lang.AssertionError: assertion failed: lapack.dpotrs returned 18.
at scala.Predef$.assert(Predef.scala:179)
at org.apache.spark.mllib.linalg.CholeskyDecomposition$.solve(CholeskyDecomposition.scala:40)
at org.apache.spark.ml.optim.WeightedLeastSquares.fit(WeightedLeastSquares.scala:114)
at org.apache.spark.ml.regression.LinearRegression.train(LinearRegression.scala:166)
at org.apache.spark.ml.regression.LinearRegression.train(LinearRegression.scala:65)
at org.apache.spark.ml.Predictor.fit(Predictor.scala:90)
at org.apache.spark.ml.Predictor.fit(Predictor.scala:71)
at org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:138)
at org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:134)

Anyone have this working?

Thanks,
Tom
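One quick way to narrow down errors like the above is to ask netlib-java directly which implementation it actually loaded. A sketch to run from spark-shell on the driver (assumes the netlib classes from the assembly are on the classpath; it cannot check the executors):

```scala
// Prints e.g. com.github.fommil.netlib.NativeSystemLAPACK when the native
// system libraries load, or com.github.fommil.netlib.F2jLAPACK when it has
// fallen back to the pure-Java implementation.
println(com.github.fommil.netlib.LAPACK.getInstance().getClass.getName)
println(com.github.fommil.netlib.BLAS.getInstance().getClass.getName)
```

If this already shows the F2j fallback on the driver, the native `.so.3` libraries (or the gfortran dependency) are not visible to the JVM that SparkR started.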
Changing application log level in standalone cluster
I would like to change the logging level for my application running on a standalone Spark cluster. Is there an easy way to do that without changing the log4j.properties on each individual node?

Thanks,
Tom
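One common approach to this (a sketch, assuming the log4j 1.x that Spark shipped with at the time; the file and jar names are placeholders) is to ship a custom log4j.properties with the application instead of editing every node:

```shell
# custom-log4j.properties contains e.g.:
#   log4j.rootCategory=DEBUG, console
# Ship it to the executors with --files and point both JVMs at it:
spark-submit \
  --files custom-log4j.properties \
  --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=custom-log4j.properties" \
  --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=custom-log4j.properties" \
  myapp.jar
```

Note the driver-side path may need a `file:` URI to a local copy when the driver runs on the submitting machine; the executor side resolves the name against the files distributed by `--files`.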
Re: Failed RC-10 yarn-cluster job for FS closed error when cleaning up staging directory
It sounds like something is closing the HDFS filesystem before everyone is really done with it. The filesystem gets cached and is shared, so if someone closes it while other threads are still using it you run into this error. Is your application closing the filesystem? Are you using the event logging feature? Could you share the options you are running with? Yarn will retry the application depending on how the Application Master attempt fails (how many times it retries is a configurable setting). That is probably the second driver you are referring to, but they shouldn't have overlapped as far as both being up at the same time. Is that the case you are seeing? Generally you want to look at why the first application attempt fails.

Tom

On Wednesday, May 21, 2014 6:10 PM, Kevin Markey wrote:

I tested an application on RC-10 and Hadoop 2.3.0 in yarn-cluster mode that had run successfully with Spark-0.9.1 and Hadoop 2.3 or 2.2. The application successfully ran to conclusion but it ultimately failed. There were 2 anomalies...

1. ASM reported only that the application was "ACCEPTED". It never indicated that the application was "RUNNING."

14/05/21 16:06:12 INFO yarn.Client: Application report from ASM:
> application identifier: application_1400696988985_0007
> appId: 7
> clientToAMToken: null
> appDiagnostics:
> appMasterHost: N/A
> appQueue: default
> appMasterRpcPort: -1
> appStartTime: 1400709970857
> yarnAppState: ACCEPTED
> distributedFinalState: UNDEFINED
> appTrackingUrl: http://Sleepycat:8088/proxy/application_1400696988985_0007/
> appUser: hduser

Furthermore, it started a second container, running two partly overlapping drivers, when it appeared that the application never started. Each container ran to conclusion as explained above, taking twice as long as usual for both to complete. Both instances had the same concluding failure.

2.
Each instance failed as indicated by the stderr log, finding that the filesystem was closed when trying to clean up the staging directories.

14/05/21 16:08:24 INFO Executor: Serialized size of result for 1453 is 863
14/05/21 16:08:24 INFO Executor: Sending result for 1453 directly to driver
14/05/21 16:08:24 INFO Executor: Finished task ID 1453
14/05/21 16:08:24 INFO TaskSetManager: Finished TID 1453 in 202 ms on localhost (progress: 2/2)
14/05/21 16:08:24 INFO DAGScheduler: Completed ResultTask(1507, 1)
14/05/21 16:08:24 INFO TaskSchedulerImpl: Removed TaskSet 1507.0, whose tasks have all completed, from pool
14/05/21 16:08:24 INFO DAGScheduler: Stage 1507 (count at KEval.scala:32) finished in 0.417 s
14/05/21 16:08:24 INFO SparkContext: Job finished: count at KEval.scala:32, took 1.532789283 s
14/05/21 16:08:24 INFO SparkUI: Stopped Spark web UI at http://dhcp-brm-bl1-215-1e-east-10-135-123-92.usdhcp.oraclecorp.com:42250
14/05/21 16:08:24 INFO DAGScheduler: Stopping DAGScheduler
14/05/21 16:08:25 INFO MapOutputTrackerMasterActor: MapOutputTrackerActor stopped!
14/05/21 16:08:25 INFO ConnectionManager: Selector thread was interrupted!
14/05/21 16:08:25 INFO ConnectionManager: ConnectionManager stopped
14/05/21 16:08:25 INFO MemoryStore: MemoryStore cleared
14/05/21 16:08:25 INFO BlockManager: BlockManager stopped
14/05/21 16:08:25 INFO BlockManagerMasterActor: Stopping BlockManagerMaster
14/05/21 16:08:25 INFO BlockManagerMaster: BlockManagerMaster stopped
14/05/21 16:08:25 INFO SparkContext: Successfully stopped SparkContext
14/05/21 16:08:25 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
14/05/21 16:08:25 INFO ApplicationMaster: finishApplicationMaster with SUCCEEDED
14/05/21 16:08:25 INFO ApplicationMaster: AppMaster received a signal.
14/05/21 16:08:25 INFO ApplicationMaster: Deleting staging directory .sparkStaging/application_1400696988985_0007
14/05/21 16:08:25 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
14/05/21 16:08:25 ERROR ApplicationMaster: Failed to cleanup staging dir .sparkStaging/application_1400696988985_0007
java.io.IOException: Filesystem closed
at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:689)
at org.apache.hadoop.hdfs.DFSClient.delete(DFSClient.java:1685)
at org.apache.hadoop.hdfs.DistributedFileSystem$11.doCall(DistributedFileSystem.java:591)
at org.apache.hadoop.hdfs.DistributedFileSystem$11.doCall(DistributedFileSystem.java:587)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.delete(DistributedFileSystem.java:587)
at org.apache.spark.deploy.yarn.ApplicationMaster.org$apache$spark$deploy$yarn$ApplicationMaster$$cleanupStagingDir(ApplicationMaster.scala:371)
at org.apache.spark.deploy.yarn.ApplicationMaster$AppMasterShutdownHook.run(ApplicationMaster.scala:386)
at org.apache.hadoop.util.S
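If the root cause does turn out to be the shared, cached FileSystem instance being closed by another component, one possible workaround (an assumption on my part, not something verified in this thread; it trades the error for extra FileSystem objects) is to disable the HDFS FileSystem cache for the application:

```shell
# In spark-defaults.conf (or via --conf). spark.hadoop.* entries are copied
# into the Hadoop Configuration that Spark uses, so this sets the standard
# Hadoop property fs.hdfs.impl.disable.cache:
spark.hadoop.fs.hdfs.impl.disable.cache  true
```

With the cache disabled, each caller gets its own FileSystem instance, so one component's close() no longer invalidates the handle another thread is still using.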
Re: Spark on Yarn - A small issue !
You need to look at the log files for yarn. Generally this can be done with "yarn logs -applicationId ". That only works if you have log aggregation enabled though. You should be able to see at least the application master logs through the yarn resourcemanager web UI; I would try that first. If that doesn't work you can turn on debug in the nodemanager: to review the per-container launch environment, increase yarn.nodemanager.delete.debug-delay-sec to a large value (e.g. 36000), and then access the application cache through yarn.nodemanager.local-dirs on the nodes on which containers are launched. This directory contains the launch script, jars, and all environment variables used for launching each container. This process is useful for debugging classpath problems in particular. (Note that enabling this requires admin privileges on cluster settings and a restart of all node managers; thus, this is not applicable to hosted clusters.)

Tom

On Monday, May 12, 2014 9:38 AM, Sai Prasanna wrote:

Hi All, I wanted to launch Spark on Yarn, interactive - yarn client mode. With default settings of yarn-site.xml and spark-env.sh, I followed the given link http://spark.apache.org/docs/0.8.1/running-on-yarn.html

I get the pi value correct when I run without launching the shell. When I launch the shell with the following command:

SPARK_JAR=./assembly/target/scala-2.9.3/spark-assembly-0.8.1-incubating-hadoop2.3.0.jar \
SPARK_YARN_APP_JAR=examples/target/scala-2.9.3/spark-examples-assembly-0.8.1-incubating.jar \
MASTER=yarn-client ./spark-shell

and try to create RDDs and do some action on them, I get nothing. After some time the tasks fail.
LogFile of spark:

519095 14/05/12 13:30:40 INFO YarnClientClusterScheduler: YarnClientClusterScheduler.postStartHook done
519096 14/05/12 13:30:40 INFO BlockManagerMasterActor$BlockManagerInfo: Registering block manager s1:38355 with 324.4 MB RAM
519097 14/05/12 13:31:38 INFO MemoryStore: ensureFreeSpace(202584) called with curMem=0, maxMem=340147568
519098 14/05/12 13:31:38 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 197.8 KB, free 324.2 MB)
519099 14/05/12 13:31:49 INFO FileInputFormat: Total input paths to process : 1
519100 14/05/12 13:31:49 INFO NetworkTopology: Adding a new node: /default-rack/192.168.1.100:50010
519101 14/05/12 13:31:49 INFO SparkContext: Starting job: top at <console>:15
519102 14/05/12 13:31:49 INFO DAGScheduler: Got job 0 (top at <console>:15) with 4 output partitions (allowLocal=false)
519103 14/05/12 13:31:49 INFO DAGScheduler: Final stage: Stage 0 (top at <console>:15)
519104 14/05/12 13:31:49 INFO DAGScheduler: Parents of final stage: List()
519105 14/05/12 13:31:49 INFO DAGScheduler: Missing parents: List()
519106 14/05/12 13:31:49 INFO DAGScheduler: Submitting Stage 0 (MapPartitionsRDD[2] at top at <console>:15), which has no missing parents
519107 14/05/12 13:31:49 INFO DAGScheduler: Submitting 4 missing tasks from Stage 0 (MapPartitionsRDD[2] at top at <console>:15)
519108 14/05/12 13:31:49 INFO YarnClientClusterScheduler: Adding task set 0.0 with 4 tasks
519109 14/05/12 13:31:49 INFO RackResolver: Resolved s1 to /default-rack
519110 14/05/12 13:31:49 INFO ClusterTaskSetManager: Starting task 0.0:3 as TID 0 on executor 1: s1 (PROCESS_LOCAL)
519111 14/05/12 13:31:49 INFO ClusterTaskSetManager: Serialized task 0.0:3 as 1811 bytes in 4 ms
519112 14/05/12 13:31:49 INFO ClusterTaskSetManager: Starting task 0.0:0 as TID 1 on executor 1: s1 (NODE_LOCAL)
519113 14/05/12 13:31:49 INFO ClusterTaskSetManager: Serialized task 0.0:0 as 1811 bytes in 1 ms
519114 14/05/12 13:32:18 INFO YarnClientSchedulerBackend: Executor 1 disconnected, so removing it
519115 14/05/12 13:32:18 ERROR YarnClientClusterScheduler: Lost executor 1 on s1: remote Akka client shutdown
519116 14/05/12 13:32:18 INFO ClusterTaskSetManager: Re-queueing tasks for 1 from TaskSet 0.0
519117 14/05/12 13:32:18 WARN ClusterTaskSetManager: Lost TID 1 (task 0.0:0)
519118 14/05/12 13:32:18 WARN ClusterTaskSetManager: Lost TID 0 (task 0.0:3)
519119 14/05/12 13:32:18 INFO DAGScheduler: Executor lost: 1 (epoch 0)
519120 14/05/12 13:32:18 INFO BlockManagerMasterActor: Trying to remove executor 1 from BlockManagerMaster.
519121 14/05/12 13:32:18 INFO BlockManagerMaster: Removed 1 successfully in removeExecutor

Do I need to set any other env-variable specifically for SPARK on YARN? What could be the issue?? Can anyone please help me in this regard. Thanks in Advance !!
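The log-retrieval steps Tom describes, spelled out as commands (the application id is a placeholder; substitute your own from the ResourceManager UI):

```shell
# Fetch aggregated container logs for a finished application.
# Requires yarn.log-aggregation-enable=true on the cluster:
yarn logs -applicationId application_1400000000000_0001

# For the launch-environment debugging route, set this in yarn-site.xml on
# each NodeManager and restart them, then inspect the dirs listed in
# yarn.nodemanager.local-dirs:
#   yarn.nodemanager.delete.debug-delay-sec = 36000
```

The kept launch directory contains the container launch script with the full classpath and environment, which is usually enough to spot why an executor died immediately after starting.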
Re: configure spark history server for running on Yarn
Since 1.0 is still in development you can pick up the latest docs in git: https://github.com/apache/spark/tree/branch-1.0/docs

I didn't see anywhere that you said you started the spark history server. There are multiple things that need to happen for the spark history server to work:

1) Configure your application to save the history logs - see the eventLog settings here: https://github.com/apache/spark/blob/branch-1.0/docs/configuration.md
2) On yarn, know the host/port where you are going to start the spark history server and configure spark.yarn.historyServer.address to point to it. Note that this purely makes the link from the ResourceManager UI properly point to the Spark History Server daemon.
3) Start the spark history server pointing to the same directory as specified in your application (spark.eventLog.dir).
4) Run your application. Once it finishes you can either go to the RM UI and link to the spark history UI, or go directly to the spark history server UI.

Tom

On Thursday, May 1, 2014 7:09 PM, Jenny Zhao wrote:

Hi, I have installed spark 1.0 from branch-1.0, the build went fine, and I have tried running the example in Yarn client mode. Here is my command:

/home/hadoop/spark-branch-1.0/bin/spark-submit /home/hadoop/spark-branch-1.0/examples/target/scala-2.10/spark-examples-1.0.0-hadoop2.2.0.jar --master yarn --deploy-mode client --executor-memory 6g --executor-cores 3 --driver-memory 3g --name SparkPi --num-executors 2 --class org.apache.spark.examples.SparkPi yarn-client 5

After the run, I was not able to retrieve the log from Yarn's web UI, while I have tried to specify the history server in spark-env.sh:

export SPARK_DAEMON_JAVA_OPTS="-Dspark.yarn.historyServer.address=master:18080"

I also tried to specify it in spark-defaults.conf, which doesn't work as well. I would appreciate if someone can tell me the way of specifying it either in spark-env.sh or spark-defaults.conf, so that this option can be applied to any spark application.
Another thing I found is that the usage output for spark-submit is not complete / not in sync with the online documentation; I hope it is addressed with the formal release. And is this the latest documentation for spark 1.0? http://people.csail.mit.edu/matei/spark-unified-docs/running-on-yarn.html

Thank you!
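Putting Tom's four steps together, a minimal sketch for the Spark 1.0 era (paths and host names are placeholders; check the configuration docs linked above for the exact property names in your version):

```shell
# Steps 1+2: spark-defaults.conf for the application:
#   spark.eventLog.enabled           true
#   spark.eventLog.dir               hdfs:///spark-history
#   spark.yarn.historyServer.address master:18080

# Step 3: start the history server against the same directory
# (in Spark 1.0 the log directory could be passed as an argument):
./sbin/start-history-server.sh hdfs:///spark-history

# Step 4: run the application; after it finishes, follow the link from the
# ResourceManager UI or browse to http://master:18080 directly.
```

Note that spark.yarn.historyServer.address is an application-level setting, which is why putting it in SPARK_DAEMON_JAVA_OPTS (a daemon-side setting) did not take effect.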
Re: Spark on YARN performance
I haven't run on mesos before, but I do run on yarn. The performance differences are going to be in how long it takes you to get the Executors allocated. On yarn that is going to depend on the cluster setup. If you have dedicated resources in a queue where you are running your spark job, the overhead is pretty minimal. Now if your cluster is multi-tenant and really busy, and you allow other queues to use your capacity, it could take some time. It is also possible to run into the situation where the memory of the nodemanagers gets fragmented and you don't have any slots big enough for you, so you have to wait for other applications to finish. Again this mostly depends on the setup, how big of containers you need for Spark, etc.

Tom

On Thursday, April 10, 2014 11:12 AM, Flavio Pompermaier wrote:

Thank you for the reply Mayur, it would be nice to have a comparison about that. I hope one day it will be available, or that I'll have the time to test it myself :) So you're using Mesos for the moment, right? Which are the main differences in your experience? YARN seems to be more flexible and interoperable with other frameworks... am I wrong?

Best,
Flavio

On Thu, Apr 10, 2014 at 5:55 PM, Mayur Rustagi wrote:

I've had better luck with standalone in terms of speed & latency. I think there is an impact but not really very high. The bigger impact is towards being able to manage resources & share the cluster.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi

On Wed, Apr 9, 2014 at 12:10 AM, Flavio Pompermaier wrote:

Hi everybody, I'm new to Spark and I'd like to know if running Spark on top of YARN or Mesos could affect (and how much) its performance. Is there any doc about this?

Best,
Flavio
Re: Spark 1.0.0 release plan
Do we have a list of things we really want to get in for 1.X? Perhaps move any JIRA out to a 1.1 release if we aren't targeting it for 1.0. It might be nice to send out reminders when these dates are approaching. Tom

On Thursday, April 3, 2014 11:19 PM, Bhaskar Dutta wrote: Thanks a lot guys!

On Fri, Apr 4, 2014 at 5:34 AM, Patrick Wendell wrote: Btw - after that initial thread I proposed a slightly more detailed set of dates: https://cwiki.apache.org/confluence/display/SPARK/Wiki+Homepage - Patrick

On Thu, Apr 3, 2014 at 11:28 AM, Matei Zaharia wrote: Hey Bhaskar, this is still the plan, though QAing might take longer than 15 days. Right now, since we've passed April 1st, the only features considered for a merge are those that had pull requests in review before. (Some big ones are things like annotating the public APIs and simplifying configuration.) Bug fixes and things like adding Python / Java APIs for new components will also still be considered. Matei

On Apr 3, 2014, at 10:30 AM, Bhaskar Dutta wrote: Hi, Is there any change in the release plan for the Spark 1.0.0-rc1 release date from what is listed in the "Proposal for Spark Release Strategy" thread?
== Tentative Release Window for 1.0.0 ==
Feb 1st - April 1st: General development
April 1st: Code freeze for new features
April 15th: RC1
Thanks, Bhaskar
Re: Submitting to yarn cluster
Generally the YARN cluster handles propagating and setting HADOOP_CONF_DIR for any containers it launches, so it should really only be needed on the client node submitting the applications. I haven't specifically tried doing what you said, but as you say, Spark doesn't really expose the configuration object being used. It does have an interface to pass it in: Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: SparkConf). But I don't know if that has been tested to make sure it propagates everywhere. There are also places that call SparkHadoopUtil.get.newConfiguration(), so I'm not sure those would handle it properly. You can always file a JIRA to add support for it and see what people think. Tom

On Thursday, April 3, 2014 8:46 AM, Ron Gonzalez wrote: Right, thanks, that worked. My goal is to programmatically submit things to the YARN cluster. The underlying framework we have is a set of property files that specify different machines for dev, QE, and prod. While it's definitely possible to have different configurations deployed as the client etc/hadoop directory, I was just curious if the only way is to set things up as environment variables, or if there is a way to programmatically override particular configurations. I looked at the Client.scala code and it seems like it creates a new Configuration object that isn't accessible from the outside, so most likely the answer is no, which is a reasonable answer. I just have to figure out a different deployment model for the different stages of the lifecycle. Thanks, Ron

On Thursday, April 3, 2014 6:29 AM, Tom Graves wrote: You should just be making sure your HADOOP_CONF_DIR env variable is correct and not setting yarn.resourcemanager.address in SparkConf. For YARN/Hadoop you need to point it to the configuration files for your cluster. Generally that setting goes into yarn-site.xml. If just setting it doesn't work, make sure $HADOOP_CONF_DIR is getting put into your classpath.
I would also make sure HADOOP_PREFIX is being set. Tom

On Wednesday, April 2, 2014 10:10 PM, Ron Gonzalez wrote: Hi, I have a small program but I cannot seem to make it connect to the right properties of the cluster. I have SPARK_YARN_APP_JAR, SPARK_JAR and SPARK_HOME set properly. If I run this Scala file, I see that it never uses the yarn.resourcemanager.address property that I set on the SparkConf instance. Any advice? Thanks, Ron

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.deploy.yarn.Client
import java.lang.System
import org.apache.spark.SparkConf

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "/home/rgonzalez/app/spark-0.9.0-incubating-bin-hadoop2/README.md"
    val conf = new SparkConf()
    conf.set("yarn.resourcemanager.address", "localhost:8050")
    val sc = new SparkContext("yarn-client", "Simple App", conf)
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}
Re: Submitting to yarn cluster
You should just be making sure your HADOOP_CONF_DIR env variable is correct and not setting yarn.resourcemanager.address in SparkConf. For YARN/Hadoop you need to point it to the configuration files for your cluster. Generally that setting goes into yarn-site.xml. If just setting it doesn't work, make sure $HADOOP_CONF_DIR is getting put into your classpath. I would also make sure HADOOP_PREFIX is being set. Tom

On Wednesday, April 2, 2014 10:10 PM, Ron Gonzalez wrote: Hi, I have a small program but I cannot seem to make it connect to the right properties of the cluster. I have SPARK_YARN_APP_JAR, SPARK_JAR and SPARK_HOME set properly. If I run this Scala file, I see that it never uses the yarn.resourcemanager.address property that I set on the SparkConf instance. Any advice? Thanks, Ron

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.deploy.yarn.Client
import java.lang.System
import org.apache.spark.SparkConf

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "/home/rgonzalez/app/spark-0.9.0-incubating-bin-hadoop2/README.md"
    val conf = new SparkConf()
    conf.set("yarn.resourcemanager.address", "localhost:8050")
    val sc = new SparkContext("yarn-client", "Simple App", conf)
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}
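The client-side setup Tom recommends can be sketched in shell. This is a minimal illustration, not from the thread; the two paths are assumptions for a typical Hadoop layout and must be replaced with your cluster's actual install and conf directories:

```shell
# Point Spark's YARN client at the cluster configuration instead of
# setting yarn.resourcemanager.address in SparkConf. The directory must
# contain yarn-site.xml (and core-site.xml, etc.) for your cluster.
export HADOOP_CONF_DIR=/etc/hadoop/conf    # assumed path
export HADOOP_PREFIX=/usr/lib/hadoop       # assumed path

# Sanity check before submitting: fail early if the variable is unset,
# since Spark would otherwise fall back to defaults (e.g. a local RM).
if [ -z "${HADOOP_CONF_DIR}" ]; then
  echo "HADOOP_CONF_DIR is not set" >&2
  exit 1
fi
echo "Using Hadoop config from ${HADOOP_CONF_DIR}"
```

Because the YARN client reads the resource manager address from yarn-site.xml on the classpath, this replaces the per-SparkConf override Ron was attempting.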
Re: Pig on Spark
I had asked a similar question on the dev mailing list a while back (Jan 22nd). See the archives: http://mail-archives.apache.org/mod_mbox/spark-dev/201401.mbox/browser -> look for "spork". Basically Matei said: "Yup, that was it, though I believe people at Twitter picked it up again recently. I'd suggest asking Dmitriy if you know him. I've seen interest in this from several other groups, and if there's enough of it, maybe we can start another open source repo to track it. The work in that repo you pointed to was done over one week, and already had most of Pig's operators working. (I helped out with this prototype over Twitter's hack week.) That work also calls the Scala API directly, because it was done before we had a Java API; it should be easier with the Java one." Tom

On Thursday, March 6, 2014 3:11 PM, Sameer Tilak wrote: Hi everyone, We are using Pig to build our data pipeline. I came across Spork -- Pig on Spark -- at https://github.com/dvryaboy/pig and I am not sure if it is still active. Can someone please let me know the status of Spork or any other effort that will let us run Pig on Spark? We can significantly benefit by using Spark, but we would like to keep using the existing Pig scripts.