Re: Concurrent Spark jobs
In case anyone else has the same problem and finds this: in my case it was fixed by increasing spark.sql.broadcastTimeout (I used 9000).

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Concurrent-Spark-jobs-tp26011p26648.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
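For anyone searching later, a minimal sketch of setting that property on the Spark 1.x API used in this thread (untested here; the property name is a documented Spark SQL setting, and the 9000-second value is simply what worked above):

```python
# Untested sketch: raise the broadcast timeout from its 300 s default.
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext(appName="etl")  # app name is a placeholder
sqlCtx = SQLContext(sc)
sqlCtx.setConf("spark.sql.broadcastTimeout", "9000")  # value in seconds
```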
Re: Concurrent Spark jobs
Jean wrote:
> Have you considered using pools?
> http://spark.apache.org/docs/latest/job-scheduling.html#fair-scheduler-pools
>
> I haven't tried it myself, but it looks like the pool setting is applied
> per thread, so it should be possible to configure the fair scheduler so
> that more than one job runs at a time. Although each of them would
> probably get fewer workers...

Thanks for the tip, but I don't think that would work in this case: while writing to Redshift, the cluster sits idle without the new tasks even appearing on the pending queue yet, so changing how jobs on the queue are executed won't help.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Concurrent-Spark-jobs-tp26011p26062.html
Re: Concurrent Spark jobs
Emlyn,

Have you considered using pools?
http://spark.apache.org/docs/latest/job-scheduling.html#fair-scheduler-pools

I haven't tried it myself, but it looks like the pool setting is applied per thread, so it should be possible to configure the fair scheduler so that more than one job runs at a time. Although each of them would probably get fewer workers...

Hope this helps.

--
Be well!
Jean Morozov

On Thu, Jan 21, 2016 at 3:23 PM, emlyn <em...@swiftkey.com> wrote:
> Thanks for the responses (not sure why they aren't showing up on the list).
>
> Michael wrote:
> > The JDBC wrapper for Redshift should allow you to follow these
> > instructions. Let me know if you run into any more issues.
> > http://apache-spark-user-list.1001560.n3.nabble.com/best-practices-for-pushing-an-RDD-into-a-database-td2681.html
>
> I'm not sure that solves my problem: if I understand it correctly, it splits a
> database write over multiple concurrent connections (one from each partition),
> whereas what I want is to let other tasks continue running on the cluster
> while the write to Redshift takes place.
> Also, I don't think it's good practice to load data into Redshift with INSERT
> statements over JDBC; it is recommended to use the bulk load commands, which
> can analyse the data and automatically set appropriate compression etc. on
> the table.
>
> Rajesh wrote:
> > Just a thought. Can we use Spark Job Server and trigger jobs through REST
> > APIs? In this case, all jobs will share the same context and run in
> > parallel.
> > If anyone has other thoughts please share
>
> I'm not sure this would work in my case, as they are not completely separate
> jobs, just different outputs to Redshift that share intermediate results.
> Running them as completely separate jobs would mean recalculating the
> intermediate results for each output. I suppose it might be possible to
> persist the intermediate results somewhere, and then delete them once all
> the jobs have run, but that adds a lot of complication which I'm not sure
> is justified.
>
> Maybe some pseudocode will clarify things, so here is a very simplified view
> of our Spark application:
>
>     // load and transform data, then cache the result
>     df1 = transform1(sqlCtx.read().options(...).parquet("path/to/data"))
>     df1.cache()
>
>     // perform some further transforms of the cached data
>     df2 = transform2(df1)
>     df3 = transform3(df1)
>
>     // write the final data out to Redshift
>     df2.write().format("com.databricks.spark.redshift").options(...).save()
>     df3.write().format("com.databricks.spark.redshift").options(...).save()
>
> When the application runs, the steps are executed in the following order:
> - scan parquet folder
> - transform1 executes
> - df1 stored in cache
> - transform2 executes
> - df2 written to Redshift (while cluster sits idle)
> - transform3 executes
> - df3 written to Redshift
>
> I would like transform3 to begin executing as soon as the cluster has
> capacity, without having to wait for df2 to be written to Redshift, so I
> tried rewriting the last two lines as (again pseudocode):
>
>     f1 = future { df2.write().format("com.databricks.spark.redshift").options(...).save() }
>     f2 = future { df3.write().format("com.databricks.spark.redshift").options(...).save() }
>     f1.get()
>     f2.get()
>
> in the hope that the first write would no longer block the following steps.
> Instead it fails with a TimeoutException (see stack trace in my previous
> message). Is there a way to start the different writes concurrently, or is
> that not possible in Spark?
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Concurrent-Spark-jobs-tp26011p26030.html
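Jean's fair-scheduler-pools suggestion can be sketched as follows (untested here; FAIR mode and the spark.scheduler.pool local property are from the linked job-scheduling docs, while the pool name and file path are made up):

```python
# Untested sketch of fair-scheduler pools, per the Spark job-scheduling docs.
# The pool name "writes" and the allocation-file path are hypothetical.
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("etl")
        .set("spark.scheduler.mode", "FAIR")
        .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml"))
sc = SparkContext(conf=conf)

# Inside each job's thread: assign that thread's jobs to a pool, so the
# scheduler shares the cluster fairly between concurrently submitted jobs.
sc.setLocalProperty("spark.scheduler.pool", "writes")
```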
Re: Concurrent Spark jobs
Thanks for the responses (not sure why they aren't showing up on the list).

Michael wrote:
> The JDBC wrapper for Redshift should allow you to follow these
> instructions. Let me know if you run into any more issues.
> http://apache-spark-user-list.1001560.n3.nabble.com/best-practices-for-pushing-an-RDD-into-a-database-td2681.html

I'm not sure that solves my problem: if I understand it correctly, it splits a database write over multiple concurrent connections (one from each partition), whereas what I want is to let other tasks continue running on the cluster while the write to Redshift takes place.
Also, I don't think it's good practice to load data into Redshift with INSERT statements over JDBC; it is recommended to use the bulk load commands, which can analyse the data and automatically set appropriate compression etc. on the table.

Rajesh wrote:
> Just a thought. Can we use Spark Job Server and trigger jobs through REST
> APIs? In this case, all jobs will share the same context and run in
> parallel.
> If anyone has other thoughts please share

I'm not sure this would work in my case, as they are not completely separate jobs, just different outputs to Redshift that share intermediate results. Running them as completely separate jobs would mean recalculating the intermediate results for each output. I suppose it might be possible to persist the intermediate results somewhere, and then delete them once all the jobs have run, but that adds a lot of complication which I'm not sure is justified.

Maybe some pseudocode will clarify things, so here is a very simplified view of our Spark application:

    // load and transform data, then cache the result
    df1 = transform1(sqlCtx.read().options(...).parquet("path/to/data"))
    df1.cache()

    // perform some further transforms of the cached data
    df2 = transform2(df1)
    df3 = transform3(df1)

    // write the final data out to Redshift
    df2.write().format("com.databricks.spark.redshift").options(...).save()
    df3.write().format("com.databricks.spark.redshift").options(...).save()

When the application runs, the steps are executed in the following order:
- scan parquet folder
- transform1 executes
- df1 stored in cache
- transform2 executes
- df2 written to Redshift (while cluster sits idle)
- transform3 executes
- df3 written to Redshift

I would like transform3 to begin executing as soon as the cluster has capacity, without having to wait for df2 to be written to Redshift, so I tried rewriting the last two lines as (again pseudocode):

    f1 = future { df2.write().format("com.databricks.spark.redshift").options(...).save() }
    f2 = future { df3.write().format("com.databricks.spark.redshift").options(...).save() }
    f1.get()
    f2.get()

in the hope that the first write would no longer block the following steps. Instead it fails with a TimeoutException (see stack trace in previous message). Is there a way to start the different writes concurrently, or is that not possible in Spark?

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Concurrent-Spark-jobs-tp26011p26030.html
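The future-based rewrite described above can be illustrated with plain Python threads (a sketch only, not Spark code; the sleep stands in for a blocking Redshift write):

```python
import time
from concurrent.futures import ThreadPoolExecutor

def write_to_redshift(name):
    # Stand-in for a blocking DataFrame save: sleep instead of writing.
    time.sleep(0.5)
    return name

start = time.time()
with ThreadPoolExecutor(max_workers=2) as pool:
    # Submit both "writes" so they block concurrently, not sequentially.
    f1 = pool.submit(write_to_redshift, "df2")
    f2 = pool.submit(write_to_redshift, "df3")
    results = [f1.result(), f2.result()]
elapsed = time.time() - start

print(results)  # ['df2', 'df3']
```

Because both tasks sleep at the same time, the total elapsed time is close to one write's duration rather than the sum of both, which is the behaviour the poster wanted from the Scala futures.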
Re: Concurrent Spark jobs
Hi,

Just a thought: can we use Spark Job Server and trigger jobs through REST APIs? In this case, all jobs will share the same context and run in parallel. If anyone has other thoughts please share.

Regards,
Rajesh

On Tue, Jan 19, 2016 at 10:28 PM, emlyn <em...@swiftkey.com> wrote:
> We have a Spark application that runs a number of ETL jobs, writing the
> outputs to Redshift (using databricks/spark-redshift). This is triggered by
> calling DataFrame.write.save on the different DataFrames one after another.
> I noticed that while the output of one job is being loaded into Redshift
> (which can take ~20 minutes for some jobs), the cluster sits idle.
>
> In order to maximise the use of the cluster, we tried starting a thread for
> each job so that they can all be submitted simultaneously, and therefore the
> cluster can be utilised by another job while one is being written to
> Redshift.
>
> However, when this is run, it fails with a TimeoutException (see stack trace
> below). Would it make sense to increase "spark.sql.broadcastTimeout"? I'm
> not sure that would actually solve anything. Should it not be possible to
> save multiple DataFrames simultaneously? Or any other hints on how to make
> better use of the cluster's resources?
>
> Thanks.
>
> Stack trace:
>
> Exception in thread "main" java.util.concurrent.ExecutionException:
> java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>     ...
>     at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
>     at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
>     ...
> Caused by: java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
>     at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>     at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>     at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>     at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>     at scala.concurrent.Await$.result(package.scala:107)
>     at org.apache.spark.sql.execution.joins.BroadcastHashOuterJoin.doExecute(BroadcastHashOuterJoin.scala:113)
>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>     at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
>     at org.apache.spark.sql.execution.Project.doExecute(basicOperators.scala:46)
>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>     at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
>     at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
>     at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
>     at org.apache.spark.sql.DataFrame.rdd$lzycompute(DataFrame.scala:1676)
>     at org.apache.spark.sql.DataFrame.rdd(DataFrame.scala:1673)
>     at org.apache.spark.sql.DataFrame.mapPartitions(DataFrame.scala:1465)
>     at com.databricks.spark.redshift.RedshiftWriter.unloadData(RedshiftWriter.scala:264)
>     at com.databricks.spark.redshift.RedshiftWriter.saveToRedshift(RedshiftWriter.scala:374)
>     at com.databricks.spark.redshift.DefaultSource.createRelation(DefaultSource.scala:106)
>     at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:222)
>     at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Concurrent-Spark-jobs-tp26011.html
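The trace above times out inside BroadcastHashOuterJoin while waiting on a broadcast future, so two hedged workarounds (untested here; both are documented Spark SQL settings, and the 1200-second value is arbitrary) are to raise the timeout or to disable automatic broadcast joins so the join falls back to a shuffle:

```python
# Untested sketch; sqlCtx is an existing SQLContext as in the thread's code.
sqlCtx.setConf("spark.sql.broadcastTimeout", "1200")          # default is 300 s
sqlCtx.setConf("spark.sql.autoBroadcastJoinThreshold", "-1")  # disable broadcast joins
```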
Concurrent Spark jobs
We have a Spark application that runs a number of ETL jobs, writing the outputs to Redshift (using databricks/spark-redshift). This is triggered by calling DataFrame.write.save on the different DataFrames one after another. I noticed that while the output of one job is being loaded into Redshift (which can take ~20 minutes for some jobs), the cluster sits idle.

In order to maximise the use of the cluster, we tried starting a thread for each job so that they can all be submitted simultaneously, and therefore the cluster can be utilised by another job while one is being written to Redshift.

However, when this is run, it fails with a TimeoutException (see stack trace below). Would it make sense to increase "spark.sql.broadcastTimeout"? I'm not sure that would actually solve anything. Should it not be possible to save multiple DataFrames simultaneously? Or any other hints on how to make better use of the cluster's resources?

Thanks.

Stack trace:

Exception in thread "main" java.util.concurrent.ExecutionException: java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    ...
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    ...
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:107)
    at org.apache.spark.sql.execution.joins.BroadcastHashOuterJoin.doExecute(BroadcastHashOuterJoin.scala:113)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
    at org.apache.spark.sql.execution.Project.doExecute(basicOperators.scala:46)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
    at org.apache.spark.sql.DataFrame.rdd$lzycompute(DataFrame.scala:1676)
    at org.apache.spark.sql.DataFrame.rdd(DataFrame.scala:1673)
    at org.apache.spark.sql.DataFrame.mapPartitions(DataFrame.scala:1465)
    at com.databricks.spark.redshift.RedshiftWriter.unloadData(RedshiftWriter.scala:264)
    at com.databricks.spark.redshift.RedshiftWriter.saveToRedshift(RedshiftWriter.scala:374)
    at com.databricks.spark.redshift.DefaultSource.createRelation(DefaultSource.scala:106)
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:222)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Concurrent-Spark-jobs-tp26011.html
Re: How to efficiently control concurrent Spark jobs
So basically you have lots of small ML tasks that you want to run concurrently? By "I've used 'repartition' and 'cache' to store the sub-datasets on only one machine", do you mean that you reduced each RDD to a single partition? Maybe you want to give the fair scheduler a try to get more of your tasks executing concurrently. Just an idea...

Regards,
Jeff

2015-02-25 12:06 GMT+01:00 Staffan staffan.arvids...@gmail.com:
> Hi,
>
> Is there a good (recommended) way to control and run multiple Spark jobs
> within the same application? My application works as follows:
>
> 1) Run one Spark job on a 'full' dataset, which creates a few thousand RDDs
> containing sub-datasets of the complete dataset. Each sub-dataset is
> independent of the others (the 'full' dataset is simply a dump from a
> database containing several different types of records).
> 2) Run some filtering and manipulation on each RDD, and finally do some ML
> on the data. (Each RDD created in step 1 is completely independent, so this
> should run concurrently.)
>
> I've implemented this using Scala Futures, executing the Spark jobs in 2)
> from a separate thread for each RDD. This works and improves runtime
> compared to a naive for-loop over step 2). Scaling is however not as good
> as I would expect (28 minutes for 4 cores on 1 machine vs. 19 minutes for
> 12 cores on 3 machines). Each sub-dataset is fairly small, so I've used
> 'repartition' and 'cache' to store the sub-datasets on only one machine in
> step 1), which improved runtime by a few %.
>
> So, does anyone have a suggestion for how to do this in a better way, or
> perhaps a higher-level workflow tool to use on top of Spark? (The cool
> solution would have been to use nested RDDs and just map over them in a
> high-level way, but that is not supported afaik.)
>
> Thanks!
> Staffan
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-efficiently-control-concurrent-Spark-jobs-tp21800.html
How to efficiently control concurrent Spark jobs
Hi,

Is there a good (recommended) way to control and run multiple Spark jobs within the same application? My application works as follows:

1) Run one Spark job on a 'full' dataset, which creates a few thousand RDDs containing sub-datasets of the complete dataset. Each sub-dataset is independent of the others (the 'full' dataset is simply a dump from a database containing several different types of records).
2) Run some filtering and manipulation on each RDD, and finally do some ML on the data. (Each RDD created in step 1 is completely independent, so this should run concurrently.)

I've implemented this using Scala Futures, executing the Spark jobs in 2) from a separate thread for each RDD. This works and improves runtime compared to a naive for-loop over step 2). Scaling is however not as good as I would expect (28 minutes for 4 cores on 1 machine vs. 19 minutes for 12 cores on 3 machines). Each sub-dataset is fairly small, so I've used 'repartition' and 'cache' to store the sub-datasets on only one machine in step 1), which improved runtime by a few %.

So, does anyone have a suggestion for how to do this in a better way, or perhaps a higher-level workflow tool to use on top of Spark? (The cool solution would have been to use nested RDDs and just map over them in a high-level way, but that is not supported afaik.)

Thanks!
Staffan

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-efficiently-control-concurrent-Spark-jobs-tp21800.html
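The fan-out pattern in step 2) (thousands of independent sub-tasks pushed through a bounded pool of threads) can be sketched in plain Python; `process_subset` is a made-up stand-in for the per-sub-dataset filtering and ML step:

```python
from concurrent.futures import ThreadPoolExecutor

def process_subset(records):
    # Made-up stand-in for the per-sub-dataset filter + ML step.
    return sum(records)

# Stand-ins for the sub-datasets (a few thousand in the real application).
subsets = [[i, i + 1] for i in range(100)]

# Bound the pool so thousands of tasks don't hit the scheduler all at once;
# pool.map preserves input order in its results.
with ThreadPoolExecutor(max_workers=8) as pool:
    results = list(pool.map(process_subset, subsets))

print(len(results))  # 100
```

A bounded pool is one knob for the scaling problem described above: with a separate thread per RDD, thousands of jobs compete for the scheduler at once, whereas a fixed-size pool keeps roughly one in-flight job per available core.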