Re: Concurrent Spark jobs

2016-03-31 Thread emlyn
In case anyone else has the same problem and finds this - in my case it was
fixed by increasing spark.sql.broadcastTimeout (I used 9000).
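
For anyone trying the same fix, a minimal sketch of where the setting goes
(the value is in seconds; the default is the 300 seconds that shows up in the
stack traces further down this thread):

// Assuming an existing SQLContext, as in the pseudocode later in the thread:
sqlCtx.setConf("spark.sql.broadcastTimeout", "9000")

// Or, equivalently, when submitting the application:
//   spark-submit --conf spark.sql.broadcastTimeout=9000 ...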






Re: Concurrent Spark jobs

2016-01-25 Thread emlyn
Jean wrote
> Have you considered using pools?
> http://spark.apache.org/docs/latest/job-scheduling.html#fair-scheduler-pools
> 
> I haven't tried it myself, but it looks like the pool setting is applied
> per thread, which means the fair scheduler can be configured so that more
> than one job runs at a time, although each of them would probably get
> fewer workers...

Thanks for the tip, but I don't think that would work in this case - while
the write to Redshift is in progress, the cluster sits idle and the next
job's tasks haven't even been submitted to the scheduler yet, so changing
how queued jobs are scheduled won't help.







Re: Concurrent Spark jobs

2016-01-22 Thread Eugene Morozov
Emlyn,

Have you considered using pools?
http://spark.apache.org/docs/latest/job-scheduling.html#fair-scheduler-pools

I haven't tried it myself, but it looks like the pool setting is applied
per thread, which means the fair scheduler can be configured so that more
than one job runs at a time, although each of them would probably get
fewer workers...
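
Roughly, assuming spark.scheduler.mode has already been set to FAIR, each
thread would pick its pool like this (the pool name is just an example):

// In the thread that will trigger the job:
sc.setLocalProperty("spark.scheduler.pool", "pool1")
// ... run the action in this thread; it is scheduled in pool1 ...
sc.setLocalProperty("spark.scheduler.pool", null)  // back to the default pool

Pool weights and minimum shares can be described in the allocation file
referenced by spark.scheduler.allocation.file (see the docs linked above).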

Hope this helps.
--
Be well!
Jean Morozov

On Thu, Jan 21, 2016 at 3:23 PM, emlyn <em...@swiftkey.com> wrote:

> Thanks for the responses (not sure why they aren't showing up on the list).
>
> Michael wrote:
> > The JDBC wrapper for Redshift should allow you to follow these
> > instructions. Let me know if you run into any more issues.
> >
> http://apache-spark-user-list.1001560.n3.nabble.com/best-practices-for-pushing-an-RDD-into-a-database-td2681.html
>
> I'm not sure that this solves my problem - if I understand it correctly,
> this is to split a database write over multiple concurrent connections (one
> from each partition), whereas what I want is to allow other tasks to
> continue running on the cluster while the write to Redshift is taking
> place.
> Also I don't think it's good practice to load data into Redshift with
> INSERT
> statements over JDBC - it is recommended to use the bulk load commands that
> can analyse the data and automatically set appropriate compression etc on
> the table.
>
>
> Rajesh wrote:
> > Just a thought: could we use Spark Job Server and trigger jobs through REST
> > APIs? In that case, all jobs would share the same context and run in
> > parallel.
> > If anyone has other thoughts, please share.
>
> I'm not sure this would work in my case as they are not completely separate
> jobs, but just different outputs to Redshift that share intermediate
> results. Running them as completely separate jobs would mean recalculating
> the intermediate results for each output. I suppose it might be possible to
> persist the intermediate results somewhere, and then delete them once all
> the jobs have run, but that is starting to add a lot of complication which
> I'm not sure is justified.
>
>
> Maybe some pseudocode would help clarify things, so here is a very
> simplified view of our Spark application:
>
> // load and transform data, then cache the result
> df1 = transform1(sqlCtx.read().options(...).parquet('path/to/data'))
> df1.cache()
>
> // perform some further transforms of the cached data
> df2 = transform2(df1)
> df3 = transform3(df1)
>
> // write the final data out to Redshift
> df2.write().options(...).format("com.databricks.spark.redshift").save()
> df3.write().options(...).format("com.databricks.spark.redshift").save()
>
>
> When the application runs, the steps are executed in the following order:
> - scan parquet folder
> - transform1 executes
> - df1 stored in cache
> - transform2 executes
> - df2 written to Redshift (while cluster sits idle)
> - transform3 executes
> - df3 written to Redshift
>
> I would like transform3 to begin executing as soon as the cluster has
> capacity, without having to wait for df2 to be written to Redshift, so I
> tried rewriting the last two lines as (again pseudocode):
>
> f1 = future{df2.write().options(...).format("com.databricks.spark.redshift").save()}.execute()
> f2 = future{df3.write().options(...).format("com.databricks.spark.redshift").save()}.execute()
> f1.get()
> f2.get()
>
> The hope was that the first write would no longer block the following
> steps, but instead it fails with a TimeoutException (see the stack trace in
> the previous message). Is there a way to start the different writes
> concurrently, or is that not possible in Spark?


Re: Concurrent Spark jobs

2016-01-21 Thread emlyn
Thanks for the responses (not sure why they aren't showing up on the list).

Michael wrote:
> The JDBC wrapper for Redshift should allow you to follow these
> instructions. Let me know if you run into any more issues. 
> http://apache-spark-user-list.1001560.n3.nabble.com/best-practices-for-pushing-an-RDD-into-a-database-td2681.html

I'm not sure that this solves my problem - if I understand it correctly,
this is to split a database write over multiple concurrent connections (one
from each partition), whereas what I want is to allow other tasks to
continue running on the cluster while the write to Redshift is taking
place.
Also I don't think it's good practice to load data into Redshift with INSERT
statements over JDBC - it is recommended to use the bulk load commands that
can analyse the data and automatically set appropriate compression etc on
the table.


Rajesh wrote:
> Just a thought: could we use Spark Job Server and trigger jobs through REST
> APIs? In that case, all jobs would share the same context and run in
> parallel.
> If anyone has other thoughts, please share.

I'm not sure this would work in my case as they are not completely separate
jobs, but just different outputs to Redshift that share intermediate
results. Running them as completely separate jobs would mean recalculating
the intermediate results for each output. I suppose it might be possible to
persist the intermediate results somewhere, and then delete them once all
the jobs have run, but that is starting to add a lot of complication which
I'm not sure is justified.
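
For what it's worth, the "persist somewhere" option would amount to something
like the sketch below (the path is a placeholder), at the cost of an extra
write and read of the intermediate data:

// Write the shared intermediate result out once...
df1.write.parquet("s3://some-bucket/tmp/intermediate")  // placeholder path
// ...then each separately-run output job reads it back instead of recomputing:
val df1Again = sqlCtx.read.parquet("s3://some-bucket/tmp/intermediate")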


Maybe some pseudocode would help clarify things, so here is a very
simplified view of our Spark application:

// load and transform data, then cache the result
df1 = transform1(sqlCtx.read().options(...).parquet('path/to/data'))
df1.cache()

// perform some further transforms of the cached data
df2 = transform2(df1)
df3 = transform3(df1)

// write the final data out to Redshift
df2.write().options(...).format("com.databricks.spark.redshift").save()
df3.write().options(...).format("com.databricks.spark.redshift").save()


When the application runs, the steps are executed in the following order:
- scan parquet folder
- transform1 executes
- df1 stored in cache
- transform2 executes
- df2 written to Redshift (while cluster sits idle)
- transform3 executes
- df3 written to Redshift

I would like transform3 to begin executing as soon as the cluster has
capacity, without having to wait for df2 to be written to Redshift, so I
tried rewriting the last two lines as (again pseudocode):

f1 = future{df2.write().options(...).format("com.databricks.spark.redshift").save()}.execute()
f2 = future{df3.write().options(...).format("com.databricks.spark.redshift").save()}.execute()
f1.get()
f2.get()

The hope was that the first write would no longer block the following steps,
but instead it fails with a TimeoutException (see the stack trace in the
previous message). Is there a way to start the different writes concurrently,
or is that not possible in Spark?
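
For completeness, a more concrete version of that attempt in Scala (df2 and
df3 as above; the Redshift options are placeholders, and the global execution
context is only for illustration):

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

// Placeholder options - the real job passes the Redshift URL, table and S3 tempdir.
val redshiftOptions = Map("url" -> "...", "dbtable" -> "...", "tempdir" -> "...")

// Kick off both writes from separate threads, then wait for both to finish.
val f1 = Future { df2.write.format("com.databricks.spark.redshift").options(redshiftOptions).save() }
val f2 = Future { df3.write.format("com.databricks.spark.redshift").options(redshiftOptions).save() }
Await.result(f1, Duration.Inf)
Await.result(f2, Duration.Inf)

As the 2016-03-31 message at the top of this thread notes, this arrangement
only started working once spark.sql.broadcastTimeout was raised well above its
300-second default.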






Re: Concurrent Spark jobs

2016-01-19 Thread Madabhattula Rajesh Kumar
Hi,

Just a thought: could we use Spark Job Server and trigger jobs through REST
APIs? In that case, all jobs would share the same context and run in
parallel.

If anyone has other thoughts, please share.

Regards,
Rajesh

On Tue, Jan 19, 2016 at 10:28 PM, emlyn <em...@swiftkey.com> wrote:

> We have a Spark application that runs a number of ETL jobs, writing the
> outputs to Redshift (using databricks/spark-redshift). This is triggered by
> calling DataFrame.write.save on the different DataFrames one after another.
> I noticed that while the output of one job is being loaded into Redshift
> (which can take ~20 minutes for some jobs), the cluster sits idle.
>
> In order to maximise the use of the cluster, we tried starting a thread for
> each job so that they can all be submitted simultaneously, and therefore the
> cluster can be utilised by another job while one is being written to
> Redshift.
>
> However, when this is run, it fails with a TimeoutException (see stack trace
> below). Would it make sense to increase "spark.sql.broadcastTimeout"? I'm
> not sure that would actually solve anything. Should it not be possible to
> save multiple DataFrames simultaneously? Or any other hints on how to make
> better use of the cluster's resources?
>
> Thanks.
>
>
> Stack trace:
>
> Exception in thread "main" java.util.concurrent.ExecutionException:
> java.util.concurrent.TimeoutException: Futures timed out after [300
> seconds]
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> ...
> at
>
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
> at
> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
> ...
> Caused by: java.util.concurrent.TimeoutException: Futures timed out after
> [300 seconds]
> at
> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
> at
> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
> at
> scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
> at
>
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
> at scala.concurrent.Await$.result(package.scala:107)
> at
>
> org.apache.spark.sql.execution.joins.BroadcastHashOuterJoin.doExecute(BroadcastHashOuterJoin.scala:113)
> at
>
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
> at
>
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
> at
>
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
> at
> org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
> at
> org.apache.spark.sql.execution.Project.doExecute(basicOperators.scala:46)
> at
>
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
> at
>
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
> at
>
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
> at
> org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
> at
>
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
> at
>
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
> at
> org.apache.spark.sql.DataFrame.rdd$lzycompute(DataFrame.scala:1676)
> at org.apache.spark.sql.DataFrame.rdd(DataFrame.scala:1673)
> at
> org.apache.spark.sql.DataFrame.mapPartitions(DataFrame.scala:1465)
> at
>
> com.databricks.spark.redshift.RedshiftWriter.unloadData(RedshiftWriter.scala:264)
> at
>
> com.databricks.spark.redshift.RedshiftWriter.saveToRedshift(RedshiftWriter.scala:374)
> at
>
> com.databricks.spark.redshift.DefaultSource.createRelation(DefaultSource.scala:106)
> at
>
> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:222)
> at
> org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
>
>
>
>


Concurrent Spark jobs

2016-01-19 Thread emlyn
We have a Spark application that runs a number of ETL jobs, writing the
outputs to Redshift (using databricks/spark-redshift). This is triggered by
calling DataFrame.write.save on the different DataFrames one after another.
I noticed that while the output of one job is being loaded into Redshift
(which can take ~20 minutes for some jobs), the cluster sits idle.

In order to maximise the use of the cluster, we tried starting a thread for
each job so that they can all be submitted simultaneously, and therefore the
cluster can be utilised by another job while one is being written to
Redshift.

However, when this is run, it fails with a TimeoutException (see stack trace
below). Would it make sense to increase "spark.sql.broadcastTimeout"? I'm
not sure that would actually solve anything. Should it not be possible to
save multiple DataFrames simultaneously? Or any other hints on how to make
better use of the cluster's resources?

Thanks.


Stack trace:

Exception in thread "main" java.util.concurrent.ExecutionException:
java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
...
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
...
Caused by: java.util.concurrent.TimeoutException: Futures timed out after
[300 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at 
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at
org.apache.spark.sql.execution.joins.BroadcastHashOuterJoin.doExecute(BroadcastHashOuterJoin.scala:113)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at
org.apache.spark.sql.execution.Project.doExecute(basicOperators.scala:46)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
at
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
at org.apache.spark.sql.DataFrame.rdd$lzycompute(DataFrame.scala:1676)
at org.apache.spark.sql.DataFrame.rdd(DataFrame.scala:1673)
at org.apache.spark.sql.DataFrame.mapPartitions(DataFrame.scala:1465)
at
com.databricks.spark.redshift.RedshiftWriter.unloadData(RedshiftWriter.scala:264)
at
com.databricks.spark.redshift.RedshiftWriter.saveToRedshift(RedshiftWriter.scala:374)
at
com.databricks.spark.redshift.DefaultSource.createRelation(DefaultSource.scala:106)
at
org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:222)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)







Re: How to efficiently control concurrent Spark jobs

2015-02-26 Thread Jeffrey Jedele
So basically you have lots of small ML tasks you want to run concurrently?

With "I've used repartition and cache to store the sub-datasets on only one
machine", do you mean that you reduced each RDD to have one partition only?

Maybe you want to give the fair scheduler a try to get more of your tasks
executing concurrently. Just an idea...
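
For reference, switching a single application from the default FIFO scheduling
to FAIR is just a configuration change (the XML path below is a placeholder):

// Let concurrently submitted jobs in one SparkContext share the executors.
val conf = new org.apache.spark.SparkConf()
  .set("spark.scheduler.mode", "FAIR")
  // Optional: define pools (weights, minShare) in an allocation file:
  .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")
val sc = new org.apache.spark.SparkContext(conf)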

Regards,
Jeff

2015-02-25 12:06 GMT+01:00 Staffan staffan.arvids...@gmail.com:

 Hi,
 Is there a good way (recommended way) to control and run multiple Spark jobs
 within the same application? My application is as follows:

 1) Run one Spark job on a 'full' dataset, which then creates a few thousand
 RDDs containing sub-datasets from the complete dataset. Each of the
 sub-datasets is independent from the others (the 'full' dataset is simply a
 dump from a database containing several different types of records).
 2) Run some filtering and manipulation on each of the RDDs and finally do
 some ML on the data. (Each of the RDDs created in step 1) is completely
 independent, so this step should run concurrently.)

 I've implemented this by using Scala Futures and executing the Spark jobs in
 2) from a separate thread for each RDD. This works and improves runtime
 compared to a naive for-loop over step 2). Scaling is, however, not as good as
 I would expect it to be (28 minutes for 4 cores on 1 machine vs. 19 minutes
 for 12 cores on 3 machines).

 Each of the sub-datasets is fairly small, so I've used 'repartition' and
 'cache' to store each sub-dataset on only one machine in step 1); this
 improved runtime by a few %.

 So, does anyone have a suggestion of how to do this in a better way, or
 perhaps know of a higher-level workflow tool that I can use on top of
 Spark? (The cool solution would have been to use nested RDDs and just map
 over them in a high-level way, but that is not supported afaik.)

 Thanks!
 Staffan







How to efficiently control concurrent Spark jobs

2015-02-25 Thread Staffan
Hi,
Is there a good way (recommended way) to control and run multiple Spark jobs
within the same application? My application is as follows:

1) Run one Spark job on a 'full' dataset, which then creates a few thousand
RDDs containing sub-datasets from the complete dataset. Each of the
sub-datasets is independent from the others (the 'full' dataset is simply a
dump from a database containing several different types of records).
2) Run some filtering and manipulation on each of the RDDs and finally do
some ML on the data. (Each of the RDDs created in step 1) is completely
independent, so this step should run concurrently.)

I've implemented this by using Scala Futures and executing the Spark jobs in
2) from a separate thread for each RDD. This works and improves runtime
compared to a naive for-loop over step 2). Scaling is, however, not as good as
I would expect it to be (28 minutes for 4 cores on 1 machine vs. 19 minutes
for 12 cores on 3 machines).

Each of the sub-datasets is fairly small, so I've used 'repartition' and
'cache' to store each sub-dataset on only one machine in step 1); this
improved runtime by a few %.
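
A rough sketch of that setup (the helper names are placeholders, and the
fixed-size thread pool is a simplification of the one-thread-per-RDD approach
described above):

import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

// Bound how many Spark jobs the driver submits concurrently.
implicit val ec = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(16))

val futures = subDatasets.map { rdd =>       // one RDD per sub-dataset from step 1)
  Future {
    val local = rdd.repartition(1).cache()   // keep the small sub-dataset together
    runMl(filterAndManipulate(local))        // placeholders for the step 2) work
  }
}
futures.foreach(f => Await.result(f, Duration.Inf))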

So, does anyone have a suggestion of how to do this in a better way, or
perhaps know of a higher-level workflow tool that I can use on top of
Spark? (The cool solution would have been to use nested RDDs and just map
over them in a high-level way, but that is not supported afaik.)

Thanks!
Staffan 


