[jira] [Assigned] (SPARK-27376) Design: YARN supports Spark GPU-aware scheduling

2019-05-16 Thread Thomas Graves (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Graves reassigned SPARK-27376:
-

Assignee: Thomas Graves

> Design: YARN supports Spark GPU-aware scheduling
> 
>
> Key: SPARK-27376
> URL: https://issues.apache.org/jira/browse/SPARK-27376
> Project: Spark
>  Issue Type: Sub-task
>  Components: YARN
>Affects Versions: 3.0.0
>Reporter: Xiangrui Meng
>Assignee: Thomas Graves
>Priority: Major
>







[jira] [Commented] (SPARK-27376) Design: YARN supports Spark GPU-aware scheduling

2019-05-16 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16841402#comment-16841402
 ] 

Thomas Graves commented on SPARK-27376:
---

The design is pretty straightforward; there is really only one open question, 
which is consistency between the YARN resource configs and the new Spark 
resource configs. See the last paragraph for more details.

Official GPU support requires Hadoop 3.1 or later.  Hadoop can be configured 
to use Docker with isolation so that the containers YARN hands back have the 
requested GPUs and other resources.  YARN does not tell you which GPUs it 
allocated; you have to discover them.  YARN has hardcoded resource types for 
fpga and gpu; anything else is a user-defined type. Spark 3.0 already added 
support for requesting any resource from YARN via the configs 
spark.yarn.\{executor/driver/am}.resource, so the change required for this 
Jira is simply to map the new Spark configs 
spark.\{executor/driver}.resource.\{fpga/gpu}.count into the corresponding 
YARN configs. We can't map other resource types, though, because we don't know 
what they are called on the YARN side.  So for any other resource the user 
will have to specify both spark.yarn.\{executor/driver/am}.resource and the 
corresponding spark.\{executor/driver}.resource.\{resourceName} configs.  That 
isn't ideal, but the only other option would be some sort of mapping the user 
passes in.  We can always map more YARN resource types if YARN adds them. The 
two resources people mainly seem interested in are gpu and fpga anyway, so I 
think this is fine for now.

For Hadoop versions before 3.1, YARN won't allocate based on GPUs.  Users on 
Hadoop 2.7, 2.8, etc. could still land on nodes with GPUs via YARN node labels 
or other hacks, tell Spark the count, and let Spark auto-discover them; Spark 
will pick up whatever it sees in the container - or really whatever the 
discoveryScript returns - so people could write that script to match whatever 
scheme they currently use for sharing GPU nodes.

The flow from the user's point of view would be:

For GPU and FPGA: the user specifies 
spark.\{executor/driver}.resource.\{gpu/fpga}.count and 
spark.\{executor/driver}.resource.\{gpu/fpga}.discoveryScript. The Spark YARN 
code maps these into the corresponding YARN resource config and asks YARN for 
the containers.  YARN allocates the containers, and Spark runs the discovery 
script to figure out what it was actually allocated.

For other resource types the user will have to specify all of 
spark.yarn.\{executor/driver/am}.resource, 
spark.\{executor/driver}.resource.\{resourceName}.count, and 
spark.\{executor/driver}.resource.\{resourceName}.discoveryScript. A sketch of 
both cases is below.
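
A minimal sketch of the two cases above, assuming the config names as they 
currently stand in this thread (they may still change, e.g. .count vs 
.amount); the script paths and the extra resource name are made up for 
illustration:
{code:java}
import org.apache.spark.SparkConf

// GPU case: Spark maps the gpu count into the corresponding YARN resource request,
// and the discovery script tells the executor which GPU addresses it actually got.
val conf = new SparkConf()
  .set("spark.executor.resource.gpu.count", "2")
  .set("spark.executor.resource.gpu.discoveryScript", "/opt/spark/scripts/getGpus.sh")

// Other resource type: no automatic mapping, so both the YARN-side and the Spark-side
// configs are specified ("widget" is a purely illustrative resource name).
conf.set("spark.yarn.executor.resource.widget", "1")
  .set("spark.executor.resource.widget.count", "1")
  .set("spark.executor.resource.widget.discoveryScript", "/opt/spark/scripts/getWidgets.sh")
{code}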

The only other thing that is inconsistent is that the 
spark.yarn.\{executor/driver/am}.resource configs don't have a .count on the 
end. Right now that config takes a string as a value and splits it into an 
actual count and a unit. The spark.yarn resource configs were only just added 
for 3.0 and haven't been released, so we could still change them.  We could 
change the Spark user-facing configs 
(spark.\{executor/driver}.resource.\{gpu/fpga}.count) to work the same way, 
which would make it easier for the user to specify a count and a unit in one 
config instead of two, but I like the ability to keep them separate on the 
discovery side as well. We took the .unit support out in the executor pull 
request, so it isn't there right now anyway.  We could do the opposite and 
change the YARN ones to have a .count and a .unit as well just to make things 
consistent, but that makes the user specify two configs instead of one.  The 
third option would be to keep .count and .unit and eventually add a third 
config that lets the user specify them together, if we ever add resources that 
actually need units.
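
For reference, the split being described is roughly the following (a 
hypothetical helper for illustration, not actual Spark code):
{code:java}
// Split a combined value such as "2" or "500m" into a numeric count and an optional unit.
def parseAmount(value: String): (Long, String) = {
  val (digits, unit) = value.span(_.isDigit)
  (digits.toLong, unit.trim) // e.g. "500m" -> (500, "m"), "2" -> (2, "")
}
{code}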

My thoughts are that for the user-facing configs we change .count to .amount 
and let the user specify units on it. This makes it easier for the user and 
allows us to extend it later if we want. I think we should also change the 
spark.yarn configs to have an .amount, because YARN has already added other 
things like tags and attributes, so if we want to extend the Spark support for 
those it makes more sense to have them as another postfix option, e.g. 
spark.yarn...resource.tags=

We can leave everything that is internal as separate count and units, and 
since gpu/fpga don't need units we don't need to actually add a unit to our 
ResourceInformation since we already removed it.

 

> Design: YARN supports Spark GPU-aware scheduling
> 
>
> Key: SPARK-27376
> URL: https://issues.apache.org/jira/browse/SPARK-27376
> Project: Spark
>  Issue Type: Sub-task
>  Components: YARN
>Affects Versions: 3.0.0
>Reporter: Xiangrui Meng
>Priority: Major
>





[jira] [Created] (SPARK-27725) GPU Scheduling - add an example discovery Script

2019-05-15 Thread Thomas Graves (JIRA)
Thomas Graves created SPARK-27725:
-

 Summary: GPU Scheduling - add an example discovery Script
 Key: SPARK-27725
 URL: https://issues.apache.org/jira/browse/SPARK-27725
 Project: Spark
  Issue Type: Story
  Components: Examples
Affects Versions: 3.0.0
Reporter: Thomas Graves


We should add an example script that can be used to discover GPUs and output 
the correctly formatted JSON.

Something like below, but it needs to be tested on various systems with more 
than 2 GPUs:

# Query the GPU indexes and join them into a "0","1",... list for the addresses field
ADDRS=`nvidia-smi --query-gpu=index --format=csv,noheader | sed -e ':a' -e 'N' -e '$!ba' -e 's/\n/","/g'`
# Number of GPUs found (not needed for the JSON output itself)
COUNT=`echo $ADDRS | tr -cd , | wc -c`
ALLCOUNT=`expr $COUNT + 1`
# Emit the expected JSON, e.g. {"name": "gpu", "addresses":["0","1"]}
echo \{\"name\": \"gpu\", \"addresses\":[\"$ADDRS\"]}

 






[jira] [Resolved] (SPARK-27024) Executor interface for cluster managers to support GPU resources

2019-05-14 Thread Thomas Graves (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Graves resolved SPARK-27024.
---
Resolution: Fixed

> Executor interface for cluster managers to support GPU resources
> 
>
> Key: SPARK-27024
> URL: https://issues.apache.org/jira/browse/SPARK-27024
> Project: Spark
>  Issue Type: Story
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Xingbo Jiang
>Assignee: Thomas Graves
>Priority: Major
>
> The executor interface shall deal with the resources allocated to the 
> executor by cluster managers (Standalone, YARN, Kubernetes).  The executor 
> either needs to be told the resources it was given or it needs to discover 
> them, in order for the executor to sync with the driver and expose the 
> available resources to support task scheduling.
> Note this is part of a bigger feature for GPU-aware scheduling and is just 
> about how the executor finds the resources. The general flow:
>  * users ask for a certain set of resources, for instance a number of GPUs - 
> each cluster manager has a specific way to do this.
>  * the cluster manager allocates a container or set of resources (standalone mode)
>  * when Spark launches the executor in that container, the executor either 
> has to be told what resources it has or it has to auto-discover them.
>  * the executor has to register with the driver and tell it the set of 
> resources it has, so the scheduler can use that to schedule tasks that 
> require a certain amount of each of those resources
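
As a rough illustration of the last two steps, the kind of information an 
executor could build from discovery and then report at registration might look 
like this (a hypothetical sketch, not Spark's actual classes):
{code:java}
// Hypothetical shape only: what an executor might derive from a discovery script that
// prints {"name": "gpu", "addresses": ["0","1"]}, and then send to the driver when it
// registers, so the scheduler knows how many addresses of each resource it can hand out.
case class ResourceInformation(name: String, addresses: Seq[String])

val discovered: Map[String, ResourceInformation] =
  Map("gpu" -> ResourceInformation("gpu", Seq("0", "1")))
{code}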






[jira] [Commented] (SPARK-27488) Driver interface to support GPU resources

2019-05-14 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16839402#comment-16839402
 ] 

Thomas Graves commented on SPARK-27488:
---

With this Jira we need to move the function that does the checks for the 
various configs to a common place, and we need to address the comment here to 
make the variable names clearer: 
[https://github.com/apache/spark/pull/24406/files/b9dacef1d7d47d19df300628d3841b3a13c03547#r283617243]

> Driver interface to support GPU resources 
> --
>
> Key: SPARK-27488
> URL: https://issues.apache.org/jira/browse/SPARK-27488
> Project: Spark
>  Issue Type: Story
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Thomas Graves
>Assignee: Thomas Graves
>Priority: Major
>
> We want to have an interface that allows users on the driver to get the 
> resources that are allocated to them.  This is mostly to handle the case 
> where the cluster manager does not launch the driver in an isolated 
> environment and users could be sharing hosts.  For instance, standalone mode 
> doesn't support container isolation, so a host may have 4 GPUs but only 2 of 
> them are assigned to the driver.  In this case we need an interface for 
> the cluster manager to specify which GPUs the driver should use, and an 
> interface for the user to get that resource information.
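
A rough sketch of the kind of assignment such an interface would expose (the 
shape here is invented for illustration; the actual API is whatever gets 
decided in SPARK-27364/SPARK-27488):
{code:java}
// Hypothetical shape only: the cluster manager tells the driver which GPU addresses it
// may use (e.g. 2 of a host's 4 GPUs), and user code on the driver can then query that.
val driverResources: Map[String, Seq[String]] = Map("gpu" -> Seq("0", "2"))
driverResources.get("gpu").foreach(addrs => println(s"driver GPUs: ${addrs.mkString(",")}"))
{code}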






[jira] [Commented] (SPARK-27520) Introduce a global config system to replace hadoopConfiguration

2019-05-09 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836730#comment-16836730
 ] 

Thomas Graves commented on SPARK-27520:
---

Can we add more to the description to explain why we are doing this?  I'm 
assuming it is to allow users to change it more easily.

> Introduce a global config system to replace hadoopConfiguration
> ---
>
> Key: SPARK-27520
> URL: https://issues.apache.org/jira/browse/SPARK-27520
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Xingbo Jiang
>Priority: Major
>
> hadoopConf can be accessed via `SparkContext.hadoopConfiguration` from both 
> user code and Spark internals. The configuration is mainly used to read files 
> from Hadoop-supported file systems (e.g. get a URI, get a FileSystem, add 
> security credentials, get the metastore connection URL, etc.).
> We should keep a global config that users can set, use that to track the 
> Hadoop configuration, avoid using `SparkContext.hadoopConfiguration`, and 
> maybe mark it as deprecated.
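
For context, the access pattern in question looks like this today (`spark` 
here is assumed to be an existing SparkSession):
{code:java}
// Existing pattern the proposal wants to wrap: reaching directly into the Hadoop
// Configuration held by the SparkContext.
val hadoopConf = spark.sparkContext.hadoopConfiguration
hadoopConf.set("fs.defaultFS", "hdfs://namenode:8020") // example Hadoop setting
val fs = org.apache.hadoop.fs.FileSystem.get(hadoopConf)
{code}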






[jira] [Updated] (SPARK-27492) GPU scheduling - High level user documentation

2019-05-09 Thread Thomas Graves (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Graves updated SPARK-27492:
--
Description: 
For the SPIP - Accelerator-aware task scheduling for Spark, 
https://issues.apache.org/jira/browse/SPARK-24615 Add some high level user 
documentation about how this feature works together and point to things like 
the example discovery script, etc.

 

- make sure to document the discovery script and what permissions are needed 
and any security implications

  was:For the SPIP - Accelerator-aware task scheduling for Spark, 
https://issues.apache.org/jira/browse/SPARK-24615 Add some high level user 
documentation about how this feature works together and point to things like 
the example discovery script, etc.


> GPU scheduling - High level user documentation
> --
>
> Key: SPARK-27492
> URL: https://issues.apache.org/jira/browse/SPARK-27492
> Project: Spark
>  Issue Type: Story
>  Components: Documentation
>Affects Versions: 3.0.0
>Reporter: Thomas Graves
>Priority: Major
>
> For the SPIP - Accelerator-aware task scheduling for Spark, 
> https://issues.apache.org/jira/browse/SPARK-24615 Add some high level user 
> documentation about how this feature works together and point to things like 
> the example discovery script, etc.
>  
> - make sure to document the discovery script and what permissions are needed 
> and any security implications






[jira] [Commented] (SPARK-27396) SPIP: Public APIs for extended Columnar Processing Support

2019-05-07 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16835014#comment-16835014
 ] 

Thomas Graves commented on SPARK-27396:
---

I just started a vote thread on this SPIP; please take a look and vote.

> SPIP: Public APIs for extended Columnar Processing Support
> --
>
> Key: SPARK-27396
> URL: https://issues.apache.org/jira/browse/SPARK-27396
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Robert Joseph Evans
>Priority: Major
>
> *SPIP: Columnar Processing Without Arrow Formatting Guarantees.*
>  
> *Q1.* What are you trying to do? Articulate your objectives using absolutely 
> no jargon.
> The Dataset/DataFrame API in Spark currently only exposes to users one row at 
> a time when processing data.  The goals of this are to
>  # Add to the current sql extensions mechanism so advanced users can have 
> access to the physical SparkPlan and manipulate it to provide columnar 
> processing for existing operators, including shuffle.  This will allow them 
> to implement their own cost based optimizers to decide when processing should 
> be columnar and when it should not.
>  # Make any transitions between the columnar memory layout and a row based 
> layout transparent to the users so operations that are not columnar see the 
> data as rows, and operations that are columnar see the data as columns.
>  
> Not Requirements, but things that would be nice to have.
>  # Transition the existing in memory columnar layouts to be compatible with 
> Apache Arrow.  This would make the transformations to Apache Arrow format a 
> no-op. The existing formats are already very close to those layouts in many 
> cases.  This would not be using the Apache Arrow java library, but instead 
> being compatible with the memory 
> [layout|https://arrow.apache.org/docs/format/Layout.html] and possibly only a 
> subset of that layout.
>  
> *Q2.* What problem is this proposal NOT designed to solve? 
> The goal of this is not for ML/AI but to provide APIs for accelerated 
> computing in Spark primarily targeting SQL/ETL like workloads.  ML/AI already 
> have several mechanisms to get data into/out of them. These can be improved 
> but will be covered in a separate SPIP.
> This is not trying to implement any of the processing itself in a columnar 
> way, with the exception of examples for documentation.
> This does not cover exposing the underlying format of the data.  The only way 
> to get at the data in a ColumnVector is through the public APIs.  Exposing 
> the underlying format to improve efficiency will be covered in a separate 
> SPIP.
> This is not trying to implement new ways of transferring data to external 
> ML/AI applications.  That is covered by separate SPIPs already.
> This is not trying to add in generic code generation for columnar processing. 
>  Currently code generation for columnar processing is only supported when 
> translating columns to rows.  We will continue to support this, but will not 
> extend it as a general solution. That will be covered in a separate SPIP if 
> we find it is helpful.  For now columnar processing will be interpreted.
> This is not trying to expose a way to get columnar data into Spark through 
> DataSource V2 or any other similar API.  That would be covered by a separate 
> SPIP if we find it is needed.
>  
> *Q3.* How is it done today, and what are the limits of current practice?
> The current columnar support is limited to 3 areas.
>  # Internal implementations of FileFormats, optionally can return a 
> ColumnarBatch instead of rows.  The code generation phase knows how to take 
> that columnar data and iterate through it as rows for stages that want rows, 
> which currently is almost everything.  The limitations here are mostly 
> implementation specific. The current standard is to abuse Scala’s type 
> erasure to return ColumnarBatches as the elements of an RDD[InternalRow]. The 
> code generation can handle this because it is generating java code, so it 
> bypasses scala’s type checking and just casts the InternalRow to the desired 
> ColumnarBatch.  This makes it difficult for others to implement the same 
> functionality for different processing because they can only do it through 
> code generation. There really is no clean separate path in the code 
> generation for columnar vs row based. Additionally, because it is only 
> supported through code generation, if for any reason code generation fails 
> there is no backup.  This is typically fine for input formats but can be 
> problematic when we get into more extensive processing.
>  # When caching data it can optionally be cached in a columnar format if the 
> input is also columnar.  This is similar to the f

[jira] [Commented] (SPARK-27396) SPIP: Public APIs for extended Columnar Processing Support

2019-04-22 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16823097#comment-16823097
 ] 

Thomas Graves commented on SPARK-27396:
---

Thanks for the questions and comments; please also vote on the dev list email 
chain - subject:

[VOTE][SPARK-27396] SPIP: Public APIs for extended Columnar Processing Support

I'm going to extend that vote by a few days to give more people time to 
comment, as I know it's a busy time of year.

> SPIP: Public APIs for extended Columnar Processing Support
> --
>
> Key: SPARK-27396
> URL: https://issues.apache.org/jira/browse/SPARK-27396
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Robert Joseph Evans
>Priority: Major
>
> *Q1.* What are you trying to do? Articulate your objectives using absolutely 
> no jargon.
>  
> The Dataset/DataFrame API in Spark currently only exposes to users one row at 
> a time when processing data.  The goals of this are to 
>  
>  # Expose to end users a new option of processing the data in a columnar 
> format, multiple rows at a time, with the data organized into contiguous 
> arrays in memory. 
>  # Make any transitions between the columnar memory layout and a row based 
> layout transparent to the end user.
>  # Allow for simple data exchange with other systems, DL/ML libraries, 
> pandas, etc. by having clean APIs to transform the columnar data into an 
> Apache Arrow compatible layout.
>  # Provide a plugin mechanism for columnar processing support so an advanced 
> user could avoid data transition between columnar and row based processing 
> even through shuffles. This means we should at least support pluggable APIs 
> so an advanced end user can implement the columnar partitioning themselves, 
> and provide the glue necessary to shuffle the data still in a columnar format.
>  # Expose new APIs that allow advanced users or frameworks to implement 
> columnar processing either as UDFs, or by adjusting the physical plan to do 
> columnar processing.  If the latter is too controversial we can move it to 
> another SPIP, but we plan to implement some accelerated computing in parallel 
> with this feature to be sure the APIs work, and without this feature it makes 
> that impossible.
>  
> Not Requirements, but things that would be nice to have.
>  # Provide default implementations for partitioning columnar data, so users 
> don’t have to.
>  # Transition the existing in memory columnar layouts to be compatible with 
> Apache Arrow.  This would make the transformations to Apache Arrow format a 
> no-op. The existing formats are already very close to those layouts in many 
> cases.  This would not be using the Apache Arrow java library, but instead 
> being compatible with the memory 
> [layout|https://arrow.apache.org/docs/format/Layout.html] and possibly only a 
> subset of that layout.
>  # Provide a clean transition from the existing code to the new one.  The 
> existing APIs which are public but evolving are not that far off from what is 
> being proposed.  We should be able to create a new parallel API that can wrap 
> the existing one. This means any file format that is trying to support 
> columnar can still do so until we make a conscious decision to deprecate and 
> then turn off the old APIs.
>  
> *Q2.* What problem is this proposal NOT designed to solve?
> This is not trying to implement any of the processing itself in a columnar 
> way, with the exception of examples for documentation, and possibly default 
> implementations for partitioning of columnar shuffle. 
>  
> *Q3.* How is it done today, and what are the limits of current practice?
> The current columnar support is limited to 3 areas.
>  # Input formats, optionally can return a ColumnarBatch instead of rows.  The 
> code generation phase knows how to take that columnar data and iterate 
> through it as rows for stages that wants rows, which currently is almost 
> everything.  The limitations here are mostly implementation specific. The 
> current standard is to abuse Scala’s type erasure to return ColumnarBatches 
> as the elements of an RDD[InternalRow]. The code generation can handle this 
> because it is generating java code, so it bypasses scala’s type checking and 
> just casts the InternalRow to the desired ColumnarBatch.  This makes it 
> difficult for others to implement the same functionality for different 
> processing because they can only do it through code generation. There really 
> is no clean separate path in the code generation for columnar vs row based. 
> Additionally, because it is only supported through code generation, if for 
> any reason code generation fails there is no backup.  This is typically fine 
> for input formats but can

[jira] [Comment Edited] (SPARK-27495) Support Stage level resource configuration and scheduling

2019-04-19 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16822166#comment-16822166
 ] 

Thomas Graves edited comment on SPARK-27495 at 4/19/19 8:29 PM:


Unfortunately the link to the original design doc was removed from 
https://issues.apache.org/jira/browse/SPARK-24615, but for reference there 
were some good discussions about this in the comments, from 
https://issues.apache.org/jira/browse/SPARK-24615?focusedCommentId=16528293&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16528293
 all the way through 
https://issues.apache.org/jira/browse/SPARK-24615?focusedCommentId=16567393&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16567393

To summarize the proposal there after those discussions, it was something 
like:
{code:java}
val rdd2 = rdd1.withResources().mapPartitions(){code}
Possibly getting more detailed:
{code:java}
rdd.withResources
  .prefer("/gpu/k80", 2) // prefix of resource logical name, amount
  .require("/cpu", 1)
  .require("/memory", 819200)
  .require("/disk", 1)
{code}

The withResources would apply to just that stage.  If there were conflicting 
resources in, say, a join:
{code:java}
val rddA = rdd.withResources.mapPartitions()
val rddB = rdd.withResources.mapPartitions()
val rddC = rddA.join(rddB)
{code}

Then you would have to resolve that conflict, either by always picking the 
larger request or by merging the requirements, which effectively means taking 
the larger of the two for each resource.

There are also a lot of corner cases we need to handle, such as the one 
mentioned there:
{code:java}
So which means RDDs with different resources requirements in one stage may have 
conflicts. For example: rdd1.withResources.mapPartitions { xxx 
}.withResources.mapPartitions { xxx }.collect, resources in rdd1 may be 
different from map rdd, so currently what I can think is that:
1. always pick the latter with warning log to say that multiple different 
resources in one stage is illegal.
 2. fail the stage with warning log to say that multiple different resources in 
one stage is illegal.
 3. merge conflicts with maximum resources needs. For example rdd1 requires 3 
gpus per task, rdd2 requires 4 gpus per task, then the merged requirement would 
be 4 gpus per task. (This is the high level description, details will be per 
partition based merging) [chosen].
 
{code}
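
A minimal sketch of option 3 (the chosen approach), assuming per-stage 
requirements are simple name-to-amount maps; this is illustrative only, not 
actual Spark code:
{code:java}
// Merge two per-task resource requirement maps by taking the maximum for each resource,
// e.g. Map("gpu" -> 3) merged with Map("gpu" -> 4) gives Map("gpu" -> 4).
def mergeRequirements(a: Map[String, Long], b: Map[String, Long]): Map[String, Long] =
  (a.keySet ++ b.keySet).map { name =>
    name -> math.max(a.getOrElse(name, 0L), b.getOrElse(name, 0L))
  }.toMap
{code}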
 

One thing that was brought up multiple times is how a user will really know 
which stage the setting applies to.  Users aren't necessarily going to realize 
where the stage boundaries are, and I don't think anyone has a good solution 
to this.

Also, I think for this to really be useful it has to be tied into dynamic 
allocation.  Without that, users can just use the application-level task 
configs we are adding in SPARK-24615.

Of course, the original proposal was only for RDDs as well. That was because 
it goes with barrier scheduling, and I think the Dataset/DataFrame API is even 
harder in terms of knowing where the stage boundaries are, because Catalyst 
can optimize a bunch of things.

 


was (Author: tgraves):
Unfortunately the link to the original design doc was removed from  
https://issues.apache.org/jira/browse/SPARK-24615 but just for reference there 
was some good discussions about this in comments    
https://issues.apache.org/jira/browse/SPARK-24615?focusedCommentId=16528293&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16528293
 all the way through 
https://issues.apache.org/jira/browse/SPARK-24615?focusedCommentId=16567393&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16567393

Just to summarize the proposal there after the discussions it was something 
like:
{code:java}
val rdd2 = rdd1.withResources().mapPartitions(){code}
Possibly getting more detailed:
rdd.withResources
  .prefer("/gpu/k80", 2) // prefix of resource logical name, amount
  .require("/cpu", 1)
  .require("/memory", 819200)
  .require("/disk", 1)
 The withResources would apply to just that stage.  If there were conflicting 
resources in say like a join
 val rddA = rdd.withResources.mapPartitions()

val rddB = rdd.withResources.mapPartitions()

val rddC = rddA.join(rddB)
 Then you would have to resolve that conflict by either choosing largest or 
merging the requirements obviously choosing the largest of the two.

There are also a lot of corner cases we need to handle such as mentioned:

{code:java}
So which means RDDs with different resources requirements in one stage may have 
conflicts. For example: rdd1.withResources.mapPartitions { xxx 
}.withResources.mapPartitions { xxx }.collect, resources in rdd1 may be 
different from map rdd, so currently what I can think is that:
1. always pick the latter with warning log to say that multiple different 
resources in one stage is illegal.
 2. fail the stage with warning lo

[jira] [Comment Edited] (SPARK-27495) Support Stage level resource configuration and scheduling

2019-04-19 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16822166#comment-16822166
 ] 

Thomas Graves edited comment on SPARK-27495 at 4/19/19 8:28 PM:


Unfortunately the link to the original design doc was removed from  
https://issues.apache.org/jira/browse/SPARK-24615 but just for reference there 
was some good discussions about this in comments    
https://issues.apache.org/jira/browse/SPARK-24615?focusedCommentId=16528293&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16528293
 all the way through 
https://issues.apache.org/jira/browse/SPARK-24615?focusedCommentId=16567393&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16567393

Just to summarize the proposal there after the discussions it was something 
like:
{code:java}
val rdd2 = rdd1.withResources().mapPartitions(){code}
Possibly getting more detailed:
rdd.withResources
  .prefer("/gpu/k80", 2) // prefix of resource logical name, amount
  .require("/cpu", 1)
  .require("/memory", 819200)
  .require("/disk", 1)
 The withResources would apply to just that stage.  If there were conflicting 
resources in say like a join
 val rddA = rdd.withResources.mapPartitions()

val rddB = rdd.withResources.mapPartitions()

val rddC = rddA.join(rddB)
 Then you would have to resolve that conflict by either choosing largest or 
merging the requirements obviously choosing the largest of the two.

There are also a lot of corner cases we need to handle such as mentioned:

{code:java}
So which means RDDs with different resources requirements in one stage may have 
conflicts. For example: rdd1.withResources.mapPartitions { xxx 
}.withResources.mapPartitions { xxx }.collect, resources in rdd1 may be 
different from map rdd, so currently what I can think is that:
1. always pick the latter with warning log to say that multiple different 
resources in one stage is illegal.
 2. fail the stage with warning log to say that multiple different resources in 
one stage is illegal.
 3. merge conflicts with maximum resources needs. For example rdd1 requires 3 
gpus per task, rdd2 requires 4 gpus per task, then the merged requirement would 
be 4 gpus per task. (This is the high level description, details will be per 
partition based merging) [chosen].
 
{code}

One thing that was brought up multiple times is how a user will really know 
what stage it applies to.  Users aren't necessarily going to realize where the 
stage boundaries are.  I don't think anyone has a good solution to this.

Also I think for this to really be useful it has to be tied into dynamic 
allocation.  Without that they can just use the application level task configs 
we are adding in  SPARK-24615.

Of course the original proposal was only for RDDs as well. That was because it 
goes with barrier scheduling and I think the dataset/dataframe api is even 
harder to know where stage boundaries are because catalyst can optimize a bunch 
of things.

 


was (Author: tgraves):
Unfortunately the link to the original design doc was removed from  
https://issues.apache.org/jira/browse/SPARK-24615 but just for reference there 
was some good discussions about this in comments    
https://issues.apache.org/jira/browse/SPARK-24615?focusedCommentId=16528293&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16528293
 all the way through 
https://issues.apache.org/jira/browse/SPARK-24615?focusedCommentId=16567393&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16567393

Just to summarize the proposal there after the discussions it was something 
like:
{code:java}
val rdd2 = rdd1.withResources().mapPartitions(){code}
Possibly getting more detailed:
rdd.withResources
  .prefer("/gpu/k80", 2) // prefix of resource logical name, amount
  .require("/cpu", 1)
  .require("/memory", 819200)
  .require("/disk", 1)
The withResources would apply to just that stage.  If there were conflicting 
resources in say like a join
val rddA = rdd.withResources.mapPartitions()

val rddB = rdd.withResources.mapPartitions()

val rddC = rddA.join(rddB)
Then you would have to resolve that conflict by either choosing largest or 
merging the requirements obviously choosing the largest of the two.

There are also a lot of corner cases we need to handle such as mentioned:

{noformat}
So which means RDDs with different resources requirements in one stage may have 
conflicts. For example: rdd1.withResources.mapPartitions { xxx 
}.withResources.mapPartitions { xxx }.collect, resources in rdd1 may be 
different from map rdd, so currently what I can think is that:
 
1. always pick the latter with warning log to say that multiple different 
resources in one stage is illegal.
 2. fail the stage with warning log to say that multiple di

[jira] [Commented] (SPARK-27495) Support Stage level resource configuration and scheduling

2019-04-19 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16822166#comment-16822166
 ] 

Thomas Graves commented on SPARK-27495:
---

Unfortunately the link to the original design doc was removed from  
https://issues.apache.org/jira/browse/SPARK-24615 but just for reference there 
was some good discussions about this in comments    
https://issues.apache.org/jira/browse/SPARK-24615?focusedCommentId=16528293&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16528293
 all the way through 
https://issues.apache.org/jira/browse/SPARK-24615?focusedCommentId=16567393&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16567393

Just to summarize the proposal there after the discussions it was something 
like:
{code:java}
val rdd2 = rdd1.withResources().mapPartitions(){code}
Possibly getting more detailed:
rdd.withResources
  .prefer("/gpu/k80", 2) // prefix of resource logical name, amount
  .require("/cpu", 1)
  .require("/memory", 819200)
  .require("/disk", 1)
The withResources would apply to just that stage.  If there were conflicting 
resources in say like a join
val rddA = rdd.withResources.mapPartitions()

val rddB = rdd.withResources.mapPartitions()

val rddC = rddA.join(rddB)
Then you would have to resolve that conflict by either choosing largest or 
merging the requirements obviously choosing the largest of the two.

There are also a lot of corner cases we need to handle such as mentioned:

{noformat}
So which means RDDs with different resources requirements in one stage may have 
conflicts. For example: rdd1.withResources.mapPartitions { xxx 
}.withResources.mapPartitions { xxx }.collect, resources in rdd1 may be 
different from map rdd, so currently what I can think is that:
 
1. always pick the latter with warning log to say that multiple different 
resources in one stage is illegal.
 2. fail the stage with warning log to say that multiple different resources in 
one stage is illegal.
 3. merge conflicts with maximum resources needs. For example rdd1 requires 3 
gpus per task, rdd2 requires 4 gpus per task, then the merged requirement would 
be 4 gpus per task. (This is the high level description, details will be per 
partition based merging) [chosen].
{noformat}

One thing that was brought up multiple times is how a user will really know 
what stage it applies to.  Users aren't necessarily going to realize where the 
stage boundaries are.  I don't think anyone has a good solution to this.

Also I think for this to really be useful it has to be tied into dynamic 
allocation.  Without that they can just use the application level task configs 
we are adding in  SPARK-24615.

Of course the original proposal was only for RDDs as well. That was because it 
goes with barrier scheduling and I think the dataset/dataframe api is even 
harder to know where stage boundaries are because catalyst can optimize a bunch 
of things.

 

> Support Stage level resource configuration and scheduling
> -
>
> Key: SPARK-27495
> URL: https://issues.apache.org/jira/browse/SPARK-27495
> Project: Spark
>  Issue Type: Story
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Thomas Graves
>Priority: Major
>
> Currently Spark supports CPU level scheduling and we are adding in 
> accelerator aware scheduling with 
> https://issues.apache.org/jira/browse/SPARK-24615, but both of those are 
> scheduling via application level configurations.  Meaning there is one 
> configuration that is set for the entire lifetime of the application and the 
> user can't change it between Spark jobs/stages within that application.  
> Many times users have different requirements for different stages of their 
> application so they want to be able to configure at the stage level what 
> resources are required for that stage.
> For example, I might start a spark application which first does some ETL work 
> that needs lots of cores to run many tasks in parallel, then once that is 
> done I want to run some ML job and at that point I want GPU's, less CPU's, 
> and more memory.
> With this Jira we want to add the ability for users to specify the resources 
> for different stages.
> Note that https://issues.apache.org/jira/browse/SPARK-24615 had some 
> discussions on this but this part of it was removed from that.
> We should come up with a proposal on how to do this.






[jira] [Updated] (SPARK-27492) GPU scheduling - High level user documentation

2019-04-19 Thread Thomas Graves (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Graves updated SPARK-27492:
--
Description: For the SPIP - Accelerator-aware task scheduling for Spark, 
https://issues.apache.org/jira/browse/SPARK-24615 Add some high level user 
documentation about how this feature works together and point to things like 
the example discovery script, etc.  (was: Add some high level user 
documentation about how this feature works together and point to things like 
the example discovery script, etc.)

> GPU scheduling - High level user documentation
> --
>
> Key: SPARK-27492
> URL: https://issues.apache.org/jira/browse/SPARK-27492
> Project: Spark
>  Issue Type: Story
>  Components: Documentation
>Affects Versions: 3.0.0
>Reporter: Thomas Graves
>Priority: Major
>
> For the SPIP - Accelerator-aware task scheduling for Spark, 
> https://issues.apache.org/jira/browse/SPARK-24615 Add some high level user 
> documentation about how this feature works together and point to things like 
> the example discovery script, etc.






[jira] [Updated] (SPARK-27492) GPU scheduling - High level user documentation

2019-04-19 Thread Thomas Graves (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Graves updated SPARK-27492:
--
Summary: GPU scheduling - High level user documentation  (was: High level 
user documentation)

> GPU scheduling - High level user documentation
> --
>
> Key: SPARK-27492
> URL: https://issues.apache.org/jira/browse/SPARK-27492
> Project: Spark
>  Issue Type: Story
>  Components: Documentation
>Affects Versions: 3.0.0
>Reporter: Thomas Graves
>Priority: Major
>
> Add some high level user documentation about how this feature works together 
> and point to things like the example discovery script, etc.






[jira] [Commented] (SPARK-27492) High level user documentation

2019-04-19 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16821900#comment-16821900
 ] 

Thomas Graves commented on SPARK-27492:
---

Sorry, it is under the epic and I didn't realize it didn't carry over the 
context; I will update.

> High level user documentation
> -
>
> Key: SPARK-27492
> URL: https://issues.apache.org/jira/browse/SPARK-27492
> Project: Spark
>  Issue Type: Story
>  Components: Documentation
>Affects Versions: 3.0.0
>Reporter: Thomas Graves
>Priority: Major
>
> Add some high level user documentation about how this feature works together 
> and point to things like the example discovery script, etc.






[jira] [Commented] (SPARK-24655) [K8S] Custom Docker Image Expectations and Documentation

2019-04-18 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16821270#comment-16821270
 ] 

Thomas Graves commented on SPARK-24655:
---

From the linked issues it seems the goals would be:
 * support more than the Alpine image base - i.e. a glibc-based image
 * allow for adding at least certain support like GPUs - although this may 
just mean making the base image configurable
 * allow for overriding the start commands, for things like using Jupyter 
docker images
 * add in python pip requirements, and I assume the same would be nice for R; 
is there something generic we can do to make this easy?

Correct me if I'm wrong, but anything Spark-related you should be able to set 
with Spark confs, like env variables, e.g. 
{{spark.kubernetes.driverEnv.[EnvironmentVariableName]}} and 
spark.executorEnv.[EnvironmentVariableName] (see the sketch below).  Otherwise 
you could just use the Dockerfile built here as a base and build on it.
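
For example, these existing config prefixes already let users pass environment 
variables through Spark rather than baking them into the image (assuming 
`conf` is a SparkConf; the MY_SETTING name and value are made up):
{code:java}
// Existing config prefixes; only the MY_SETTING name/value is illustrative.
conf.set("spark.kubernetes.driverEnv.MY_SETTING", "some-value")
  .set("spark.executorEnv.MY_SETTING", "some-value")
{code}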

I think we would just want to try to make it easy for the common cases and 
allow users to override things we may have hardcoded, so they can reuse it as 
a base.

[~mcheah] From the original description, why do we want to avoid rebuilding 
the image if the Spark version changes? It seems OK to allow users to override 
and point to their own Spark version (which they could then use to do this), 
but I would think normally you would build a new docker image for a new 
version of Spark - dependencies may have changed, the docker template may have 
changed, etc.  If they really wanted this, they could just specify their own 
docker image as a base and add the Spark pieces; is that what you are getting 
at?  We could make the base image an argument to the docker-image-tool.sh 
script.

> [K8S] Custom Docker Image Expectations and Documentation
> 
>
> Key: SPARK-24655
> URL: https://issues.apache.org/jira/browse/SPARK-24655
> Project: Spark
>  Issue Type: Improvement
>  Components: Kubernetes
>Affects Versions: 2.3.1
>Reporter: Matt Cheah
>Priority: Major
>
> A common use case we want to support with Kubernetes is the usage of custom 
> Docker images. Some examples include:
>  * A user builds an application using Gradle or Maven, using Spark as a 
> compile-time dependency. The application's jars (both the custom-written jars 
> and the dependencies) need to be packaged in a docker image that can be run 
> via spark-submit.
>  * A user builds a PySpark or R application and desires to include custom 
> dependencies
>  * A user wants to switch the base image from Alpine to CentOS while using 
> either built-in or custom jars
> We currently do not document how these custom Docker images are supposed to 
> be built, nor do we guarantee stability of these Docker images with various 
> spark-submit versions. To illustrate how this can break down, suppose for 
> example we decide to change the names of environment variables that denote 
> the driver/executor extra JVM options specified by 
> {{spark.[driver|executor].extraJavaOptions}}. If we change the environment 
> variable spark-submit provides then the user must update their custom 
> Dockerfile and build new images.
> Rather than jumping to an implementation immediately though, it's worth 
> taking a step back and considering these matters from the perspective of the 
> end user. Towards that end, this ticket will serve as a forum where we can 
> answer at least the following questions, and any others pertaining to the 
> matter:
>  # What would be the steps a user would need to take to build a custom Docker 
> image, given their desire to customize the dependencies and the content (OS 
> or otherwise) of said images?
>  # How can we ensure the user does not need to rebuild the image if only the 
> spark-submit version changes?
> The end deliverable for this ticket is a design document, and then we'll 
> create sub-issues for the technical implementation and documentation of the 
> contract.






[jira] [Created] (SPARK-27495) Support Stage level resource scheduling

2019-04-17 Thread Thomas Graves (JIRA)
Thomas Graves created SPARK-27495:
-

 Summary: Support Stage level resource scheduling
 Key: SPARK-27495
 URL: https://issues.apache.org/jira/browse/SPARK-27495
 Project: Spark
  Issue Type: Story
  Components: Spark Core
Affects Versions: 3.0.0
Reporter: Thomas Graves


Currently Spark supports CPU level scheduling and we are adding in accelerator 
aware scheduling with https://issues.apache.org/jira/browse/SPARK-24615, but 
both of those are scheduling via application-level configurations, meaning 
there is one configuration set for the entire lifetime of the application and 
the user can't change it between Spark jobs/stages within that application.  

Many times users have different requirements for different stages of their 
application so they want to be able to configure at the stage level what 
resources are required for that stage.

For example, I might start a Spark application which first does some ETL work 
that needs lots of cores to run many tasks in parallel; once that is done I 
want to run an ML job, and at that point I want GPUs, fewer CPUs, and more 
memory.

With this Jira we want to add the ability for users to specify the resources 
for different stages.

Note that https://issues.apache.org/jira/browse/SPARK-24615 had some 
discussions on this but this part of it was removed from that.

We should come up with a proposal on how to do this.






[jira] [Updated] (SPARK-27495) Support Stage level resource configuration and scheduling

2019-04-17 Thread Thomas Graves (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Graves updated SPARK-27495:
--
Summary: Support Stage level resource configuration and scheduling  (was: 
Support Stage level resource scheduling)

> Support Stage level resource configuration and scheduling
> -
>
> Key: SPARK-27495
> URL: https://issues.apache.org/jira/browse/SPARK-27495
> Project: Spark
>  Issue Type: Story
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Thomas Graves
>Priority: Major
>
> Currently Spark supports CPU level scheduling and we are adding in 
> accelerator aware scheduling with 
> https://issues.apache.org/jira/browse/SPARK-24615, but both of those are 
> scheduling via application level configurations.  Meaning there is one 
> configuration that is set for the entire lifetime of the application and the 
> user can't change it between Spark jobs/stages within that application.  
> Many times users have different requirements for different stages of their 
> application so they want to be able to configure at the stage level what 
> resources are required for that stage.
> For example, I might start a spark application which first does some ETL work 
> that needs lots of cores to run many tasks in parallel, then once that is 
> done I want to run some ML job and at that point I want GPU's, less CPU's, 
> and more memory.
> With this Jira we want to add the ability for users to specify the resources 
> for different stages.
> Note that https://issues.apache.org/jira/browse/SPARK-24615 had some 
> discussions on this but this part of it was removed from that.
> We should come up with a proposal on how to do this.






[jira] [Created] (SPARK-27492) High level user documentation

2019-04-17 Thread Thomas Graves (JIRA)
Thomas Graves created SPARK-27492:
-

 Summary: High level user documentation
 Key: SPARK-27492
 URL: https://issues.apache.org/jira/browse/SPARK-27492
 Project: Spark
  Issue Type: Story
  Components: Documentation
Affects Versions: 3.0.0
Reporter: Thomas Graves


Add some high level user documentation about how this feature works together 
and point to things like the example discovery script, etc.






[jira] [Created] (SPARK-27489) UI updates to show executor resource information

2019-04-17 Thread Thomas Graves (JIRA)
Thomas Graves created SPARK-27489:
-

 Summary: UI updates to show executor resource information
 Key: SPARK-27489
 URL: https://issues.apache.org/jira/browse/SPARK-27489
 Project: Spark
  Issue Type: Story
  Components: Web UI
Affects Versions: 3.0.0
Reporter: Thomas Graves
Assignee: Thomas Graves


We are adding other resource type support to the executors and Spark. We should 
show the resource information for each executor on the UI Executors page.






[jira] [Reopened] (SPARK-27364) User-facing APIs for GPU-aware scheduling

2019-04-17 Thread Thomas Graves (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Graves reopened SPARK-27364:
---

Reopening since it has a subtask.

> User-facing APIs for GPU-aware scheduling
> -
>
> Key: SPARK-27364
> URL: https://issues.apache.org/jira/browse/SPARK-27364
> Project: Spark
>  Issue Type: Story
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Xiangrui Meng
>Assignee: Thomas Graves
>Priority: Major
>
> Design and implement:
> * General guidelines for cluster managers to understand resource requests at 
> application start. The concrete conf/param will be under the design of each 
> cluster manager.
> * APIs to fetch assigned resources from task context.
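
A rough sketch of what fetching assigned resources from the task context could 
look like (the method names here are illustrative rather than a committed API, 
and `rdd` is assumed to already exist):
{code:java}
import org.apache.spark.TaskContext

// Illustrative only: each task looks up the GPU addresses assigned to it and could,
// for example, pin its work to those devices before processing its partition.
rdd.mapPartitions { iter =>
  val gpuAddresses = TaskContext.get().resources()("gpu").addresses
  // ... use gpuAddresses (e.g. to set CUDA_VISIBLE_DEVICES) before doing the work ...
  iter
}
{code}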






[jira] [Commented] (SPARK-27364) User-facing APIs for GPU-aware scheduling

2019-04-17 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16820120#comment-16820120
 ] 

Thomas Graves commented on SPARK-27364:
---

Based on there being no comments on this, I'm going to resolve it and we can 
discuss more in the PRs for the implementation.

> User-facing APIs for GPU-aware scheduling
> -
>
> Key: SPARK-27364
> URL: https://issues.apache.org/jira/browse/SPARK-27364
> Project: Spark
>  Issue Type: Story
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Xiangrui Meng
>Assignee: Thomas Graves
>Priority: Major
>
> Design and implement:
> * General guidelines for cluster managers to understand resource requests at 
> application start. The concrete conf/param will be under the design of each 
> cluster manager.
> * APIs to fetch assigned resources from task context.






[jira] [Resolved] (SPARK-27364) User-facing APIs for GPU-aware scheduling

2019-04-17 Thread Thomas Graves (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Graves resolved SPARK-27364.
---
Resolution: Fixed

> User-facing APIs for GPU-aware scheduling
> -
>
> Key: SPARK-27364
> URL: https://issues.apache.org/jira/browse/SPARK-27364
> Project: Spark
>  Issue Type: Story
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Xiangrui Meng
>Assignee: Thomas Graves
>Priority: Major
>
> Design and implement:
> * General guidelines for cluster managers to understand resource requests at 
> application start. The concrete conf/param will be under the design of each 
> cluster manager.
> * APIs to fetch assigned resources from task context.






[jira] [Commented] (SPARK-27488) Driver interface to support GPU resources

2019-04-17 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16820119#comment-16820119
 ] 

Thomas Graves commented on SPARK-27488:
---

Note: the API design is here: https://issues.apache.org/jira/browse/SPARK-27364

> Driver interface to support GPU resources 
> --
>
> Key: SPARK-27488
> URL: https://issues.apache.org/jira/browse/SPARK-27488
> Project: Spark
>  Issue Type: Story
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Thomas Graves
>Assignee: Thomas Graves
>Priority: Major
>
> We want to have an interface to allow the users on the driver to get what 
> resources are allocated to them.  This is mostly to handle the case the 
> cluster manager does not launch the driver in an isolated environment and 
> where users could be sharing hosts.  For instance in standalone mode it 
> doesn't support container isolation so a host may have 4 gpu's, but only 2 of 
> them could be assigned to the driver.  In this case we need an interface for 
> the cluster manager to specify what gpu's for the driver to use and an 
> interface for the user to get the resource information






[jira] [Created] (SPARK-27488) Driver interface to support GPU resources

2019-04-17 Thread Thomas Graves (JIRA)
Thomas Graves created SPARK-27488:
-

 Summary: Driver interface to support GPU resources 
 Key: SPARK-27488
 URL: https://issues.apache.org/jira/browse/SPARK-27488
 Project: Spark
  Issue Type: Story
  Components: Spark Core
Affects Versions: 3.0.0
Reporter: Thomas Graves
Assignee: Thomas Graves


We want to have an interface that allows users on the driver to get the 
resources that are allocated to them.  This is mostly to handle the case where 
the cluster manager does not launch the driver in an isolated environment and 
users could be sharing hosts.  For instance, standalone mode doesn't support 
container isolation, so a host may have 4 GPUs but only 2 of them are assigned 
to the driver.  In this case we need an interface for the cluster manager to 
specify which GPUs the driver should use, and an interface for the user to get 
that resource information.






[jira] [Commented] (SPARK-27396) SPIP: Public APIs for extended Columnar Processing Support

2019-04-16 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16819153#comment-16819153
 ] 

Thomas Graves commented on SPARK-27396:
---

Since I don't hear any strong objections against the idea, I'm going to put the 
SPIP up for vote on the mailing list. We can continue discussions here or on 
the list.

> SPIP: Public APIs for extended Columnar Processing Support
> --
>
> Key: SPARK-27396
> URL: https://issues.apache.org/jira/browse/SPARK-27396
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Robert Joseph Evans
>Priority: Major
>
> *Q1.* What are you trying to do? Articulate your objectives using absolutely 
> no jargon.
>  
> The Dataset/DataFrame API in Spark currently only exposes to users one row at 
> a time when processing data.  The goals of this are to 
>  
>  # Expose to end users a new option of processing the data in a columnar 
> format, multiple rows at a time, with the data organized into contiguous 
> arrays in memory. 
>  # Make any transitions between the columnar memory layout and a row based 
> layout transparent to the end user.
>  # Allow for simple data exchange with other systems, DL/ML libraries, 
> pandas, etc. by having clean APIs to transform the columnar data into an 
> Apache Arrow compatible layout.
>  # Provide a plugin mechanism for columnar processing support so an advanced 
> user could avoid data transition between columnar and row based processing 
> even through shuffles. This means we should at least support pluggable APIs 
> so an advanced end user can implement the columnar partitioning themselves, 
> and provide the glue necessary to shuffle the data still in a columnar format.
>  # Expose new APIs that allow advanced users or frameworks to implement 
> columnar processing either as UDFs, or by adjusting the physical plan to do 
> columnar processing.  If the latter is too controversial we can move it to 
> another SPIP, but we plan to implement some accelerated computing in parallel 
> with this feature to be sure the APIs work, and without this feature it makes 
> that impossible.
>  
> Not Requirements, but things that would be nice to have.
>  # Provide default implementations for partitioning columnar data, so users 
> don’t have to.
>  # Transition the existing in memory columnar layouts to be compatible with 
> Apache Arrow.  This would make the transformations to Apache Arrow format a 
> no-op. The existing formats are already very close to those layouts in many 
> cases.  This would not be using the Apache Arrow java library, but instead 
> being compatible with the memory 
> [layout|https://arrow.apache.org/docs/format/Layout.html] and possibly only a 
> subset of that layout.
>  # Provide a clean transition from the existing code to the new one.  The 
> existing APIs which are public but evolving are not that far off from what is 
> being proposed.  We should be able to create a new parallel API that can wrap 
> the existing one. This means any file format that is trying to support 
> columnar can still do so until we make a conscious decision to deprecate and 
> then turn off the old APIs.
>  
> *Q2.* What problem is this proposal NOT designed to solve?
> This is not trying to implement any of the processing itself in a columnar 
> way, with the exception of examples for documentation, and possibly default 
> implementations for partitioning of columnar shuffle. 
>  
> *Q3.* How is it done today, and what are the limits of current practice?
> The current columnar support is limited to 3 areas.
>  # Input formats, optionally can return a ColumnarBatch instead of rows.  The 
> code generation phase knows how to take that columnar data and iterate 
> through it as rows for stages that want rows, which currently is almost 
> everything.  The limitations here are mostly implementation specific. The 
> current standard is to abuse Scala’s type erasure to return ColumnarBatches 
> as the elements of an RDD[InternalRow]. The code generation can handle this 
> because it is generating Java code, so it bypasses Scala’s type checking and 
> just casts the InternalRow to the desired ColumnarBatch.  This makes it 
> difficult for others to implement the same functionality for different 
> processing because they can only do it through code generation. There really 
> is no clean separate path in the code generation for columnar vs row based. 
> Additionally because it is only supported through code generation if for any 
> reason code generation would fail there is no backup.  This is typically fine 
> for input formats but can be problematic when we get into more extensive 
> processing. 
>  # When caching data it can optionally be cached in a columnar 

[jira] [Commented] (SPARK-25250) Race condition with tasks running when new attempt for same stage is created leads to other task in the next attempt running on the same partition id retry multiple ti

2019-04-15 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16818053#comment-16818053
 ] 

Thomas Graves commented on SPARK-25250:
---

[~cloud_fan] can you please add details as to where and why this was reverted?

 

This went into multiple branches, and 2.4.1 has already been released, so I don't 
necessarily agree with a revert here.  I would prefer to see a follow-up bug filed 
since it has already been released.

> Race condition with tasks running when new attempt for same stage is created 
> leads to other task in the next attempt running on the same partition id 
> retry multiple times
> --
>
> Key: SPARK-25250
> URL: https://issues.apache.org/jira/browse/SPARK-25250
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler, Spark Core
>Affects Versions: 2.3.1
>Reporter: Parth Gandhi
>Assignee: Parth Gandhi
>Priority: Major
> Fix For: 2.3.4, 2.4.1, 3.0.0
>
>
> We recently had a scenario where a race condition occurred when a task from 
> previous stage attempt just finished before new attempt for the same stage 
> was created due to fetch failure, so the new task created in the second 
> attempt on the same partition id was retrying multiple times due to 
> TaskCommitDenied Exception without realizing that the task in earlier attempt 
> was already successful.  
> For example, consider a task with partition id 9000 and index 9000 running in 
> stage 4.0. We see a fetch failure and thus spawn a new stage attempt 4.1. 
> Just within this timespan, the above task completes successfully, thus 
> marking the partition id 9000 as complete for 4.0. However, as stage 4.1 has 
> not yet been created, the taskset info for that stage is not available to the 
> TaskScheduler so, naturally, the partition id 9000 has not been marked 
> completed for 4.1. Stage 4.1 now spawns task with index 2000 on the same 
> partition id 9000. This task fails due to CommitDeniedException and since, it 
> does not see the corresponding partition id as been marked successful, it 
> keeps retrying multiple times until the job finally succeeds. It doesn't 
> cause any job failures because the DAG scheduler is tracking the partitions 
> separate from the task set managers.
>  
> Steps to Reproduce:
>  # Run any large job involving shuffle operation.
>  # When the ShuffleMap stage finishes and the ResultStage begins running, 
> cause this stage to throw a fetch failure exception(Try deleting certain 
> shuffle files on any host).
>  # Observe the task attempt numbers for the next stage attempt. Please note 
> that this issue is an intermittent one, so it might not happen all the time.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-27364) User-facing APIs for GPU-aware scheduling

2019-04-11 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16815465#comment-16815465
 ] 

Thomas Graves edited comment on SPARK-27364 at 4/11/19 2:37 PM:


So there is actually another piece we need for standalone mode with the Driver.  
Since there might not be isolation, the driver needs to be able to be told which 
gpu's to use.  This means we should have an API for the user to get the GPU's on 
the driver just like they do with TaskContext.  We also need a way, when 
launching the driver, to specify the gpu's. 

 

4)  Driver discovers or is told which gpus:

  a) Similar to the executor we have a config for it to specify a script:  
*spark.driver.resource.gpu.discoveryScript*     to allow it to discover the 
gpus. 

  b) We also need a way to specify them on startup for Standalone mode.  Rather 
than a launch parameter like the executors have, it would be easier to have a 
config to specify the gpu indices the driver should use, since there is only one 
driver and it can be launched in many different ways across the different 
resource managers and cluster/client modes.  The reason not to use a config on 
the executors is that you could have executors on different hosts, each with 
different indices, so one common config there doesn't make sense. So I propose a 
config to do this: *spark.driver.resource.gpu.addresses.*  Note the config will 
take precedence over the discoveryScript, so if it is set the discovery script 
isn't run.

5) For the user-facing API on the driver to see the gpu resources, we will add a 
function to *SparkContext* similar to the TaskContext version:

def getResources(): Map[String, ResourceInformation]
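
A rough illustration of 4b (the config names follow this proposal; the script 
path and the exact precedence behavior are assumptions, not a committed 
implementation):

  // Sketch: standalone client mode, where the admin tells the driver its gpu indices.
  import org.apache.spark.SparkConf
  val conf = new SparkConf()
    .set("spark.driver.resource.gpu.addresses", "0,2")                    // explicit indices win
    .set("spark.driver.resource.gpu.discoveryScript", "/opt/getGpus.sh")  // skipped when addresses are set
  // sc.getResources()("gpu").getAddresses() would then report Array("0", "2") on the driver.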


was (Author: tgraves):
So there is actually another piece we need for standalone mode with the Driver.  
Since there might not be isolation, the driver needs to be able to be told which 
gpu's to use.  This means we should have an API for the user to get the GPU's on 
the driver just like they do with TaskContext.  We also need a way, when 
launching the driver, to specify the gpu's. 

 

4)  Driver discovers or is told which gpus:

  a) Similar to the executor we have a config for it to specify a script:  
*spark.driver.resource.gpu.discoveryScript*     to allow it to discover the 
gpus. 

  b) We also need a way to specify them on startup for Standalone mode.  Rather 
than a launch parameter like the executors have, it would be easier to have a 
config to specify the gpu indices the driver should use, since there is only one 
driver and it can be launched in many different ways across the different 
resource managers and cluster/client modes.  The reason not to use a config on 
the executors is that you could have executors on different hosts, each with 
different indices, so one common config there doesn't make sense. So I propose a 
config to do this: *spark.driver.resource.gpu.addresses* 

5) For the user-facing API on the driver to see the gpu resources, we will add a 
function to *SparkContext* similar to the TaskContext version:

def getResources(): Map[String, ResourceInformation]

> User-facing APIs for GPU-aware scheduling
> -
>
> Key: SPARK-27364
> URL: https://issues.apache.org/jira/browse/SPARK-27364
> Project: Spark
>  Issue Type: Story
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Xiangrui Meng
>Assignee: Thomas Graves
>Priority: Major
>
> Design and implement:
> * General guidelines for cluster managers to understand resource requests at 
> application start. The concrete conf/param will be under the design of each 
> cluster manager.
> * APIs to fetch assigned resources from task context.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27364) User-facing APIs for GPU-aware scheduling

2019-04-11 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16815465#comment-16815465
 ] 

Thomas Graves commented on SPARK-27364:
---

So there is actually another piece we need for standalone mode with the Driver.  
Since there might not be isolation, the driver needs to be able to be told which 
gpu's to use.  This means we should have an API for the user to get the GPU's on 
the driver just like they do with TaskContext.  We also need a way, when 
launching the driver, to specify the gpu's. 

 

4)  Driver discovers or is told which gpus:

  a) Similar to the executor we have a config for it to specify a script:  
*spark.driver.resource.gpu.discoveryScript*     to allow it to discover the 
gpus. 

  b) We also need a way to specify them on startup for Standalone mode.  Rather 
than a launch parameter like the executors have, it would be easier to have a 
config to specify the gpu indices the driver should use, since there is only one 
driver and it can be launched in many different ways across the different 
resource managers and cluster/client modes.  The reason not to use a config on 
the executors is that you could have executors on different hosts, each with 
different indices, so one common config there doesn't make sense. So I propose a 
config to do this: *spark.driver.resource.gpu.addresses* 

5) For the user-facing API on the driver to see the gpu resources, we will add a 
function to *SparkContext* similar to the TaskContext version:

def getResources(): Map[String, ResourceInformation]

> User-facing APIs for GPU-aware scheduling
> -
>
> Key: SPARK-27364
> URL: https://issues.apache.org/jira/browse/SPARK-27364
> Project: Spark
>  Issue Type: Story
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Xiangrui Meng
>Assignee: Thomas Graves
>Priority: Major
>
> Design and implement:
> * General guidelines for cluster managers to understand resource requests at 
> application start. The concrete conf/param will be under the design of each 
> cluster manager.
> * APIs to fetch assigned resources from task context.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-27364) User-facing APIs for GPU-aware scheduling

2019-04-11 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16812488#comment-16812488
 ] 

Thomas Graves edited comment on SPARK-27364 at 4/11/19 1:48 PM:


There are 3 main user-facing impacts for this: the TaskContext interface to 
fetch the resources, the user API to specify the gpu count, and how the executor 
discovers the gpu's or is told about them. Below is more detail:

 

1) How the user gets the resources from the TaskContext and BarrierTaskContext

  For the taskContext interface I propose we add an api like:

def getResources(): Map[String, ResourceInformation]

Where the Map key is the resource type.  So examples would be "gpu", "fpga", 
etc.  "gpu" would be the only one we officially support to start with.

ResourceInformation would be a class with a name, units, count, and addresses.  
The name would be "gpu"; the units for gpu would be empty "", but for other 
resource types like memory it could be GiB or similar; the count is the number 
of them, so for gpu's it would be the number allocated; and finally the addresses 
Array of strings could be whatever we want, in the gpu case it would just be the 
indexes of the gpu's allocated to the task, i.e. ["0", "2", "3"]. I made this a 
string so it is very flexible as to what the address is based on different 
resource types.  Now the user has to know how to interpret this, but depending on 
what you are doing with them even the same tools have multiple ways to specify 
devices. For instance with tensorflow you can specify CUDA_VISIBLE_DEVICES=2,3 or 
you can specify them like:
 for d in ['/device:GPU:2', '/device:GPU:3']:

private val name: String,
private val units: String,
private val count: Long,
private val addresses: Array[String] = Array.empty

def getName(): String = name
def getUnits(): String = units
def getCount(): Long = count
def getAddresses(): Array[String] = addresses
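
A minimal usage sketch of the proposed task-side API (names follow the proposal 
above; everything else is illustrative):

  // Inside a task body: ask the TaskContext which gpu addresses this task was assigned.
  import org.apache.spark.TaskContext
  val taskGpus: Array[String] = TaskContext.get().getResources()
    .get("gpu")
    .map(_.getAddresses())
    .getOrElse(Array.empty[String])
  // Hand taskGpus (e.g. Array("0", "2", "3")) to whatever gpu library the task uses.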

2) How the user specifies the gpu resources upon application submission

Here we need multiple configs:

   a) One for the user to specify the gpus per task.  To make it extensible for 
other resources, I propose: *spark.task.resource.\{resource type}.count*.  This 
implementation would only support gpu, but it gives us flexibility to add more. 
This allows for multiple resources as well as multiple configs per resource. For 
instance, the resource type here would be gpu, but you could add fpga.  It also 
would allow you to add more configs besides count; you could, for instance, add 
a type config to request a certain gpu type.

   b) The user has to specify how many gpu's per executor and driver.  This one 
is a bit more complicated since it has to work with the resource managers to 
actually acquire those, but I think it makes sense to have common configs like 
we do for cores and memory. So we can have *spark.executor.resource.\{resource 
type}.count* and *spark.driver.resource.\{resource type}.count*.   This 
implementation would only support gpu.  The tricky thing here is that some of 
the resource managers already have configs for asking for gpu's.  Yarn has 
{{spark.yarn.executor.resource.\{resource-type}}}; although it was added in 3.0 
and hasn't shipped yet, we can't just remove it since you could ask yarn for 
other resource types spark doesn't know about.  For Kubernetes you have to 
request via the pod template, so I think it would be on the user to make sure 
those match. Mesos has {{spark.mesos.gpus.max}}.  So we just need to make sure 
the new configs map into those; having the duplicate configs might make it a bit 
weird to the user.

3) How the executor discovers or is told the gpu resources it has.

Here I think we have 2 options for the user/resource manager.  

  a) I propose we add a config *spark.executor.resource.gpu.discoveryScript* to 
allow the user to specify a discovery script. This script gets run when the 
executor starts, if the user requested gpus, to discover what gpu's the executor 
has.   A simple example would be a script that simply runs "nvidia-smi 
--query-gpu=index --format=csv,noheader" to get the gpu indexes for nvidia 
cards.  You could make this script super simple or complicated depending on 
your setup.  The API for the script is that it is callable with no parameters 
and returns a string of comma-separated values.  Normally I would expect 
indexes like "0,1,2,3".

  b) Also add an option to the executor launch *--gpuDevices* that allows 

[jira] [Commented] (SPARK-27176) Upgrade hadoop-3's built-in Hive maven dependencies to 2.3.4

2019-04-11 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16815414#comment-16815414
 ] 

Thomas Graves commented on SPARK-27176:
---

Looks like there is one already: https://github.com/apache/spark/pull/24346

> Upgrade hadoop-3's built-in Hive maven dependencies to 2.3.4
> 
>
> Key: SPARK-27176
> URL: https://issues.apache.org/jira/browse/SPARK-27176
> Project: Spark
>  Issue Type: Sub-task
>  Components: Build, SQL
>Affects Versions: 3.0.0
>Reporter: Yuming Wang
>Assignee: Yuming Wang
>Priority: Major
> Fix For: 3.0.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27176) Upgrade hadoop-3's built-in Hive maven dependencies to 2.3.4

2019-04-11 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16815408#comment-16815408
 ] 

Thomas Graves commented on SPARK-27176:
---

It looks like the hadoop-3.2 profile no longer works; do we have a Jira to fix 
this?

 

Looks like a parquet version issue:

[error]/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala:127:
 value JOB_SUMMARY_LEVEL is not a member of object 
org.apache.parquet.hadoop.ParquetOutputFormat
[error] ParquetOutputFormat.JOB_SUMMARY_LEVEL -> "ALL",

> Upgrade hadoop-3's built-in Hive maven dependencies to 2.3.4
> 
>
> Key: SPARK-27176
> URL: https://issues.apache.org/jira/browse/SPARK-27176
> Project: Spark
>  Issue Type: Sub-task
>  Components: Build, SQL
>Affects Versions: 3.0.0
>Reporter: Yuming Wang
>Assignee: Yuming Wang
>Priority: Major
> Fix For: 3.0.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27396) SPIP: Public APIs for extended Columnar Processing Support

2019-04-11 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16815403#comment-16815403
 ] 

Thomas Graves commented on SPARK-27396:
---

I can shepherd it.

> SPIP: Public APIs for extended Columnar Processing Support
> --
>
> Key: SPARK-27396
> URL: https://issues.apache.org/jira/browse/SPARK-27396
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Robert Joseph Evans
>Priority: Major
>
> *Q1.* What are you trying to do? Articulate your objectives using absolutely 
> no jargon.
>  
> The Dataset/DataFrame API in Spark currently only exposes to users one row at 
> a time when processing data.  The goals of this are to 
>  
>  # Expose to end users a new option of processing the data in a columnar 
> format, multiple rows at a time, with the data organized into contiguous 
> arrays in memory. 
>  # Make any transitions between the columnar memory layout and a row based 
> layout transparent to the end user.
>  # Allow for simple data exchange with other systems, DL/ML libraries, 
> pandas, etc. by having clean APIs to transform the columnar data into an 
> Apache Arrow compatible layout.
>  # Provide a plugin mechanism for columnar processing support so an advanced 
> user could avoid data transition between columnar and row based processing 
> even through shuffles. This means we should at least support pluggable APIs 
> so an advanced end user can implement the columnar partitioning themselves, 
> and provide the glue necessary to shuffle the data still in a columnar format.
>  # Expose new APIs that allow advanced users or frameworks to implement 
> columnar processing either as UDFs, or by adjusting the physical plan to do 
> columnar processing.  If the latter is too controversial we can move it to 
> another SPIP, but we plan to implement some accelerated computing in parallel 
> with this feature to be sure the APIs work, and without this feature it makes 
> that impossible.
>  
> Not Requirements, but things that would be nice to have.
>  # Provide default implementations for partitioning columnar data, so users 
> don’t have to.
>  # Transition the existing in memory columnar layouts to be compatible with 
> Apache Arrow.  This would make the transformations to Apache Arrow format a 
> no-op. The existing formats are already very close to those layouts in many 
> cases.  This would not be using the Apache Arrow java library, but instead 
> being compatible with the memory 
> [layout|https://arrow.apache.org/docs/format/Layout.html] and possibly only a 
> subset of that layout.
>  # Provide a clean transition from the existing code to the new one.  The 
> existing APIs which are public but evolving are not that far off from what is 
> being proposed.  We should be able to create a new parallel API that can wrap 
> the existing one. This means any file format that is trying to support 
> columnar can still do so until we make a conscious decision to deprecate and 
> then turn off the old APIs.
>  
> *Q2.* What problem is this proposal NOT designed to solve?
> This is not trying to implement any of the processing itself in a columnar 
> way, with the exception of examples for documentation, and possibly default 
> implementations for partitioning of columnar shuffle. 
>  
> *Q3.* How is it done today, and what are the limits of current practice?
> The current columnar support is limited to 3 areas.
>  # Input formats, optionally can return a ColumnarBatch instead of rows.  The 
> code generation phase knows how to take that columnar data and iterate 
> through it as rows for stages that want rows, which currently is almost 
> everything.  The limitations here are mostly implementation specific. The 
> current standard is to abuse Scala’s type erasure to return ColumnarBatches 
> as the elements of an RDD[InternalRow]. The code generation can handle this 
> because it is generating Java code, so it bypasses Scala’s type checking and 
> just casts the InternalRow to the desired ColumnarBatch.  This makes it 
> difficult for others to implement the same functionality for different 
> processing because they can only do it through code generation. There really 
> is no clean separate path in the code generation for columnar vs row based. 
> Additionally because it is only supported through code generation if for any 
> reason code generation would fail there is no backup.  This is typically fine 
> for input formats but can be problematic when we get into more extensive 
> processing. 
>  # When caching data it can optionally be cached in a columnar format if the 
> input is also columnar.  This is similar to the first area and has the same 
> limitations because the cache acts as an input, but it

[jira] [Assigned] (SPARK-27361) YARN support for GPU-aware scheduling

2019-04-09 Thread Thomas Graves (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Graves reassigned SPARK-27361:
-

Assignee: Thomas Graves

> YARN support for GPU-aware scheduling
> -
>
> Key: SPARK-27361
> URL: https://issues.apache.org/jira/browse/SPARK-27361
> Project: Spark
>  Issue Type: Story
>  Components: YARN
>Affects Versions: 3.0.0
>Reporter: Xiangrui Meng
>Assignee: Thomas Graves
>Priority: Major
>
> Design and implement YARN support for GPU-aware scheduling:
> * User can request GPU resources at Spark application level.
> * YARN can pass GPU info to Spark executor.
> * Integrate with YARN 3.2 GPU support.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-27364) User-facing APIs for GPU-aware scheduling

2019-04-08 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16812488#comment-16812488
 ] 

Thomas Graves edited comment on SPARK-27364 at 4/8/19 3:10 PM:
---

There are 3 main user-facing impacts for this: the TaskContext interface to 
fetch the resources, the user API to specify the gpu count, and how the executor 
discovers the gpu's or is told about them. Below is more detail:

 

1) How the user gets the resources from the TaskContext and BarrierTaskContext

  For the taskContext interface I propose we add an api like:

def getResources(): Map[String, ResourceInformation]

Where the Map key is the resource type.  So examples would be "gpu", "fpga", 
etc.  "gpu" would be the only one we officially support to start with.

ResourceInformation would be a class with a name, units, count, and addresses.  
The name would be "gpu"; the units for gpu would be empty "", but for other 
resource types like memory it could be GiB or similar; the count is the number 
of them, so for gpu's it would be the number allocated; and finally the addresses 
Array of strings could be whatever we want, in the gpu case it would just be the 
indexes of the gpu's allocated to the task, i.e. ["0", "2", "3"]. I made this a 
string so it is very flexible as to what the address is based on different 
resource types.  Now the user has to know how to interpret this, but depending on 
what you are doing with them even the same tools have multiple ways to specify 
devices. For instance with tensorflow you can specify CUDA_VISIBLE_DEVICES=2,3 or 
you can specify them like:
 for d in ['/device:GPU:2', '/device:GPU:3']:

private val name: String,
private val units: String,
private val count: Long,
private val addresses: Array[String] = Array.empty

def getName(): String = name
def getUnits(): String = units
def getCount(): Long = count
def getAddresses(): Array[String] = addresses
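
As a small illustration of how a task might turn the proposed addresses array 
into the forms tools expect (values are made up):

  val addresses = Array("2", "3")                           // e.g. from ResourceInformation.getAddresses()
  val cudaVisibleDevices = addresses.mkString(",")          // "2,3", the CUDA_VISIBLE_DEVICES convention
  val tfDeviceNames = addresses.map(i => s"/device:GPU:$i") // TensorFlow-style device strings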

2) How the user specifies the gpu resources upon application submission

Here we need multiple configs:

   a) One for the user to specify the gpus per task.  To make it extensible for 
other resources, I propose: *spark.task.resource.\{resource type}.count*.  This 
implementation would only support gpu, but it gives us flexibility to add more. 
This allows for multiple resources as well as multiple configs per resource. For 
instance, the resource type here would be gpu, but you could add fpga.  It also 
would allow you to add more configs besides count; you could, for instance, add 
a type config to request a certain gpu type.

   b) The user has to specify how many gpu's per executor and driver.  This one 
is a bit more complicated since it has to work with the resource managers to 
actually acquire those, but I think it makes sense to have common configs like 
we do for cores and memory. So we can have *spark.executor.resource.\{resource 
type}.count* and *spark.driver.resource.\{resource type}.count*.   This 
implementation would only support gpu.  The tricky thing here is that some of 
the resource managers already have configs for asking for gpu's.  Yarn has 
{{spark.yarn.executor.resource.\{resource-type}}}; although it was added in 3.0 
and hasn't shipped yet, we can't just remove it since you could ask yarn for 
other resource types spark doesn't know about.  For Kubernetes you have to 
request via the pod template, so I think it would be on the user to make sure 
those match. Mesos has {{spark.mesos.gpus.max}}.  So we just need to make sure 
the new configs map into those; having the duplicate configs might make it a bit 
weird to the user.

3) How the executor discovers or is told the gpu resources it has.

Here I think we have 2 options for the user/resource manager.  

  a) I propose we add a config *spark.\{executor, 
driver}.resource.gpu.discoverScript* to allow the user to specify a discovery 
script. This script gets run when the executor starts, if the user requested 
gpus, to discover what gpu's the executor has.   A simple example would be a 
script that simply runs "nvidia-smi --query-gpu=index --format=csv,noheader" to 
get the gpu indexes for nvidia cards.  You could make this script super simple 
or complicated depending on your setup.  The API for the script is that it is 
callable with no parameters and returns a string of comma-separated values.  
Normally I would expect indexes like "0,1,2,3".

  b) Also add an option to the executor launch *--gpuDevices* that

[jira] [Comment Edited] (SPARK-27364) User-facing APIs for GPU-aware scheduling

2019-04-08 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16812488#comment-16812488
 ] 

Thomas Graves edited comment on SPARK-27364 at 4/8/19 3:01 PM:
---

There are 3 main user-facing impacts for this: the TaskContext interface to 
fetch the resources, the user API to specify the gpu count, and how the executor 
discovers the gpu's or is told about them. Below is more detail:

 

1) How the user gets the resources from the TaskContext and BarrierTaskContext

  For the taskContext interface I propose we add an api like:

def getResources(): Map[String, ResourceInformation]

Where the Map key is the resource type.  So examples would be "gpu", "fpga", 
etc.  "gpu" would be the only one we officially support to start with.

ResourceInformation would be a class with a name, units, count, and addresses.  
The name would be "gpu"; the units for gpu would be empty "", but for other 
resource types like memory it could be GiB or similar; the count is the number 
of them, so for gpu's it would be the number allocated; and finally the addresses 
Array of strings could be whatever we want, in the gpu case it would just be the 
indexes of the gpu's allocated to the task, i.e. ["0", "2", "3"]. I made this a 
string so it is very flexible as to what the address is based on different 
resource types.  Now the user has to know how to interpret this, but depending on 
what you are doing with them even the same tools have multiple ways to specify 
devices. For instance with tensorflow you can specify CUDA_VISIBLE_DEVICES=2,3 or 
you can specify them like:
 for d in ['/device:GPU:2', '/device:GPU:3']:

private val name: String,
private val units: String,
private val count: Long,
private val addresses: Array[String] = Array.empty

def getName(): String = name
def getUnits(): String = units
def getCount(): Long = count
def getAddresses(): Array[String] = addresses

2) How the user specifies the gpu resources upon application submission

Here we need multiple configs:

   a) One for the user to specify the gpus per task.  To make it extensible for 
other resources, I propose: *spark.task.resource.\{resource type}.count*.  This 
implementation would only support gpu, but it gives us flexibility to add more. 
This allows for multiple resources as well as multiple configs per resource. For 
instance, the resource type here would be gpu, but you could add fpga.  It also 
would allow you to add more configs besides count; you could, for instance, add 
a type config to request a certain gpu type.

   b) The user has to specify how many gpu's per executor and driver.  This one 
is a bit more complicated since it has to work with the resource managers to 
actually acquire those, but I think it makes sense to have common configs like 
we do for cores and memory. So we can have *spark.executor.resource.\{resource 
type}.count* and *spark.driver.resource.\{resource type}.count*.   This 
implementation would only support gpu.  The tricky thing here is that some of 
the resource managers already have configs for asking for gpu's.  Yarn has 
{{spark.yarn.executor.resource.\{resource-type}}}; although it was added in 3.0 
and hasn't shipped yet, we can't just remove it since you could ask yarn for 
other resource types spark doesn't know about.  For Kubernetes you have to 
request via the pod template, so I think it would be on the user to make sure 
those match. Mesos has {{spark.mesos.gpus.max}}.  So we just need to make sure 
the new configs map into those; having the duplicate configs might make it a bit 
weird to the user.

3) How the executor discovers or is told the gpu resources it has.

Here I think we have 2 options for the user/resource manager.  

  a) I propose we add a config *spark.\{executor, 
driver}.resource.gpu.discoverScript* to allow the user to specify a discovery 
script. This script gets run when the executor starts, if the user requested 
gpus, to discover what gpu's the executor has.   A simple example would be a 
script that simply runs "nvidia-smi --query-gpu=index --format=csv,noheader" to 
get the gpu indexes for nvidia cards.  You could make this script super simple 
or complicated depending on your setup.

  b) Also add an option to the executor launch *--gpuDevices* that allows the 
resource manager to specify the indexes of the gpu devices it has.   This 
allows insecure or non-containerized resource managers like standalone mode to 
allocate gpu's pe

[jira] [Commented] (SPARK-27364) User-facing APIs for GPU-aware scheduling

2019-04-08 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16812488#comment-16812488
 ] 

Thomas Graves commented on SPARK-27364:
---

There are 3 main user-facing impacts for this: the TaskContext interface to 
fetch the resources, the user API to specify the gpu count, and how the executor 
discovers the gpu's or is told about them. Below is more detail:

 

1) How the user gets the resources from the TaskContext and BarrierTaskContext

  For the taskContext interface I propose we add an api like:

def getResources(): Map[String, ResourceInformation]

Where the Map key is the resource type.  So examples would be "gpu", "fpga", 
etc.  "gpu" would be the only one we officially support to start with.

ResourceInformation would be a class with a name, units, count, and addresses.  
The name would be "gpu"; the units for gpu would be empty "", but for other 
resource types like memory it could be GiB or similar; the count is the number 
of them, so for gpu's it would be the number allocated; and finally the addresses 
Array of strings could be whatever we want, in the gpu case it would just be the 
indexes of the gpu's allocated to the task, i.e. ["0", "2", "3"]. I made this a 
string so it is very flexible as to what the address is based on different 
resource types.  Now the user has to know how to interpret this, but depending on 
what you are doing with them even the same tools have multiple ways to specify 
devices. For instance with tensorflow you can specify CUDA_VISIBLE_DEVICES=2,3 or 
you can specify them like:
for d in ['/device:GPU:2', '/device:GPU:3']:

private val name: String,
private val units: String,
private val count: Long,
private val addresses: Array[String] = Array.empty

def getName(): String = name
def getUnits(): String = units
def getCount(): Long = count
def getAddresses(): Array[String] = addresses



2) How the user specifies the gpu resources upon application submission

Here we need multiple configs:

   a) One for the user to specify the gpus per task.  To make it extensible for 
other resources, I propose: *spark.task.resource.\{resource type}.count*.  This 
implementation would only support gpu, but it gives us flexibility to add more. 
This allows for multiple resources as well as multiple configs per resource. For 
instance, the resource type here would be gpu, but you could add fpga.  It also 
would allow you to add more configs besides count; you could, for instance, add 
a type config to request a certain gpu type.

   b) The user has to specify how many gpu's per executor and driver.  This one 
is a bit more complicated since it has to work with the resource managers to 
actually acquire those, but I think it makes sense to have common configs like 
we do for cores and memory. So we can have *spark.executor.resource.\{resource 
type}.count* and *spark.driver.resource.\{resource type}.count*.   This 
implementation would only support gpu.  The tricky thing here is that some of 
the resource managers already have configs for asking for gpu's.  Yarn has 
{{spark.yarn.executor.resource.\{resource-type}}}; although it was added in 3.0 
and hasn't shipped yet, we can't just remove it since you could ask yarn for 
other resource types spark doesn't know about.  For Kubernetes you have to 
request via the pod template, so I think it would be on the user to make sure 
those match. Mesos has {{spark.mesos.gpus.max}}.  So we just need to make sure 
the new configs map into those; having the duplicate configs might make it a bit 
weird to the user.
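
A sketch of how the proposed count configs might be set together (names per this 
proposal, values purely illustrative):

  import org.apache.spark.SparkConf
  val conf = new SparkConf()
    .set("spark.task.resource.gpu.count", "1")      // gpus each task needs
    .set("spark.executor.resource.gpu.count", "4")  // gpus requested for each executor
    .set("spark.driver.resource.gpu.count", "1")    // gpus requested for the driver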

3) How the executor discovers or is told the gpu resources it has.

Here I think we have 2 options for the user/resource manager.  

  a) I propose we add a config *spark.\{executor, 
driver}.resource.gpu.discoverScript* to allow the user to specify a discovery 
script. This script gets run when the executor starts, if the user requested 
gpus, to discover what gpu's the executor has.   A simple example would be a 
script that simply runs "nvidia-smi --query-gpu=index --format=csv,noheader" to 
get the gpu indexes for nvidia cards.  You could make this script super simple 
or complicated depending on your setup.  (A sketch of consuming the script's 
output follows after item b below.)

  b) Also add an option to the executor launch, *--gpuDevices*, that allows the 
resource manager to specify the indexes of the gpu devices it has.   This 
allows insecure or non-containerized resource managers like standalone mode to 
allocate gpu's per executor without having containers and isolation all 
implemented.
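
A minimal sketch of consuming the discovery script's comma-separated output as 
described in 3a (the script path is a made-up example):

  // Run the user-supplied discovery script and parse its output,
  // e.g. "0,1,2,3" -> Array("0", "1", "2", "3").
  import scala.sys.process._
  val raw = Seq("/opt/spark/scripts/getGpus.sh").!!.trim
  val discovered: Array[String] =
    if (raw.isEmpty) Array.empty[String] else raw.split(",").map(_.trim)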

[jira] [Commented] (SPARK-27364) User-facing APIs for GPU-aware scheduling

2019-04-04 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16809893#comment-16809893
 ] 

Thomas Graves commented on SPARK-27364:
---

Working on this; I will post a basic design when I have one.

> User-facing APIs for GPU-aware scheduling
> -
>
> Key: SPARK-27364
> URL: https://issues.apache.org/jira/browse/SPARK-27364
> Project: Spark
>  Issue Type: Story
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Xiangrui Meng
>Assignee: Thomas Graves
>Priority: Major
>
> Design and implement:
> * General guidelines for cluster managers to understand resource requests at 
> application start. The concrete conf/param will be under the design of each 
> cluster manager.
> * APIs to fetch assigned resources from task context.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27024) Executor interface for cluster managers to support GPU resources

2019-04-04 Thread Thomas Graves (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Graves updated SPARK-27024:
--
Description: 
The executor interface shall deal with the resources allocated to the executor 
by cluster managers (Standalone, YARN, Kubernetes).  The Executor either needs 
to be told the resources it was given or it needs to discover them in order for 
the executor to sync with the driver to expose available resources to support 
task scheduling.

Note this is part of a bigger feature for gpu-aware scheduling and is just how 
the executor finds the resources. The general flow is:
 * users ask for a certain set of resources, for instance a number of gpus - each 
cluster manager has a specific way to do this.
 * the cluster manager allocates a container or set of resources (standalone mode).
 * When spark launches the executor in that container, the executor either has 
to be told what resources it has or it has to auto discover them (see the sketch 
below).
 * The Executor has to register with the Driver and tell the driver the set of 
resources it has so the scheduler can use that to schedule tasks that require 
a certain amount of each of those resources.
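
A minimal sketch of the told-vs-discover step above (purely illustrative 
pseudologic, not Spark's actual executor code):

  // Use the addresses the cluster manager passed to the executor if any,
  // otherwise fall back to running the user-supplied discovery script.
  def resolveGpuAddresses(told: Option[Seq[String]],
                          runDiscoveryScript: () => Seq[String]): Seq[String] =
    told.getOrElse(runDiscoveryScript())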

  was:The executor interface shall deal with the resources allocated to the 
executor by cluster managers(Standalone, YARN, Kubernetes).  The Executor 
either needs to be told the resources it was given or it needs to discover them 
in order for the executor to sync with the driver to expose available resources 
to support task scheduling.


> Executor interface for cluster managers to support GPU resources
> 
>
> Key: SPARK-27024
> URL: https://issues.apache.org/jira/browse/SPARK-27024
> Project: Spark
>  Issue Type: Story
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Xingbo Jiang
>Assignee: Thomas Graves
>Priority: Major
>
> The executor interface shall deal with the resources allocated to the 
> executor by cluster managers (Standalone, YARN, Kubernetes).  The Executor 
> either needs to be told the resources it was given or it needs to discover 
> them in order for the executor to sync with the driver to expose available 
> resources to support task scheduling.
> Note this is part of a bigger feature for gpu-aware scheduling and is just 
> how the executor finds the resources. The general flow is:
>  * users ask for a certain set of resources, for instance a number of gpus - 
> each cluster manager has a specific way to do this.
>  * the cluster manager allocates a container or set of resources (standalone mode).
>  * When spark launches the executor in that container, the executor either 
> has to be told what resources it has or it has to auto discover them.
>  * The Executor has to register with the Driver and tell the driver the set of 
> resources it has so the scheduler can use that to schedule tasks that 
> require a certain amount of each of those resources.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27024) Executor interface for cluster managers to support GPU resources

2019-04-04 Thread Thomas Graves (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Graves updated SPARK-27024:
--
Description: The executor interface shall deal with the resources allocated 
to the executor by cluster managers (Standalone, YARN, Kubernetes).  The 
Executor either needs to be told the resources it was given or it needs to 
discover them in order for the executor to sync with the driver to expose 
available resources to support task scheduling.  (was: The executor interface 
shall deal with the resources allocated to the executor by cluster 
managers (Standalone, YARN, Kubernetes), so the Spark Executor doesn't need to 
be involved in the GPU discovery and allocation, which shall be handled by 
cluster managers. However, an executor needs to sync with the driver to expose 
available resources to support task scheduling.)

> Executor interface for cluster managers to support GPU resources
> 
>
> Key: SPARK-27024
> URL: https://issues.apache.org/jira/browse/SPARK-27024
> Project: Spark
>  Issue Type: Story
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Xingbo Jiang
>Assignee: Thomas Graves
>Priority: Major
>
> The executor interface shall deal with the resources allocated to the 
> executor by cluster managers (Standalone, YARN, Kubernetes).  The Executor 
> either needs to be told the resources it was given or it needs to discover 
> them in order for the executor to sync with the driver to expose available 
> resources to support task scheduling.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-27005) Design sketch: Accelerator-aware scheduling

2019-03-05 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16784555#comment-16784555
 ] 

Thomas Graves edited comment on SPARK-27005 at 3/5/19 3:40 PM:
---

So we have both a Google design doc and the comment above; can you consolidate 
into one place?  The Google doc might be easier to comment on.  I added comments 
to the Google doc.


was (Author: tgraves):
So we have both a Google design doc and the comment above; can you consolidate 
into one place?  The Google doc might be easier to comment on.

> Design sketch: Accelerator-aware scheduling
> ---
>
> Key: SPARK-27005
> URL: https://issues.apache.org/jira/browse/SPARK-27005
> Project: Spark
>  Issue Type: Story
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Xingbo Jiang
>Priority: Major
>
> This task is to outline a design sketch for the accelerator-aware scheduling 
> SPIP discussion.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27005) Design sketch: Accelerator-aware scheduling

2019-03-05 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16784555#comment-16784555
 ] 

Thomas Graves commented on SPARK-27005:
---

So we have both a Google design doc and the comment above; can you consolidate 
into one place?  The Google doc might be easier to comment on.

> Design sketch: Accelerator-aware scheduling
> ---
>
> Key: SPARK-27005
> URL: https://issues.apache.org/jira/browse/SPARK-27005
> Project: Spark
>  Issue Type: Story
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Xingbo Jiang
>Priority: Major
>
> This task is to outline a design sketch for the accelerator-aware scheduling 
> SPIP discussion.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27024) Design executor interface to support GPU resources

2019-03-01 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16782167#comment-16782167
 ] 

Thomas Graves commented on SPARK-27024:
---

This and SPARK-27005 basically split the design of the entire feature in two.  
The intention is for SPARK-27005 to cover the core scheduler pieces and for this 
one to cover the cluster manager and part of the executor side.

 

I will try to clarify the description when I get a chance to look at it a bit 
more, probably early next week.

> Design executor interface to support GPU resources
> --
>
> Key: SPARK-27024
> URL: https://issues.apache.org/jira/browse/SPARK-27024
> Project: Spark
>  Issue Type: Task
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Xingbo Jiang
>Priority: Major
>
> The executor interface shall deal with the resources allocated to the 
> executor by cluster managers (Standalone, YARN, Kubernetes), so the Spark 
> Executor doesn't need to be involved in the GPU discovery and allocation, which 
> shall be handled by cluster managers. However, an executor needs to sync with 
> the driver to expose available resources to support task scheduling.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27024) Design executor interface to support GPU resources

2019-03-01 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16781734#comment-16781734
 ] 

Thomas Graves commented on SPARK-27024:
---

I will be looking at this and propose a design.

> Design executor interface to support GPU resources
> --
>
> Key: SPARK-27024
> URL: https://issues.apache.org/jira/browse/SPARK-27024
> Project: Spark
>  Issue Type: Task
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Xingbo Jiang
>Priority: Major
>
> The executor interface shall deal with the resources allocated to the 
> executor by cluster managers (Standalone, YARN, Kubernetes), so the Spark 
> Executor doesn't need to be involved in the GPU discovery and allocation, which 
> shall be handled by cluster managers. However, an executor needs to sync with 
> the driver to expose available resources to support task scheduling.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27005) Design sketch: Accelerator-aware scheduling

2019-03-01 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16781728#comment-16781728
 ] 

Thomas Graves commented on SPARK-27005:
---

It seems like we are mixing gpu's as a static resource vs. a generic one as 
accelerators.  Perhaps we should consider making it generic but then having 
kind of a #define for gpu's so it can be consistent for users, while other 
generic ones still work if they want them.

So for instance, instead of calling it *spark.task.gpus* we could call it 
spark.task.accelerator.[resource] to make that part generic, and then we could 
define gpu as a known type so that it's easier for users to use the same name 
there.

I also assume that in a few of the places where you say search for gpu 
requirements you mean search for generic accelerator requirements - like in the 
*Expand RDD/Stage to support GPU* section?

If you are using a separate queue for the tasks with accelerator needs, does it 
still go through locality checks?  Can you expand upon exactly where the queue 
you are proposing would be added?   The scheduler goes through the resourceOffer 
code and down into resourceOfferSingleTaskSet for each locality level and then 
into dequeueTask where it finally pulls from a queue; is this dequeueTask 
where you are proposing adding a new queue?

> Design sketch: Accelerator-aware scheduling
> ---
>
> Key: SPARK-27005
> URL: https://issues.apache.org/jira/browse/SPARK-27005
> Project: Spark
>  Issue Type: Story
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Xingbo Jiang
>Priority: Major
>
> This task is to outline a design sketch for the accelerator-aware scheduling 
> SPIP discussion.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26792) Apply custom log URL to Spark UI

2019-01-30 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16756750#comment-16756750
 ] 

Thomas Graves commented on SPARK-26792:
---

I don't see a problem with changing the default in 3.0; it's a major release and 
we are allowed to change APIs and such, so this is no different.  If enough 
people think it's a better user experience we should change the default.   It 
would be good to get feedback from more users.

> Apply custom log URL to Spark UI
> 
>
> Key: SPARK-26792
> URL: https://issues.apache.org/jira/browse/SPARK-26792
> Project: Spark
>  Issue Type: Improvement
>  Components: Web UI
>Affects Versions: 3.0.0
>Reporter: Jungtaek Lim
>Priority: Major
>
> SPARK-23155 enables SHS to set up custom log URLs for incomplete / completed 
> apps.
> While getting reviews on SPARK-23155, I got two comments suggesting that applying 
> custom log URLs to the UI would help achieve this. Quoting these comments here:
> https://github.com/apache/spark/pull/23260#issuecomment-456827963
> {quote}
> Sorry I haven't had time to look through all the code so this might be a 
> separate jira, but one thing I thought of here is it would be really nice not 
> to have specifically stderr/stdout. users can specify any log4j.properties 
> and some tools like oozie by default end up using hadoop log4j rather than 
> spark log4j, so files aren't necessarily the same. Also users can put in 
> other logs files so it would be nice to have links to those from the UI. It 
> seems simpler if we just had a link to the directory and it read the files 
> within there. Other things in Hadoop do it this way, but I'm not sure if that 
> works well for other resource managers, any thoughts on that? As long as this 
> doesn't prevent the above I can file a separate jira for it.
> {quote}
> https://github.com/apache/spark/pull/23260#issuecomment-456904716
> {quote}
> Hi Tom, +1: singling out stdout and stderr is definitely an annoyance. We
> typically configure Spark jobs to write the GC log and dump heap on OOM
> using ,  and/or we use the rolling file appender to deal with
> large logs during debugging. So linking the YARN container log overview
> page would make much more sense for us. We work around it with a custom
> submit process that logs all important URLs on the submit side log.
> {quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22229) SPIP: RDMA Accelerated Shuffle Engine

2019-01-25 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-22229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16752356#comment-16752356
 ] 

Thomas Graves commented on SPARK-22229:
---

This is interesting, a few questions:
 * I'm assuming all the data has to fit into memory for this to work? Or is it 
somehow handling spill files by pulling them into memory and then transferring? 
Does it fail if it's not all in memory?
 * The benchmark data sizes I saw all appeared to fit into memory, is that 
right? 
 * Did you performance test with both RDMA over Ethernet and over InfiniBand?
 * To clarify the above question, is the implementation in Mellanox/SparkRDMA 
github stable or not yet complete?
 * The SPIP mentions: MapStatuses are redundant – no need for those extra 
transfers that take precious seconds in many jobs. -> How does the reducer know 
where to fetch map output from then? It still somehow needs to know a host and 
perhaps a memory location, unless the host it is fetching from just knows based 
on the map id and reduce id.  
 * I assume this is only supported with external shuffle disabled (which 
probably doesn't exist since you have a different shuffle manager) and no 
dynamic allocation?
 * Depending on the above questions, if it's all in memory, I assume that if an 
executor goes down it has to rerun those tasks, since the data is not on disk 
for an external shuffle service to still serve up.
 * If someone were to try this out, from the SPIP: "SparkRDMA manages its own 
memory, off-heap" - I take that to mean that in addition to Spark's normal 
memory usage you need to give the Spark executor enough off-heap memory to 
account for whatever size you are shuffling?
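
For reference, the sizing the last question is getting at would presumably be 
expressed through the usual executor memory settings; something like the 
following (the values here are made up, and exactly how SparkRDMA's off-heap 
buffers are accounted for is the open question):

{code:java}
import org.apache.spark.SparkConf

// Illustrative sizing only.
val conf = new SparkConf()
  .set("spark.executor.memory", "16g")
  .set("spark.executor.memoryOverhead", "8g") // extra room for off-heap shuffle buffers
{code}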

> SPIP: RDMA Accelerated Shuffle Engine
> -
>
> Key: SPARK-22229
> URL: https://issues.apache.org/jira/browse/SPARK-22229
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.3.0, 2.4.0, 3.0.0
>Reporter: Yuval Degani
>Priority: Major
> Attachments: 
> SPARK-22229_SPIP_RDMA_Accelerated_Shuffle_Engine_Rev_1.0.pdf
>
>
> An RDMA-accelerated shuffle engine can provide enormous performance benefits 
> to shuffle-intensive Spark jobs, as demonstrated in the “SparkRDMA” plugin 
> open-source project ([https://github.com/Mellanox/SparkRDMA]).
> Using RDMA for shuffle improves CPU utilization significantly and reduces I/O 
> processing overhead by bypassing the kernel and networking stack as well as 
> avoiding memory copies entirely. Those valuable CPU cycles are then consumed 
> directly by the actual Spark workloads, and help reduce the job runtime 
> significantly. 
> This performance gain is demonstrated with both the industry-standard HiBench 
> TeraSort benchmark (which shows a 1.5x speedup in sorting) and shuffle-intensive 
> customer applications. 
> SparkRDMA will be presented at Spark Summit 2017 in Dublin 
> ([https://spark-summit.org/eu-2017/events/accelerating-shuffle-a-tailor-made-rdma-solution-for-apache-spark/]).
> Please see attached proposal document for more information.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24615) Accelerator-aware task scheduling for Spark

2019-01-23 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16750375#comment-16750375
 ] 

Thomas Graves commented on SPARK-24615:
---

[~jerryshao] just curious where this is at - are you still working on it?

> Accelerator-aware task scheduling for Spark
> ---
>
> Key: SPARK-24615
> URL: https://issues.apache.org/jira/browse/SPARK-24615
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.4.0
>Reporter: Saisai Shao
>Priority: Major
>  Labels: Hydrogen, SPIP
>
> In the machine learning area, accelerator cards (GPU, FPGA, TPU) are 
> predominant compared to CPUs. To make the current Spark architecture work 
> with accelerator cards, Spark itself should understand the existence of 
> accelerators and know how to schedule tasks onto the executors where 
> accelerators are equipped.
> Spark's current scheduler schedules tasks based on the locality of the data 
> plus the availability of CPUs. This will introduce some problems when scheduling 
> tasks that require accelerators.
>  # CPU cores usually outnumber accelerators on one node, so using CPU cores 
> to schedule accelerator-required tasks will introduce a mismatch.
>  # In one cluster, we always assume that CPUs are equipped in each node, but 
> this is not true of accelerator cards.
>  # The existence of heterogeneous tasks (accelerator required or not) 
> requires the scheduler to schedule tasks in a smart way.
> So here we propose to improve the current scheduler to support heterogeneous 
> tasks (accelerator required or not). This can be part of the work of Project 
> Hydrogen.
> Details are attached in a Google doc. It doesn't cover all the implementation 
> details, just highlights the parts that should be changed.
>  
> CC [~yanboliang] [~merlintang]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26413) SPIP: RDD Arrow Support in Spark Core and PySpark

2019-01-23 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16750174#comment-16750174
 ] 

Thomas Graves commented on SPARK-26413:
---

Just a note I think this overlaps with 
https://issues.apache.org/jira/browse/SPARK-24579

> SPIP: RDD Arrow Support in Spark Core and PySpark
> -
>
> Key: SPARK-26413
> URL: https://issues.apache.org/jira/browse/SPARK-26413
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark, Spark Core
>Affects Versions: 3.0.0
>Reporter: Richard Whitcomb
>Priority: Minor
>
> h2. Background and Motivation
> Arrow is becoming a standard interchange format for columnar structured 
> data.  This is already true in Spark with the use of Arrow in the pandas UDF 
> functions in the DataFrame API.
> However, the current implementation of Arrow in Spark is limited to two use 
> cases.
>  * Pandas UDF that allows for operations on one or more columns in the 
> DataFrame API.
>  * Collect as Pandas which pulls back the entire dataset to the driver in a 
> Pandas Dataframe.
> What is still hard, however, is making use of all of the columns in a Dataframe 
> while staying distributed across the workers.  The only way to do this 
> currently is to drop down into RDDs and collect the rows into a dataframe. 
> However, pickling is very slow and the collecting is expensive.
> The proposal is to extend spark in a way that allows users to operate on an 
> Arrow Table fully while still making use of Spark's underlying technology.  
> Some examples of possibilities with this new API. 
>  * Pass the Arrow Table with Zero Copy to PyTorch for predictions.
>  * Pass to Nvidia Rapids for an algorithm to be run on the GPU.
>  * Distribute data across many GPUs making use of the new Barriers API.
> h2. Targets users and personas
> ML, data scientists, and future library authors.
> h2. Goals
>  * Conversion from any Dataset[Row] or PySpark Dataframe to RDD[Table]
>  * Conversion back from any RDD[Table] to Dataset[Row], RDD[Row], Pyspark 
> Dataframe
>  * Open the possibilities to tighter integration between Arrow/Pandas/Spark 
> especially at a library level.
> h2. Non-Goals
>  * Not creating a new API but instead using existing APIs.
> h2. Proposed API changes
> h3. Data Objects
> case class ArrowTable(schema: Schema, batches: Iterable[ArrowRecordBatch])
> h3. Dataset.scala
> {code:java}
> // Converts a Dataset to an RDD of Arrow Tables
> // Each RDD row is an Iterable of Arrow Batches.
> def arrowRDD: RDD[ArrowTable]
>  
> // Utility Function to convert to RDD Arrow Table for PySpark
> private[sql] def javaToPythonArrow: JavaRDD[Array[Byte]]
> {code}
> h3. RDD.scala
> {code:java}
>  // Converts RDD[ArrowTable] to a Dataframe by inspecting the Arrow Schema
>  def arrowToDataframe(implicit ev: T <:< ArrowTable): Dataframe
>   
>  // Converts RDD[ArrowTable] to an RDD of Rows
>  def arrowToRDD(implicit ev: T <:< ArrowTable): RDD[Row]{code}
> h3. Serializers.py
> {code:java}
> # Serializer that takes a Serialized Arrow Tables and returns a pyarrow Table.
> class ArrowSerializer(FramedSerializer)
> {code}
> h3. RDD.py
> {code}
> # New RDD Class that has an RDD[ArrowTable] behind it and uses the new 
> ArrowSerializer instead of the normal Pickle Serializer
> class ArrowRDD(RDD){code}
>  
> h3. Dataframe.py
> {code}
> # New function that converts a PySpark dataframe into an ArrowRDD
> def arrow(self):
> {code}
>  
> h2. Example API Usage
> h3. Pyspark
> {code}
> # Select a Single Column Using Pandas
> def map_table(arrow_table):
>   import pyarrow as pa
>   pdf = arrow_table.to_pandas()
>   pdf = pdf[['email']]
>   return pa.Table.from_pandas(pdf)
> # Convert to Arrow RDD, map over tables, convert back to dataframe
> df.arrow.map(map_table).dataframe 
> {code}
> h3. Scala
>  
> {code:java}
> // Find N Centroids using Cuda Rapids kMeans
> def runCuKmeans(table: ArrowTable, clusters: Int): ArrowTable
>  
> // Convert Dataset[Row] to RDD[ArrowTable] and back to Dataset[Row]
> df.arrowRDD.map(table => runCuKmeans(table, N)).arrowToDataframe.show(10)
> {code}
>  
> h2. Implementation Details
> As mentioned in the first section, the goal is to make it easier for Spark 
> users to interact with Arrow tools and libraries.  This, however, does come 
> with some considerations from a Spark perspective.
>  Arrow is column based instead of Row based.  In the above API proposal of 
> RDD[ArrowTable] each RDD row will in fact be a block of data.  Another 
> proposal in this regard is to introduce a new parameter to Spark called 
> arrow.sql.execution.arrow.maxRecordsPerTable.  The goal of this parameter is 
> to decide how many records are included in a single Arrow Table.  If set to 
> -1 the entire partition will be included in the table else to that number. 
> Wi

[jira] [Commented] (SPARK-26689) Bad disk causing broadcast failure

2019-01-23 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16750125#comment-16750125
 ] 

Thomas Graves commented on SPARK-26689:
---

Can you add more details about your setup?  Which resource manager were you 
running?  

> Bad disk causing broadcast failure
> --
>
> Key: SPARK-26689
> URL: https://issues.apache.org/jira/browse/SPARK-26689
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0, 2.4.0
> Environment: Spark on Yarn
> Mutliple Disk
>Reporter: liupengcheng
>Priority: Major
>
> We encountered an application failure in our production cluster which was 
> caused by bad disk problems.
> {code:java}
> Job aborted due to stage failure: Task serialization failed: 
> java.io.IOException: Failed to create local dir in 
> /home/work/hdd5/yarn/c3prc-hadoop/nodemanager/usercache/h_user_profile/appcache/application_1463372393999_144979/blockmgr-1f96b724-3e16-4c09-8601-1a2e3b758185/3b.
> org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:73)
> org.apache.spark.storage.DiskStore.contains(DiskStore.scala:173)
> org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$getCurrentBlockStatus(BlockManager.scala:391)
> org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:801)
> org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:629)
> org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:987)
> org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:99)
> org.apache.spark.broadcast.TorrentBroadcast.(TorrentBroadcast.scala:85)
> org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
> org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:63)
> org.apache.spark.SparkContext.broadcast(SparkContext.scala:1332)
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:863)
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$14$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:1090)
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$14$$anonfun$apply$1.apply(DAGScheduler.scala:1086)
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$14$$anonfun$apply$1.apply(DAGScheduler.scala:1086)
> scala.Option.foreach(Option.scala:236)
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$14.apply(DAGScheduler.scala:1086)
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$14.apply(DAGScheduler.scala:1085)
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1085)
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1528)
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1493)
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1482)
> org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> {code}
> We have multiple disks on our cluster nodes; however, it still fails. I think 
> it's because Spark does not handle bad disks in `DiskBlockManager` currently. 
> Actually, we could handle bad disks in a multiple-disk environment to avoid 
> application failure, as sketched below.
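
A minimal sketch of that fallback idea (assuming we simply skip directories that 
cannot be created; this is not what Spark's DiskBlockManager does today, which 
roughly hashes a file name to a single directory and fails hard):

{code:java}
import java.io.{File, IOException}

// Toy illustration of "try the next local dir if one disk is bad".
def createLocalFile(localDirs: Seq[String], fileName: String): File = {
  localDirs.iterator
    .map(d => new File(d))
    .find(dir => dir.isDirectory || dir.mkdirs())   // skip dirs on bad disks
    .map(dir => new File(dir, fileName))
    .getOrElse(throw new IOException(
      s"Failed to create $fileName in any of: ${localDirs.mkString(", ")}"))
}
{code}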



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24374) SPIP: Support Barrier Execution Mode in Apache Spark

2019-01-09 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16738622#comment-16738622
 ] 

Thomas Graves commented on SPARK-24374:
---

[~luzengxiang] are you just saying that when Spark tries to kill the tasks 
running TensorFlow they don't really get killed? That could be TensorFlow not 
exiting when interrupted - spark.task.reaper.killTimeout may be worth looking at.
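
If the tasks really are stuck inside user code (e.g. TensorFlow), the task 
reaper settings are the usual knobs; for example (the values here are 
illustrative only):

{code:java}
import org.apache.spark.SparkConf

// spark.task.reaper.* controls how hard Spark tries to kill a task whose thread
// does not exit on its own after being interrupted.
val conf = new SparkConf()
  .set("spark.task.reaper.enabled", "true")
  .set("spark.task.reaper.killTimeout", "60s") // kill the executor JVM if the task is still running after this
{code}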

> SPIP: Support Barrier Execution Mode in Apache Spark
> 
>
> Key: SPARK-24374
> URL: https://issues.apache.org/jira/browse/SPARK-24374
> Project: Spark
>  Issue Type: Epic
>  Components: ML, Spark Core
>Affects Versions: 2.4.0
>Reporter: Xiangrui Meng
>Assignee: Xiangrui Meng
>Priority: Major
>  Labels: Hydrogen, SPIP
> Attachments: SPIP_ Support Barrier Scheduling in Apache Spark.pdf
>
>
> (See details in the linked/attached SPIP doc.)
> {quote}
> The proposal here is to add a new scheduling model to Apache Spark so users 
> can properly embed distributed DL training as a Spark stage to simplify the 
> distributed training workflow. For example, Horovod uses MPI to implement 
> all-reduce to accelerate distributed TensorFlow training. The computation 
> model is different from MapReduce used by Spark. In Spark, a task in a stage 
> doesn’t depend on any other tasks in the same stage, and hence it can be 
> scheduled independently. In MPI, all workers start at the same time and pass 
> messages around. To embed this workload in Spark, we need to introduce a new 
> scheduling model, tentatively named “barrier scheduling”, which launches 
> tasks at the same time and provides users enough information and tooling to 
> embed distributed DL training. Spark can also provide an extra layer of fault 
> tolerance in case some tasks failed in the middle, where Spark would abort 
> all tasks and restart the stage.
> {quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (YARN-9116) Capacity Scheduler: add the default maximum-allocation-mb and maximum-allocation-vcores for the queues

2019-01-08 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/YARN-9116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16737392#comment-16737392
 ] 

Thomas Graves commented on YARN-9116:
-

Yes, so you want to keep the behavior that the cluster-level maximum is the 
absolute maximum and no child queue can be larger than that, otherwise it 
breaks backwards compatibility.  

> Capacity Scheduler: add the default maximum-allocation-mb and 
> maximum-allocation-vcores for the queues
> --
>
> Key: YARN-9116
> URL: https://issues.apache.org/jira/browse/YARN-9116
> Project: Hadoop YARN
>  Issue Type: Sub-task
>  Components: capacity scheduler
>Affects Versions: 2.7.0
>Reporter: Aihua Xu
>Assignee: Aihua Xu
>Priority: Major
> Attachments: YARN-9116.1.patch
>
>
> YARN-1582 adds support for a per-queue maximum-allocation-mb configuration, 
> which targets larger-container features on dedicated queues (a larger 
> maximum-allocation-mb/maximum-allocation-vcores for such queues). 
> To achieve a larger container configuration, we need to increase the 
> global maximum-allocation-mb/maximum-allocation-vcores (e.g. 120G/256) and 
> then override those configurations with the desired values on the queues, since 
> a queue configuration can't be larger than the cluster configuration. There are 
> many queues in the system, and if we forget to configure such values when 
> adding a new queue, then that queue gets the default 120G/256, which typically 
> is not what we want.  
> We can come up with a queue-default configuration (set to a normal queue 
> configuration like 16G/8), so the leaf queues get such values by default.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: yarn-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: yarn-issues-h...@hadoop.apache.org



[jira] [Updated] (SPARK-26269) YarnAllocator should have same blacklist behaviour with YARN to maximize use of cluster resource

2019-01-07 Thread Thomas Graves (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26269?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Graves updated SPARK-26269:
--
Fix Version/s: 2.4.1

> YarnAllocator should have same blacklist behaviour with YARN to maximize use 
> of cluster resource
> ---
>
> Key: SPARK-26269
> URL: https://issues.apache.org/jira/browse/SPARK-26269
> Project: Spark
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 2.3.1, 2.3.2, 2.4.0
>Reporter: wuyi
>Assignee: wuyi
>Priority: Minor
> Fix For: 2.4.1, 3.0.0
>
>
> Currently, YarnAllocator may put a node with a completed container whose exit 
> status is not one of SUCCESS, PREEMPTED, KILLED_EXCEEDED_VMEM, 
> KILLED_EXCEEDED_PMEM into the blacklist. However, for other exit statuses, e.g. 
> KILLED_BY_RESOURCEMANAGER, YARN does not consider the related nodes to need 
> blacklisting (see YARN's explanation for details: 
> https://github.com/apache/hadoop/blob/228156cfd1b474988bc4fedfbf7edddc87db41e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/Apps.java#L273).
>  So, relaxing the current blacklist rule and having the same blacklist 
> behaviour as YARN would maximize use of cluster resources.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-26285) Add a metric source for accumulators (aka AccumulatorSource)

2018-12-22 Thread Thomas Graves (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26285?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Graves reassigned SPARK-26285:
-

Assignee: Alessandro Bellina

> Add a metric source for accumulators (aka AccumulatorSource)
> 
>
> Key: SPARK-26285
> URL: https://issues.apache.org/jira/browse/SPARK-26285
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.4.0
>Reporter: Alessandro Bellina
>Assignee: Alessandro Bellina
>Priority: Minor
> Fix For: 3.0.0
>
>
> We'd like a simple mechanism to register spark accumulators against the 
> codahale metrics registry. 
> This task proposes adding a LongAccumulatorSource and a 
> DoubleAccumulatorSource.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-26285) Add a metric source for accumulators (aka AccumulatorSource)

2018-12-22 Thread Thomas Graves (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26285?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Graves resolved SPARK-26285.
---
   Resolution: Fixed
Fix Version/s: 3.0.0

> Add a metric source for accumulators (aka AccumulatorSource)
> 
>
> Key: SPARK-26285
> URL: https://issues.apache.org/jira/browse/SPARK-26285
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.4.0
>Reporter: Alessandro Bellina
>Assignee: Alessandro Bellina
>Priority: Minor
> Fix For: 3.0.0
>
>
> We'd like a simple mechanism to register spark accumulators against the 
> codahale metrics registry. 
> This task proposes adding a LongAccumulatorSource and a 
> DoubleAccumulatorSource.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26269) YarnAllocator should have same blacklist behaviour with YARN to maximize use of cluster resource

2018-12-22 Thread Thomas Graves (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26269?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Graves updated SPARK-26269:
--
Issue Type: Bug  (was: Improvement)

> YarnAllocator should have same blacklist behaviour with YARN to maximize use 
> of cluster resource
> ---
>
> Key: SPARK-26269
> URL: https://issues.apache.org/jira/browse/SPARK-26269
> Project: Spark
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 2.3.1, 2.3.2, 2.4.0
>Reporter: wuyi
>Assignee: wuyi
>Priority: Minor
> Fix For: 3.0.0
>
>
> Currently, YarnAllocator may put a node with a completed container whose exit 
> status is not one of SUCCESS, PREEMPTED, KILLED_EXCEEDED_VMEM, 
> KILLED_EXCEEDED_PMEM into the blacklist. However, for other exit statuses, e.g. 
> KILLED_BY_RESOURCEMANAGER, YARN does not consider the related nodes to need 
> blacklisting (see YARN's explanation for details: 
> https://github.com/apache/hadoop/blob/228156cfd1b474988bc4fedfbf7edddc87db41e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/Apps.java#L273).
>  So, relaxing the current blacklist rule and having the same blacklist 
> behaviour as YARN would maximize use of cluster resources.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-26269) YarnAllocator should have same blacklist behaviour with YARN to maximize use of cluster resource

2018-12-21 Thread Thomas Graves (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26269?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Graves reassigned SPARK-26269:
-

Assignee: wuyi

> YarnAllocator should have same blacklist behaviour with YARN to maximize use 
> of cluster resource
> ---
>
> Key: SPARK-26269
> URL: https://issues.apache.org/jira/browse/SPARK-26269
> Project: Spark
>  Issue Type: Improvement
>  Components: YARN
>Affects Versions: 2.3.1, 2.3.2, 2.4.0
>Reporter: wuyi
>Assignee: wuyi
>Priority: Minor
> Fix For: 3.0.0
>
>
> Currently, YarnAllocator may put a node with a completed container whose exit 
> status is not one of SUCCESS, PREEMPTED, KILLED_EXCEEDED_VMEM, 
> KILLED_EXCEEDED_PMEM into the blacklist. However, for other exit statuses, e.g. 
> KILLED_BY_RESOURCEMANAGER, YARN does not consider the related nodes to need 
> blacklisting (see YARN's explanation for details: 
> https://github.com/apache/hadoop/blob/228156cfd1b474988bc4fedfbf7edddc87db41e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/Apps.java#L273).
>  So, relaxing the current blacklist rule and having the same blacklist 
> behaviour as YARN would maximize use of cluster resources.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-26269) YarnAllocator should have same blacklist behaviour with YARN to maximize use of cluster resource

2018-12-21 Thread Thomas Graves (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26269?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Graves resolved SPARK-26269.
---
   Resolution: Fixed
Fix Version/s: (was: 2.4.0)
   3.0.0

> YarnAllocator should have same blacklist behaviour with YARN to maximize use 
> of cluster resource
> ---
>
> Key: SPARK-26269
> URL: https://issues.apache.org/jira/browse/SPARK-26269
> Project: Spark
>  Issue Type: Improvement
>  Components: YARN
>Affects Versions: 2.3.1, 2.3.2, 2.4.0
>Reporter: wuyi
>Assignee: wuyi
>Priority: Minor
> Fix For: 3.0.0
>
>
> Currently, YarnAllocator may put a node with a completed container whose exit 
> status is not one of SUCCESS, PREEMPTED, KILLED_EXCEEDED_VMEM, 
> KILLED_EXCEEDED_PMEM into the blacklist. However, for other exit statuses, e.g. 
> KILLED_BY_RESOURCEMANAGER, YARN does not consider the related nodes to need 
> blacklisting (see YARN's explanation for details: 
> https://github.com/apache/hadoop/blob/228156cfd1b474988bc4fedfbf7edddc87db41e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/Apps.java#L273).
>  So, relaxing the current blacklist rule and having the same blacklist 
> behaviour as YARN would maximize use of cluster resources.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-26201) python broadcast.value on driver fails with disk encryption enabled

2018-11-30 Thread Thomas Graves (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Graves resolved SPARK-26201.
---
   Resolution: Fixed
 Assignee: Sanket Chintapalli
Fix Version/s: 3.0.0
   2.4.1
   2.3.3

> python broadcast.value on driver fails with disk encryption enabled
> ---
>
> Key: SPARK-26201
> URL: https://issues.apache.org/jira/browse/SPARK-26201
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.3.2
>Reporter: Thomas Graves
>Assignee: Sanket Chintapalli
>Priority: Major
> Fix For: 2.3.3, 2.4.1, 3.0.0
>
>
> I was trying Python with RPC and disk encryption enabled, and when I used a 
> Python broadcast variable and just read the value back on the driver side, the 
> job failed with:
>  
> Traceback (most recent call last): File "broadcast.py", line 37, in  
> words_new.value File "/pyspark.zip/pyspark/broadcast.py", line 137, in value 
> File "pyspark.zip/pyspark/broadcast.py", line 122, in load_from_path File 
> "pyspark.zip/pyspark/broadcast.py", line 128, in load EOFError: Ran out of 
> input
> To reproduce use configs: --conf spark.network.crypto.enabled=true --conf 
> spark.io.encryption.enabled=true
>  
> Code:
> words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"])
>  words_new.value
>  print(words_new.value)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26201) python broadcast.value on driver fails with disk encryption enabled

2018-11-28 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16702028#comment-16702028
 ] 

Thomas Graves commented on SPARK-26201:
---

The issue here seems to be that it isn't decrypting the file before trying to 
read it. We will have a patch up for this shortly.

> python broadcast.value on driver fails with disk encryption enabled
> ---
>
> Key: SPARK-26201
> URL: https://issues.apache.org/jira/browse/SPARK-26201
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.3.2
>Reporter: Thomas Graves
>Priority: Major
>
> I was trying Python with RPC and disk encryption enabled, and when I used a 
> Python broadcast variable and just read the value back on the driver side, the 
> job failed with:
>  
> Traceback (most recent call last): File "broadcast.py", line 37, in  
> words_new.value File "/pyspark.zip/pyspark/broadcast.py", line 137, in value 
> File "pyspark.zip/pyspark/broadcast.py", line 122, in load_from_path File 
> "pyspark.zip/pyspark/broadcast.py", line 128, in load EOFError: Ran out of 
> input
> To reproduce use configs: --conf spark.network.crypto.enabled=true --conf 
> spark.io.encryption.enabled=true
>  
> Code:
> words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"])
>  words_new.value
>  print(words_new.value)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26201) python broadcast.value on driver fails with disk encryption enabled

2018-11-28 Thread Thomas Graves (JIRA)
Thomas Graves created SPARK-26201:
-

 Summary: python broadcast.value on driver fails with disk 
encryption enabled
 Key: SPARK-26201
 URL: https://issues.apache.org/jira/browse/SPARK-26201
 Project: Spark
  Issue Type: Bug
  Components: PySpark
Affects Versions: 2.3.2
Reporter: Thomas Graves


I was trying Python with RPC and disk encryption enabled, and when I used a 
Python broadcast variable and just read the value back on the driver side, the 
job failed with:

 

Traceback (most recent call last): File "broadcast.py", line 37, in  
words_new.value File "/pyspark.zip/pyspark/broadcast.py", line 137, in value 
File "pyspark.zip/pyspark/broadcast.py", line 122, in load_from_path File 
"pyspark.zip/pyspark/broadcast.py", line 128, in load EOFError: Ran out of input

To reproduce use configs: --conf spark.network.crypto.enabled=true --conf 
spark.io.encryption.enabled=true

 

Code:

words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"])
 words_new.value
 print(words_new.value)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (YARN-9055) Capacity Scheduler: allow larger queue level maximum-allocation-mb to override the cluster configuration

2018-11-27 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/YARN-9055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16700511#comment-16700511
 ] 

Thomas Graves commented on YARN-9055:
-

It would definitely be a change in behavior which could surprise people with 
existing configurations. I do think it's easier to have it this way so you don't 
have to configure all the queues. I don't remember all the details on why I 
did it this way; I think it was mostly to not break the existing functionality 
of the cluster-level max.

> Capacity Scheduler: allow larger queue level maximum-allocation-mb to 
> override the cluster configuration
> 
>
> Key: YARN-9055
> URL: https://issues.apache.org/jira/browse/YARN-9055
> Project: Hadoop YARN
>  Issue Type: Improvement
>  Components: capacityscheduler
>Affects Versions: 2.7.0
>Reporter: Aihua Xu
>Assignee: Aihua Xu
>Priority: Major
> Attachments: YARN-9055.1.patch
>
>
> YARN-1582 adds support for a per-queue maximum-allocation-mb configuration. 
> That feature gives the flexibility to set different memory requirements for 
> different queues. That patch adds the limitation that the queue-level 
> configuration can't exceed the cluster-level default configuration, but I 
> feel it may make more sense to remove that limitation and allow any overrides, 
> since: 
> # Such configuration is controlled by the admin, so it shouldn't get abused; 
> # It's common that typical queues require standard-size containers while some 
> jobs (queues) require larger containers. With the current 
> limitation, we have to set a larger configuration at the cluster level, which 
> will cause resource abuse unless we override it on all the queues.
> We can remove this limitation in CapacitySchedulerConfiguration.java so the 
> cluster setting provides the default value and the queue setting can override it. 
> {noformat}
>if (maxAllocationMbPerQueue > clusterMax.getMemorySize()
> || maxAllocationVcoresPerQueue > clusterMax.getVirtualCores()) {
>   throw new IllegalArgumentException(
>   "Queue maximum allocation cannot be larger than the cluster setting"
>   + " for queue " + queue
>   + " max allocation per queue: " + result
>   + " cluster setting: " + clusterMax);
> }
> {noformat}
> Let me know if it makes sense.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: yarn-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: yarn-issues-h...@hadoop.apache.org



[jira] [Commented] (SPARK-26089) Handle large corrupt shuffle blocks

2018-11-27 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16700493#comment-16700493
 ] 

Thomas Graves commented on SPARK-26089:
---

Yeah, that seems to make sense and I wouldn't think it would have much overhead. 
The only thing would be if it masked any other issues, like with kryo or 
encryption, that you wouldn't want to be a fetch failure. We would have to look 
at those more carefully to handle them.
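
A minimal sketch of the stream-wrapping idea in option 1 below (using a 
placeholder exception type rather than Spark's real fetch-failure plumbing):

{code:java}
import java.io.{FilterInputStream, IOException, InputStream}

// Placeholder for whatever the scheduler would treat as a fetch failure.
class CorruptShuffleBlockException(cause: Throwable) extends RuntimeException(cause)

// Wraps a shuffle block stream and converts IOExceptions thrown while streaming
// into a dedicated exception, without reading the whole block up front.
class FetchFailureConvertingStream(in: InputStream) extends FilterInputStream(in) {
  override def read(): Int =
    try super.read()
    catch { case e: IOException => throw new CorruptShuffleBlockException(e) }

  override def read(b: Array[Byte], off: Int, len: Int): Int =
    try super.read(b, off, len)
    catch { case e: IOException => throw new CorruptShuffleBlockException(e) }
}
{code}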

> Handle large corrupt shuffle blocks
> ---
>
> Key: SPARK-26089
> URL: https://issues.apache.org/jira/browse/SPARK-26089
> Project: Spark
>  Issue Type: Improvement
>  Components: Scheduler, Shuffle, Spark Core
>Affects Versions: 2.4.0
>Reporter: Imran Rashid
>Priority: Major
>
> We've seen a bad disk lead to corruption in a shuffle block, which led to 
> tasks repeatedly failing after fetching the data with an IOException.  The 
> tasks get retried, but the same corrupt data gets fetched again, and the 
> tasks keep failing.  As there isn't a fetch failure, the jobs eventually 
> fail and Spark never tries to regenerate the shuffle data.
> This is the same as SPARK-4105, but that fix only covered small blocks.  
> There was some discussion during that change about this limitation 
> (https://github.com/apache/spark/pull/15923#discussion_r88756017) and 
> followups to cover larger blocks (which would involve spilling to disk to 
> avoid OOM), but it looks like that never happened.
> I can think of a few approaches to this:
> 1) wrap the shuffle block input stream with another input stream, that 
> converts all exceptions into FetchFailures.  This is similar to the fix of 
> SPARK-4105, but that reads the entire input stream up-front, and instead I'm 
> proposing to do it within the InputStream itself so it's streaming and does 
> not have a large memory overhead.
> 2) Add checksums to shuffle blocks.  This was proposed 
> [here|https://github.com/apache/spark/pull/15894] and abandoned as being too 
> complex.
> 3) Try to tackle this with blacklisting instead: when there is any failure in 
> a task that is reading shuffle data, assign some "blame" to the source of the 
> shuffle data, and eventually blacklist the source.  It seems really tricky to 
> get sensible heuristics for this, though.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26089) Handle large corrupt shuffle blocks

2018-11-26 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16699639#comment-16699639
 ] 

Thomas Graves commented on SPARK-26089:
---

It would definitely be nice to improve blacklisting. Internally we started 
working on adding more logic to the shuffle fetcher, and communication back to 
the driver so that the driver could make decisions like this, but we just 
haven't gotten back to it.

Do you have a stack trace from the failure, or is it the same as SPARK-4105?

Can you elaborate on what you mean by 1?

> Handle large corrupt shuffle blocks
> ---
>
> Key: SPARK-26089
> URL: https://issues.apache.org/jira/browse/SPARK-26089
> Project: Spark
>  Issue Type: Improvement
>  Components: Scheduler, Shuffle, Spark Core
>Affects Versions: 2.4.0
>Reporter: Imran Rashid
>Priority: Major
>
> We've seen a bad disk lead to corruption in a shuffle block, which led to 
> tasks repeatedly failing after fetching the data with an IOException.  The 
> tasks get retried, but the same corrupt data gets fetched again, and the 
> tasks keep failing.  As there isn't a fetch failure, the jobs eventually 
> fail and Spark never tries to regenerate the shuffle data.
> This is the same as SPARK-4105, but that fix only covered small blocks.  
> There was some discussion during that change about this limitation 
> (https://github.com/apache/spark/pull/15923#discussion_r88756017) and 
> followups to cover larger blocks (which would involve spilling to disk to 
> avoid OOM), but it looks like that never happened.
> I can think of a few approaches to this:
> 1) wrap the shuffle block input stream with another input stream, that 
> converts all exceptions into FetchFailures.  This is similar to the fix of 
> SPARK-4105, but that reads the entire input stream up-front, and instead I'm 
> proposing to do it within the InputStream itself so it's streaming and does 
> not have a large memory overhead.
> 2) Add checksums to shuffle blocks.  This was proposed 
> [here|https://github.com/apache/spark/pull/15894] and abandoned as being too 
> complex.
> 3) Try to tackle this with blacklisting instead: when there is any failure in 
> a task that is reading shuffle data, assign some "blame" to the source of the 
> shuffle data, and eventually blacklist the source.  It seems really tricky to 
> get sensible heuristics for this, though.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-21809) Change Stage Page to use datatables to support sorting columns and searching

2018-11-26 Thread Thomas Graves (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-21809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Graves resolved SPARK-21809.
---
   Resolution: Fixed
 Assignee: Parth Gandhi
Fix Version/s: 3.0.0

> Change Stage Page to use datatables to support sorting columns and searching
> 
>
> Key: SPARK-21809
> URL: https://issues.apache.org/jira/browse/SPARK-21809
> Project: Spark
>  Issue Type: Improvement
>  Components: Web UI
>Affects Versions: 2.2.0
>Reporter: Nuochen Lyu
>Assignee: Parth Gandhi
>Priority: Minor
> Fix For: 3.0.0
>
>
> Support column sorting and search for the Stage page using jQuery DataTables and 
> the REST API. Before this change, the Stage page was generated as hard-coded HTML 
> and could not support search; also, sorting was disabled if there was any 
> application that had more than one attempt. Supporting search and sort (over 
> all applications rather than the 20 entries on the current page) in any case 
> will greatly improve the user experience.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (YARN-8991) nodemanager not cleaning blockmgr directories inside appcache

2018-11-12 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/YARN-8991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16683946#comment-16683946
 ] 

Thomas Graves commented on YARN-8991:
-

If it's while it's running then you should file this with Spark. It's very similar 
to https://issues.apache.org/jira/browse/SPARK-17233.

The Spark external shuffle service doesn't support that at this point.  The 
problem is that you may have a Spark executor running on one host that 
generates some map output data to shuffle, and then that executor exits as it's 
not needed anymore.  When a reduce starts, it just talks to the YARN 
nodemanager and the external shuffle service to get the map output.  Now there 
is no executor left on the node to clean up the shuffle output.  Support would 
have to be added for something like the driver telling the Spark external 
shuffle service to clean up.

If you don't use dynamic allocation and the external shuffle service, it should 
clean up properly.
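
For context, the combination being described is roughly (illustrative settings 
only):

{code:java}
import org.apache.spark.SparkConf

// Executors come and go under dynamic allocation while their map output is served
// by the node's external shuffle service, so no executor is left on the node to
// clean the shuffle files up until the application finishes.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")
{code}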

> nodemanager not cleaning blockmgr directories inside appcache 
> --
>
> Key: YARN-8991
> URL: https://issues.apache.org/jira/browse/YARN-8991
> Project: Hadoop YARN
>  Issue Type: Bug
>  Components: nodemanager
>Affects Versions: 2.6.0
>Reporter: Hidayat Teonadi
>Priority: Major
> Attachments: yarn-nm-log.txt
>
>
> Hi, I'm running spark on yarn and have enabled the Spark Shuffle Service. I'm 
> noticing that during the lifetime of my spark streaming application, the nm 
> appcache folder is building up with blockmgr directories (filled with 
> shuffle_*.data).
> Looking into the nm logs, it seems like the blockmgr directories are not part 
> of the cleanup process of the application. Eventually the disk will fill up and 
> the app will crash. I have both 
> {{yarn.nodemanager.localizer.cache.cleanup.interval-ms}} and 
> {{yarn.nodemanager.localizer.cache.target-size-mb}} set, so I don't think it's 
> a configuration issue.
> What is stumping me is that the executor ID listed by Spark during the external 
> shuffle block registration doesn't match the executor ID listed in YARN's nm 
> log. Maybe this executor ID disconnect explains why the cleanup is not done? 
> I'm assuming that blockmgr directories are supposed to be cleaned up?
>  
> {noformat}
> 2018-11-05 15:01:21,349 INFO 
> org.apache.spark.network.shuffle.ExternalShuffleBlockResolver: Registered 
> executor AppExecId{appId=application_1541045942679_0193, execId=1299} with 
> ExecutorShuffleInfo{localDirs=[/mnt1/yarn/nm/usercache/auction_importer/appcache/application_1541045942679_0193/blockmgr-b9703ae3-722c-47d1-a374-abf1cc954f42],
>  subDirsPerLocalDir=64, 
> shuffleManager=org.apache.spark.shuffle.sort.SortShuffleManager}
>  {noformat}
>  
> seems similar to https://issues.apache.org/jira/browse/YARN-7070, although 
> I'm not sure if the behavior I'm seeing is spark use related.
> [https://stackoverflow.com/questions/52923386/spark-streaming-job-doesnt-delete-shuffle-files]
>  has a stop gap solution of cleaning up via cron.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: yarn-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: yarn-issues-h...@hadoop.apache.org



[jira] [Commented] (SPARK-25995) sparkR should ensure user args are after the argument used for the port

2018-11-12 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16683876#comment-16683876
 ] 

Thomas Graves commented on SPARK-25995:
---

I haven't looked at the details but I would say whatever is easier.  I'm not 
sure what users do with the args in this case anyway.  Can they be used?

Another option would be to make the port file an actual Spark internal arg or 
configuration. It looks like it has a SparkConf object at the time it's read:

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/api/r/RBackend.scala#L135

> sparkR should ensure user args are after the argument used for the port
> ---
>
> Key: SPARK-25995
> URL: https://issues.apache.org/jira/browse/SPARK-25995
> Project: Spark
>  Issue Type: Bug
>  Components: SparkR
>Affects Versions: 2.3.2
>Reporter: Thomas Graves
>Priority: Minor
>
> Currently if you run sparkR and accidentally specify an argument, it fails 
> with a useless error message.  For example:
> $SPARK_HOME/bin/sparkR  --master yarn --deploy-mode client fooarg
> This gets turned into:
> Launching java with spark-submit command spark-submit   "--master" "yarn" 
> "--deploy-mode" "client" "sparkr-shell" "fooarg" 
> /tmp/Rtmp6XBGz2/backend_port162806ea36bca
> Notice that "fooarg" got put before the /tmp file, which is how R and the JVM 
> know which port to connect to.  SparkR eventually fails with a timeout exception 
> after 10 seconds.  
>  
> SparkR should either not allow args or make sure the order is correct so the 
> backend_port is always first. see 
> https://github.com/apache/spark/blob/master/R/pkg/R/sparkR.R#L129



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-25995) sparkR should ensure user args are after the argument used for the port

2018-11-09 Thread Thomas Graves (JIRA)
Thomas Graves created SPARK-25995:
-

 Summary: sparkR should ensure user args are after the argument 
used for the port
 Key: SPARK-25995
 URL: https://issues.apache.org/jira/browse/SPARK-25995
 Project: Spark
  Issue Type: Bug
  Components: SparkR
Affects Versions: 2.3.2
Reporter: Thomas Graves


Currently if you run sparkR and accidentally specify an argument, it fails with 
a useless error message.  For example:

$SPARK_HOME/bin/sparkR  --master yarn --deploy-mode client fooarg

This gets turned into:

Launching java with spark-submit command spark-submit   "--master" "yarn" 
"--deploy-mode" "client" "sparkr-shell" "fooarg" 
/tmp/Rtmp6XBGz2/backend_port162806ea36bca

Notice that "fooarg" got put before the /tmp file, which is how R and the JVM know 
which port to connect to.  SparkR eventually fails with a timeout exception after 
10 seconds.  

 

SparkR should either not allow args or make sure the order is correct so the 
backend_port is always first. see 
https://github.com/apache/spark/blob/master/R/pkg/R/sparkR.R#L129



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (YARN-8991) nodemanager not cleaning blockmgr directories inside appcache

2018-11-09 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/YARN-8991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16681525#comment-16681525
 ] 

Thomas Graves commented on YARN-8991:
-

[~teonadi] can you clarify here?  Are you saying it's not getting cleaned up 
while the Spark application is still running, or that it's not getting cleaned up 
after the Spark application finishes?

> nodemanager not cleaning blockmgr directories inside appcache 
> --
>
> Key: YARN-8991
> URL: https://issues.apache.org/jira/browse/YARN-8991
> Project: Hadoop YARN
>  Issue Type: Bug
>  Components: nodemanager
>Affects Versions: 2.6.0
>Reporter: Hidayat Teonadi
>Priority: Major
> Attachments: yarn-nm-log.txt
>
>
> Hi, I'm running spark on yarn and have enabled the Spark Shuffle Service. I'm 
> noticing that during the lifetime of my spark streaming application, the nm 
> appcache folder is building up with blockmgr directories (filled with 
> shuffle_*.data).
> Looking into the nm logs, it seems like the blockmgr directories are not part 
> of the cleanup process of the application. Eventually the disk will fill up and 
> the app will crash. I have both 
> {{yarn.nodemanager.localizer.cache.cleanup.interval-ms}} and 
> {{yarn.nodemanager.localizer.cache.target-size-mb}} set, so I don't think it's 
> a configuration issue.
> What is stumping me is that the executor ID listed by Spark during the external 
> shuffle block registration doesn't match the executor ID listed in YARN's nm 
> log. Maybe this executor ID disconnect explains why the cleanup is not done? 
> I'm assuming that blockmgr directories are supposed to be cleaned up?
>  
> {noformat}
> 2018-11-05 15:01:21,349 INFO 
> org.apache.spark.network.shuffle.ExternalShuffleBlockResolver: Registered 
> executor AppExecId{appId=application_1541045942679_0193, execId=1299} with 
> ExecutorShuffleInfo{localDirs=[/mnt1/yarn/nm/usercache/auction_importer/appcache/application_1541045942679_0193/blockmgr-b9703ae3-722c-47d1-a374-abf1cc954f42],
>  subDirsPerLocalDir=64, 
> shuffleManager=org.apache.spark.shuffle.sort.SortShuffleManager}
>  {noformat}
>  
> seems similar to https://issues.apache.org/jira/browse/YARN-7070, although 
> I'm not sure if the behavior I'm seeing is spark use related.
> [https://stackoverflow.com/questions/52923386/spark-streaming-job-doesnt-delete-shuffle-files]
>  has a stop gap solution of cleaning up via cron.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: yarn-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: yarn-issues-h...@hadoop.apache.org



[jira] [Resolved] (SPARK-22148) TaskSetManager.abortIfCompletelyBlacklisted should not abort when all current executors are blacklisted but dynamic allocation is enabled

2018-11-06 Thread Thomas Graves (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-22148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Graves resolved SPARK-22148.
---
   Resolution: Fixed
Fix Version/s: 3.0.0
   2.4.1

> TaskSetManager.abortIfCompletelyBlacklisted should not abort when all current 
> executors are blacklisted but dynamic allocation is enabled
> -
>
> Key: SPARK-22148
> URL: https://issues.apache.org/jira/browse/SPARK-22148
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler, Spark Core
>Affects Versions: 2.2.0
>Reporter: Juan Rodríguez Hortalá
>Assignee: Dhruve Ashar
>Priority: Major
> Fix For: 2.4.1, 3.0.0
>
> Attachments: SPARK-22148_WIP.diff
>
>
> Currently TaskSetManager.abortIfCompletelyBlacklisted aborts the TaskSet and 
> the whole Spark job with `task X (partition Y) cannot run anywhere due to 
> node and executor blacklist. Blacklisting behavior can be configured via 
> spark.blacklist.*.` when all the available executors are blacklisted for a 
> pending Task or TaskSet. This makes sense for static allocation, where the 
> set of executors is fixed for the duration of the application, but this might 
> lead to unnecessary job failures when dynamic allocation is enabled. For 
> example, in a Spark application with a single job at a time, when a node 
> fails at the end of a stage attempt, all other executors will complete their 
> tasks, but the tasks running in the executors of the failing node will be 
> pending. Spark will keep waiting for those tasks for 2 minutes by default 
> (spark.network.timeout) until the heartbeat timeout is triggered, and then it 
> will blacklist those executors for that stage. At that point in time, other 
> executors would have been released after being idle for 1 minute by default 
> (spark.dynamicAllocation.executorIdleTimeout), because the next stage hasn't 
> started yet and so there are no more tasks available (assuming the default of 
> spark.speculation = false). So Spark will fail because the only executors 
> available are blacklisted for that stage. 
> An alternative is requesting more executors to the cluster manager in this 
> situation. This could be retried a configurable number of times after a 
> configurable wait time between request attempts, so if the cluster manager 
> fails to provide a suitable executor then the job is aborted like in the 
> previous case. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-22148) TaskSetManager.abortIfCompletelyBlacklisted should not abort when all current executors are blacklisted but dynamic allocation is enabled

2018-11-06 Thread Thomas Graves (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-22148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Graves reassigned SPARK-22148:
-

Assignee: Dhruve Ashar

> TaskSetManager.abortIfCompletelyBlacklisted should not abort when all current 
> executors are blacklisted but dynamic allocation is enabled
> -
>
> Key: SPARK-22148
> URL: https://issues.apache.org/jira/browse/SPARK-22148
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler, Spark Core
>Affects Versions: 2.2.0
>Reporter: Juan Rodríguez Hortalá
>Assignee: Dhruve Ashar
>Priority: Major
> Fix For: 2.4.1, 3.0.0
>
> Attachments: SPARK-22148_WIP.diff
>
>
> Currently TaskSetManager.abortIfCompletelyBlacklisted aborts the TaskSet and 
> the whole Spark job with `task X (partition Y) cannot run anywhere due to 
> node and executor blacklist. Blacklisting behavior can be configured via 
> spark.blacklist.*.` when all the available executors are blacklisted for a 
> pending Task or TaskSet. This makes sense for static allocation, where the 
> set of executors is fixed for the duration of the application, but this might 
> lead to unnecessary job failures when dynamic allocation is enabled. For 
> example, in a Spark application with a single job at a time, when a node 
> fails at the end of a stage attempt, all other executors will complete their 
> tasks, but the tasks running in the executors of the failing node will be 
> pending. Spark will keep waiting for those tasks for 2 minutes by default 
> (spark.network.timeout) until the heartbeat timeout is triggered, and then it 
> will blacklist those executors for that stage. At that point in time, other 
> executors would have been released after being idle for 1 minute by default 
> (spark.dynamicAllocation.executorIdleTimeout), because the next stage hasn't 
> started yet and so there are no more tasks available (assuming the default of 
> spark.speculation = false). So Spark will fail because the only executors 
> available are blacklisted for that stage. 
> An alternative is requesting more executors from the cluster manager in this 
> situation. This could be retried a configurable number of times after a 
> configurable wait time between request attempts, so if the cluster manager 
> fails to provide a suitable executor then the job is aborted like in the 
> previous case. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-15815) Hang while enable blacklistExecutor and DynamicExecutorAllocator

2018-11-06 Thread Thomas Graves (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-15815?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Graves resolved SPARK-15815.
---
Resolution: Duplicate

> Hang while enable blacklistExecutor and DynamicExecutorAllocator 
> -
>
> Key: SPARK-15815
> URL: https://issues.apache.org/jira/browse/SPARK-15815
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler, Spark Core
>Affects Versions: 1.6.1
>Reporter: SuYan
>Priority: Minor
>
> Enable executor blacklisting with a blacklist time larger than 120s and enable 
> dynamic allocation with minExecutors = 0.
> 1. Assume only 1 task is left running, in executor A, and all other executors 
> have timed out.  
> 2. The task fails, so it will not be scheduled on executor A again because of the 
> blacklist time.
> 3. The ExecutorAllocationManager always requests targetNumExecutors = 1. Since 
> executor A still exists, oldTargetNumExecutors == targetNumExecutors = 1, so it 
> will never add more executors, even after executor A times out. It ends up 
> endlessly requesting a delta of 0 executors (a sketch of this arithmetic follows 
> below).
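A rough sketch of the arithmetic in step 3 (a hypothetical simplification, not the actual ExecutorAllocationManager code):

{code:scala}
// Hypothetical simplification of step 3: with one blacklisted-but-alive executor
// and one pending task, the target never moves, so the delta requested from the
// cluster manager stays at 0 and no replacement executor is ever added.
val targetNumExecutors = 1      // one pending task => target of 1
val oldTargetNumExecutors = 1   // executor A still counts toward the target
val delta = targetNumExecutors - oldTargetNumExecutors
assert(delta == 0)              // nothing new is requested, so the job hangs
{code}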



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-25023) Clarify Spark security documentation

2018-11-02 Thread Thomas Graves (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Graves reassigned SPARK-25023:
-

Assignee: Thomas Graves

> Clarify Spark security documentation
> 
>
> Key: SPARK-25023
> URL: https://issues.apache.org/jira/browse/SPARK-25023
> Project: Spark
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 2.2.2
>Reporter: Thomas Graves
>Assignee: Thomas Graves
>Priority: Minor
> Fix For: 2.4.1, 3.0.0
>
>
> I was reading through our deployment docs and security docs and it's not clear 
> at all which deployment modes support exactly what for security.  I think we 
> should clarify that security is off by default on all 
> deployments.  We may also want to clarify the types of communication 
> that would need to be secured, and what is multi-tenant safe 
> vs not.  Standalone mode, for instance, is in my opinion just not 
> secure: we do talk about using spark.authenticate for a secret, but all 
> applications would use the same secret.
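For context, the shared-secret mechanism mentioned above boils down to two settings; a minimal sketch (the secret value is a placeholder):

{code:scala}
import org.apache.spark.SparkConf

// Minimal sketch of the shared-secret authentication referred to above. In
// standalone mode the same secret ends up being shared by every application,
// which is why that mode is described as not multi-tenant safe.
val conf = new SparkConf()
  .set("spark.authenticate", "true")
  .set("spark.authenticate.secret", "cluster-wide-shared-secret") // placeholder value
{code}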



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-25023) Clarify Spark security documentation

2018-11-02 Thread Thomas Graves (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Graves resolved SPARK-25023.
---
   Resolution: Fixed
Fix Version/s: 3.0.0
   2.4.1

> Clarify Spark security documentation
> 
>
> Key: SPARK-25023
> URL: https://issues.apache.org/jira/browse/SPARK-25023
> Project: Spark
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 2.2.2
>Reporter: Thomas Graves
>Assignee: Thomas Graves
>Priority: Minor
> Fix For: 2.4.1, 3.0.0
>
>
> I was reading through our deployment docs and security docs and it's not clear 
> at all which deployment modes support exactly what for security.  I think we 
> should clarify that security is off by default on all 
> deployments.  We may also want to clarify the types of communication 
> that would need to be secured, and what is multi-tenant safe 
> vs not.  Standalone mode, for instance, is in my opinion just not 
> secure: we do talk about using spark.authenticate for a secret, but all 
> applications would use the same secret.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-25855) Don't use Erasure Coding for event log files

2018-10-26 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16665690#comment-16665690
 ] 

Thomas Graves commented on SPARK-25855:
---

It seems like it depends on whether you care to see the event logs before the app is 
finished.  If you are using the driver UI, then generally people would use it 
while the app is running, and once it's finished it sounds like the log would show up and you 
could see it from the history server.  So probably not a problem there.  But if you 
are using the history server to view all UIs and expect logs to be there, it would 
be a big problem.

So it does sound like it's better off by default, so as not to confuse users.  Were 
you going to make it configurable?

> Don't use Erasure Coding for event log files
> 
>
> Key: SPARK-25855
> URL: https://issues.apache.org/jira/browse/SPARK-25855
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.4.0
>Reporter: Imran Rashid
>Priority: Major
>
> While testing spark with hdfs erasure coding (new in hadoop 3), we ran into a 
> bug with the event logs.  The main issue was a bug in hdfs (HDFS-14027), but 
> it did make us wonder whether Spark should be using EC for event log files in 
> general.  It's a poor choice because EC currently implements {{hflush()}} or 
> {{hsync()}} as no-ops, which means you won't see anything in your event logs 
> until the app is complete.  That isn't necessarily a bug, but isn't really 
> great.  So I think we should ensure EC is always off for event logs.
> IIUC there is *not* a problem with applications which die without properly 
> closing the outputstream.  It'll take a while for the NN to realize the 
> client is gone and finish the block, but the data should get there eventually.
> Also related are SPARK-24787 & SPARK-19531.
> The space savings from EC would be nice as the event logs can get somewhat 
> large, but I think other factors outweigh this.
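As a hedged illustration of one way to keep event log files replicated even when the parent directory carries an EC policy, assuming Hadoop 3's HdfsDataOutputStreamBuilder (a sketch, not necessarily what an eventual patch would do):

{code:scala}
import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, Path}
import org.apache.hadoop.hdfs.DistributedFileSystem

// Sketch only: create the event log file as a replicated (non-EC) file on HDFS so
// that hflush()/hsync() behave as expected while the app is still running. Assumes
// Hadoop 3.x, where the DistributedFileSystem file builder exposes replicate();
// the path is a placeholder.
def createNonEcEventLog(fs: FileSystem, path: Path): FSDataOutputStream = fs match {
  case dfs: DistributedFileSystem =>
    dfs.createFile(path).replicate().build() // bypass the directory's EC policy
  case other =>
    other.create(path)                       // non-HDFS filesystems: nothing special needed
}
{code}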



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-25753) binaryFiles broken for small files

2018-10-22 Thread Thomas Graves (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Graves resolved SPARK-25753.
---
   Resolution: Fixed
Fix Version/s: 3.0.0

> binaryFiles broken for small files
> --
>
> Key: SPARK-25753
> URL: https://issues.apache.org/jira/browse/SPARK-25753
> Project: Spark
>  Issue Type: Bug
>  Components: Input/Output
>Affects Versions: 3.0.0
>Reporter: liuxian
>Assignee: liuxian
>Priority: Minor
> Fix For: 3.0.0
>
>
> {{StreamFileInputFormat}} and {{WholeTextFileInputFormat}} 
> (https://issues.apache.org/jira/browse/SPARK-24610) have the same problem: for 
> small files, the maxSplitSize computed by {{StreamFileInputFormat}} is way 
> smaller than the default or commonly used split size of 64/128M, and Spark 
> throws an exception while trying to read them.
> Exception info:
> java.io.IOException: Minimum split size pernode 5123456 cannot be larger than 
> maximum split size 4194304
>   at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:201)
>   at org.apache.spark.rdd.BinaryFileRDD.getPartitions(BinaryFileRDD.scala:52)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:254)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
>   at scala.Option.getOrElse(Option.scala:121)
>   at org.apache.spark.rdd.RDD.partitions(RDD.scala:252)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2138)
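For anyone trying to reproduce, the failing call path is just a plain binaryFiles read over a directory of small files; a sketch (path and partition count are placeholders):

{code:scala}
// Sketch of the failure mode described above: StreamFileInputFormat derives a
// maxSplitSize from roughly (total input bytes / minPartitions), which for tiny
// inputs drops below the configured minimum split size and triggers the
// IOException quoted above.
val rdd = sc.binaryFiles("hdfs:///path/with/small/files", minPartitions = 40)
rdd.count() // fails with "Minimum split size pernode ... cannot be larger than maximum split size ..."
{code}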



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-25753) binaryFiles broken for small files

2018-10-22 Thread Thomas Graves (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Graves reassigned SPARK-25753:
-

Assignee: liuxian

> binaryFiles broken for small files
> --
>
> Key: SPARK-25753
> URL: https://issues.apache.org/jira/browse/SPARK-25753
> Project: Spark
>  Issue Type: Bug
>  Components: Input/Output
>Affects Versions: 3.0.0
>Reporter: liuxian
>Assignee: liuxian
>Priority: Minor
>
> {{StreamFileInputFormat}} and {{WholeTextFileInputFormat}} 
> (https://issues.apache.org/jira/browse/SPARK-24610) have the same problem: for 
> small files, the maxSplitSize computed by {{StreamFileInputFormat}} is way 
> smaller than the default or commonly used split size of 64/128M, and Spark 
> throws an exception while trying to read them.
> Exception info:
> java.io.IOException: Minimum split size pernode 5123456 cannot be larger than 
> maximum split size 4194304
>   at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:201)
>   at org.apache.spark.rdd.BinaryFileRDD.getPartitions(BinaryFileRDD.scala:52)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:254)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
>   at scala.Option.getOrElse(Option.scala:121)
>   at org.apache.spark.rdd.RDD.partitions(RDD.scala:252)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2138)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-25692) Flaky test: ChunkFetchIntegrationSuite

2018-10-19 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16657318#comment-16657318
 ] 

Thomas Graves commented on SPARK-25692:
---

[~redsanket] Can you please take a look at this?

> Flaky test: ChunkFetchIntegrationSuite
> --
>
> Key: SPARK-25692
> URL: https://issues.apache.org/jira/browse/SPARK-25692
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Shixiong Zhu
>Priority: Blocker
>
> Looks like the whole test suite is pretty flaky. See: 
> https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-maven-hadoop-2.6/5490/testReport/junit/org.apache.spark.network/ChunkFetchIntegrationSuite/history/
> This may be a regression in 3.0 as this didn't happen in 2.4 branch.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-25732) Allow specifying a keytab/principal for proxy user for token renewal

2018-10-16 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651840#comment-16651840
 ] 

Thomas Graves edited comment on SPARK-25732 at 10/16/18 2:53 PM:
-

Sorry, just realized I misread the second one; I thought it was kinit as user a.  
Why would you run the second command?

I would actually expect that to fail or run as the super user, unless it 
downloaded the keytab and kinit'd on submission before it did anything with 
HDFS, etc.

I guess that is the confusion you were referring to, and I can see that, but it 
seems like an odd use case to me.  Is something submitting this way now?  It 
almost seems like something we should disallow.


was (Author: tgraves):
sorry just realized I misread the second one, though it was kinit as user a.  
why would you run the second command?

I would actually expect that to fail or run as the super user unless it 
downloaded the keytab and kinit'd on submission before it did anything with 
hdfs, etc.

> Allow specifying a keytab/principal for proxy user for token renewal 
> -
>
> Key: SPARK-25732
> URL: https://issues.apache.org/jira/browse/SPARK-25732
> Project: Spark
>  Issue Type: Improvement
>  Components: Deploy
>Affects Versions: 2.4.0
>Reporter: Marco Gaido
>Priority: Major
>
> As of now, applications submitted with proxy-user fail after 2 weeks due to the 
> lack of token renewal. In order to enable renewal, we need the 
> keytab/principal of the impersonated user to be specified, so that 
> they are available for the token renewal.
> This JIRA proposes to add two parameters, {{--proxy-user-principal}} and 
> {{--proxy-user-keytab}}, with the latter also allowing a keytab to be specified 
> on a distributed FS, so that applications can be submitted by servers (e.g. 
> Livy, Zeppelin) without needing all users' principals to be on that machine.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-25732) Allow specifying a keytab/principal for proxy user for token renewal

2018-10-16 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651840#comment-16651840
 ] 

Thomas Graves edited comment on SPARK-25732 at 10/16/18 2:49 PM:
-

Sorry, just realized I misread the second one; I thought it was kinit as user a.  
Why would you run the second command?

I would actually expect that to fail or run as the super user, unless it 
downloaded the keytab and kinit'd on submission before it did anything with 
HDFS, etc.


was (Author: tgraves):
sorry just realized I misread the second one.  why would you run the second 
command?

I would actually expect that to fail or run as the super user unless it 
downloaded the keytab and kinit'd on submission before it did anything with 
hdfs, etc.

> Allow specifying a keytab/principal for proxy user for token renewal 
> -
>
> Key: SPARK-25732
> URL: https://issues.apache.org/jira/browse/SPARK-25732
> Project: Spark
>  Issue Type: Improvement
>  Components: Deploy
>Affects Versions: 2.4.0
>Reporter: Marco Gaido
>Priority: Major
>
> As of now, applications submitted with proxy-user fail after 2 weeks due to the 
> lack of token renewal. In order to enable renewal, we need the 
> keytab/principal of the impersonated user to be specified, so that 
> they are available for the token renewal.
> This JIRA proposes to add two parameters, {{--proxy-user-principal}} and 
> {{--proxy-user-keytab}}, with the latter also allowing a keytab to be specified 
> on a distributed FS, so that applications can be submitted by servers (e.g. 
> Livy, Zeppelin) without needing all users' principals to be on that machine.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-25732) Allow specifying a keytab/principal for proxy user for token renewal

2018-10-16 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651840#comment-16651840
 ] 

Thomas Graves commented on SPARK-25732:
---

Sorry, just realized I misread the second one.  Why would you run the second 
command?

I would actually expect that to fail or run as the super user, unless it 
downloaded the keytab and kinit'd on submission before it did anything with 
HDFS, etc.

> Allow specifying a keytab/principal for proxy user for token renewal 
> -
>
> Key: SPARK-25732
> URL: https://issues.apache.org/jira/browse/SPARK-25732
> Project: Spark
>  Issue Type: Improvement
>  Components: Deploy
>Affects Versions: 2.4.0
>Reporter: Marco Gaido
>Priority: Major
>
> As of now, applications submitted with proxy-user fail after 2 weeks due to the 
> lack of token renewal. In order to enable renewal, we need the 
> keytab/principal of the impersonated user to be specified, so that 
> they are available for the token renewal.
> This JIRA proposes to add two parameters, {{--proxy-user-principal}} and 
> {{--proxy-user-keytab}}, with the latter also allowing a keytab to be specified 
> on a distributed FS, so that applications can be submitted by servers (e.g. 
> Livy, Zeppelin) without needing all users' principals to be on that machine.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-25732) Allow specifying a keytab/principal for proxy user for token renewal

2018-10-16 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651827#comment-16651827
 ] 

Thomas Graves commented on SPARK-25732:
---

Yeah, I understand the concern; we don't want to confuse users if we can help it. 
The 2 commands above are the same except for the --proxy-user, and in my opinion I 
don't think it would be confusing to the user: you pass in the keytab and 
principal for the user whose credentials you want refreshed, which is 
user "a" in both cases.  Making sure the docs are clear should make it 
clear to users.  I assume most users submitting via Livy don't realize they 
are using Livy and being launched as a proxy user, so the user would just specify 
keytab/principal configs based on their own user.

 

> Allow specifying a keytab/principal for proxy user for token renewal 
> -
>
> Key: SPARK-25732
> URL: https://issues.apache.org/jira/browse/SPARK-25732
> Project: Spark
>  Issue Type: Improvement
>  Components: Deploy
>Affects Versions: 2.4.0
>Reporter: Marco Gaido
>Priority: Major
>
> As of now, applications submitted with proxy-user fail after 2 weeks due to the 
> lack of token renewal. In order to enable renewal, we need the 
> keytab/principal of the impersonated user to be specified, so that 
> they are available for the token renewal.
> This JIRA proposes to add two parameters, {{--proxy-user-principal}} and 
> {{--proxy-user-keytab}}, with the latter also allowing a keytab to be specified 
> on a distributed FS, so that applications can be submitted by servers (e.g. 
> Livy, Zeppelin) without needing all users' principals to be on that machine.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-25732) Allow specifying a keytab/principal for proxy user for token renewal

2018-10-16 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651657#comment-16651657
 ] 

Thomas Graves commented on SPARK-25732:
---

So, like Marcelo mentioned, can't you re-use the keytab/principal option already 
there?  It might need to be slightly modified to pull from HDFS, but that is really 
what this is doing; it's just that Livy is submitting the job for you.  Really the 
user could specify it when submitting the job as a conf (I guess it depends on 
who is calling Livy; Jupyter, for instance, definitely could, as the user can pass 
configs).  I would prefer that over adding more configs.

There are lots of cases where things sit in the middle of job submission: Livy, 
Oozie, other workflow managers.  I don't see that as a reason not to do tokens. 
Users should know they are submitting jobs (especially ones that run for 2 
weeks), and until we have a good automated solution they would have to set up 
cron or something else to push tokens before they expire.  I know the YARN 
folks were looking at options to help with this, but I haven't synced with them 
lately; ideally there would be a way to push the tokens to the RM for it to 
continue to renew, so you would only have to do it before the max lifetime.   It's 
easy enough to write a script that lists the applications running 
for the user and pushes tokens to each of them, assuming we had a spark-submit 
option to push tokens.
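A sketch of the "specify it as a conf" option mentioned above, using the existing spark.yarn.* settings (the paths and principal are placeholders; letting the keytab live on HDFS is exactly the part that would need modification):

{code:scala}
import org.apache.spark.SparkConf

// Sketch of the existing option discussed above: the proxied user ("a") passes
// their own principal and keytab as plain confs when the job is submitted on
// their behalf by Livy/Zeppelin. Values are placeholders.
val conf = new SparkConf()
  .set("spark.yarn.principal", "userA@EXAMPLE.COM")
  .set("spark.yarn.keytab", "/path/to/userA.keytab") // pulling this from HDFS is the change being debated
{code}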

> Allow specifying a keytab/principal for proxy user for token renewal 
> -
>
> Key: SPARK-25732
> URL: https://issues.apache.org/jira/browse/SPARK-25732
> Project: Spark
>  Issue Type: Improvement
>  Components: Deploy
>Affects Versions: 2.4.0
>Reporter: Marco Gaido
>Priority: Major
>
> As of now, applications submitted with proxy-user fail after 2 weeks due to the 
> lack of token renewal. In order to enable renewal, we need the 
> keytab/principal of the impersonated user to be specified, so that 
> they are available for the token renewal.
> This JIRA proposes to add two parameters, {{--proxy-user-principal}} and 
> {{--proxy-user-keytab}}, with the latter also allowing a keytab to be specified 
> on a distributed FS, so that applications can be submitted by servers (e.g. 
> Livy, Zeppelin) without needing all users' principals to be on that machine.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-25732) Allow specifying a keytab/principal for proxy user for token renewal

2018-10-15 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16650560#comment-16650560
 ] 

Thomas Graves commented on SPARK-25732:
---

I would much rather see Spark start to push and distribute tokens.

I'm not fond of pushing keytabs; many security folks/companies won't allow it.  
If you do this, it means that all the users' keytabs are in HDFS all the time, 
which in my opinion is even worse than our existing keytab/principal options, 
where the keytab can be picked up locally and is only in HDFS temporarily.   It just adds more 
chances that permissions are messed up and people compromise keytabs, which are 
valid indefinitely and much harder to revoke than things like tokens.

Pushing tokens is definitely more work, but I think we should go that way long 
term. Having an RPC connection between client and driver can be useful for 
other things as well. 

> Allow specifying a keytab/principal for proxy user for token renewal 
> -
>
> Key: SPARK-25732
> URL: https://issues.apache.org/jira/browse/SPARK-25732
> Project: Spark
>  Issue Type: Improvement
>  Components: Deploy
>Affects Versions: 2.4.0
>Reporter: Marco Gaido
>Priority: Major
>
> As of now, applications submitted with proxy-user fail after 2 weeks due to the 
> lack of token renewal. In order to enable renewal, we need the 
> keytab/principal of the impersonated user to be specified, so that 
> they are available for the token renewal.
> This JIRA proposes to add two parameters, {{--proxy-user-principal}} and 
> {{--proxy-user-keytab}}, with the latter also allowing a keytab to be specified 
> on a distributed FS, so that applications can be submitted by servers (e.g. 
> Livy, Zeppelin) without needing all users' principals to be on that machine.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Reopened] (SPARK-21809) Change Stage Page to use datatables to support sorting columns and searching

2018-10-12 Thread Thomas Graves (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-21809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Graves reopened SPARK-21809:
---

> Change Stage Page to use datatables to support sorting columns and searching
> 
>
> Key: SPARK-21809
> URL: https://issues.apache.org/jira/browse/SPARK-21809
> Project: Spark
>  Issue Type: Improvement
>  Components: Web UI
>Affects Versions: 2.2.0
>Reporter: Nuochen Lyu
>Priority: Minor
>
> Support column sorting and search for the Stage page using jQuery DataTables and 
> the REST API. Before this change, the Stage page was generated as hard-coded HTML 
> and could not support search; also, sorting was disabled if any 
> application had more than one attempt. Supporting search and sort (over 
> all applications rather than the 20 entries on the current page) in any case 
> will greatly improve the user experience.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-24851) Map a Stage ID to it's Associated Job ID in UI

2018-10-09 Thread Thomas Graves (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Graves resolved SPARK-24851.
---
   Resolution: Fixed
Fix Version/s: 3.0.0

> Map a Stage ID to it's Associated Job ID in UI
> --
>
> Key: SPARK-24851
> URL: https://issues.apache.org/jira/browse/SPARK-24851
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.3.0, 2.3.1
>Reporter: Parth Gandhi
>Assignee: Parth Gandhi
>Priority: Trivial
> Fix For: 3.0.0
>
>
> It would be nice to have a field in the Stage page UI which would show the mapping 
> of the current stage ID to the job IDs to which that stage belongs. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-24851) Map a Stage ID to it's Associated Job ID in UI

2018-10-09 Thread Thomas Graves (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Graves reassigned SPARK-24851:
-

Assignee: Parth Gandhi

> Map a Stage ID to it's Associated Job ID in UI
> --
>
> Key: SPARK-24851
> URL: https://issues.apache.org/jira/browse/SPARK-24851
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.3.0, 2.3.1
>Reporter: Parth Gandhi
>Assignee: Parth Gandhi
>Priority: Trivial
>
> It would be nice to have a field in the Stage page UI which would show the mapping 
> of the current stage ID to the job IDs to which that stage belongs. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-25641) Change the spark.shuffle.server.chunkFetchHandlerThreadsPercent default to 100

2018-10-08 Thread Thomas Graves (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Graves resolved SPARK-25641.
---
   Resolution: Fixed
Fix Version/s: 3.0.0

> Change the spark.shuffle.server.chunkFetchHandlerThreadsPercent default to 100
> --
>
> Key: SPARK-25641
> URL: https://issues.apache.org/jira/browse/SPARK-25641
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.4.0
>Reporter: Sanket Reddy
>Assignee: Sanket Reddy
>Priority: Minor
> Fix For: 3.0.0
>
>
> We want to change the default percentage to 100 for 
> spark.shuffle.server.chunkFetchHandlerThreadsPercent. The reason is that 
> currently it is set to 0, which means that even if server.ioThreads > 0, 
> the default number of threads would be 2 * #cores instead of 
> server.ioThreads. We want the default to follow server.ioThreads when the 
> percentage is not set at all; a default of 0 also means 2 * #cores.
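A rough sketch of the thread-count arithmetic being changed (a hypothetical simplification of the intended behavior, not the actual TransportConf code):

{code:scala}
// Hypothetical simplification: the chunk-fetch handler pool is a percentage of the
// server's netty handler threads, which themselves default to 2 * #cores when
// spark.shuffle.io.serverThreads is unset.
def chunkFetchHandlerThreads(serverIoThreads: Int, cores: Int, percent: Double): Int = {
  val baseThreads = if (serverIoThreads > 0) serverIoThreads else 2 * cores
  math.ceil(baseThreads * percent / 100.0).toInt
}

chunkFetchHandlerThreads(serverIoThreads = 64, cores = 32, percent = 100) // 64: follows spark.shuffle.io.serverThreads
chunkFetchHandlerThreads(serverIoThreads = 0, cores = 32, percent = 100)  // 64: falls back to 2 * #cores
{code}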



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-25641) Change the spark.shuffle.server.chunkFetchHandlerThreadsPercent default to 100

2018-10-08 Thread Thomas Graves (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Graves reassigned SPARK-25641:
-

Assignee: Sanket Reddy

> Change the spark.shuffle.server.chunkFetchHandlerThreadsPercent default to 100
> --
>
> Key: SPARK-25641
> URL: https://issues.apache.org/jira/browse/SPARK-25641
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.4.0
>Reporter: Sanket Reddy
>Assignee: Sanket Reddy
>Priority: Minor
> Fix For: 3.0.0
>
>
> We want to change the default percentage to 100 for 
> spark.shuffle.server.chunkFetchHandlerThreadsPercent. The reason is that 
> currently it is set to 0, which means that even if server.ioThreads > 0, 
> the default number of threads would be 2 * #cores instead of 
> server.ioThreads. We want the default to follow server.ioThreads when the 
> percentage is not set at all; a default of 0 also means 2 * #cores.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-25501) Kafka delegation token support

2018-10-03 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16637248#comment-16637248
 ] 

Thomas Graves commented on SPARK-25501:
---

The SPIP title has "Structured Streaming"; is there some reason this is limited 
to Structured Streaming and not just a generic "get tokens from Kafka if someone 
requests them"?  Perhaps I'm doing a batch job and want to read from Kafka.
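For example, the batch case in question is just the non-streaming Kafka reader (a sketch, assuming the spark-sql-kafka package is on the classpath; broker and topic names are placeholders):

{code:scala}
// Sketch of the batch use case mentioned above: reading from Kafka outside of
// Structured Streaming. On a secured cluster this path would need delegation
// tokens just as much as a streaming query would.
val df = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("subscribe", "some-topic")
  .load()

df.count()
{code}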

> Kafka delegation token support
> --
>
> Key: SPARK-25501
> URL: https://issues.apache.org/jira/browse/SPARK-25501
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Gabor Somogyi
>Priority: Major
>  Labels: SPIP
>
> Delegation token support was released in Kafka 1.1. As Spark updated 
> its Kafka client to 2.0.0, it's now possible to implement delegation token 
> support. Please see the description: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-48+Delegation+token+support+for+Kafka



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-25501) Kafka delegation token support

2018-10-03 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16637240#comment-16637240
 ] 

Thomas Graves commented on SPARK-25501:
---

Did you post the SPIP to the dev list? I didn't see it go by, but I might have missed it.

> Kafka delegation token support
> --
>
> Key: SPARK-25501
> URL: https://issues.apache.org/jira/browse/SPARK-25501
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Gabor Somogyi
>Priority: Major
>  Labels: SPIP
>
> Delegation token support was released in Kafka 1.1. As Spark updated 
> its Kafka client to 2.0.0, it's now possible to implement delegation token 
> support. Please see the description: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-48+Delegation+token+support+for+Kafka



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-18364) Expose metrics for YarnShuffleService

2018-10-01 Thread Thomas Graves (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-18364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Graves reassigned SPARK-18364:
-

Assignee: Marek Simunek

> Expose metrics for YarnShuffleService
> -
>
> Key: SPARK-18364
> URL: https://issues.apache.org/jira/browse/SPARK-18364
> Project: Spark
>  Issue Type: Improvement
>  Components: YARN
>Affects Versions: 2.0.1
>Reporter: Steven Rand
>Assignee: Marek Simunek
>Priority: Major
> Fix For: 2.5.0
>
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> ExternalShuffleService exposes metrics as of SPARK-16405. However, 
> YarnShuffleService does not.
> The work of instrumenting ExternalShuffleBlockHandler was already done in 
> SPARK-16405, so this JIRA is for creating a MetricsSystem in 
> YarnShuffleService similarly to how ExternalShuffleService already does it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-18364) Expose metrics for YarnShuffleService

2018-10-01 Thread Thomas Graves (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-18364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Graves resolved SPARK-18364.
---
   Resolution: Fixed
Fix Version/s: 2.5.0

> Expose metrics for YarnShuffleService
> -
>
> Key: SPARK-18364
> URL: https://issues.apache.org/jira/browse/SPARK-18364
> Project: Spark
>  Issue Type: Improvement
>  Components: YARN
>Affects Versions: 2.0.1
>Reporter: Steven Rand
>Priority: Major
> Fix For: 2.5.0
>
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> ExternalShuffleService exposes metrics as of SPARK-16405. However, 
> YarnShuffleService does not.
> The work of instrumenting ExternalShuffleBlockHandler was already done in 
> SPARK-16405, so this JIRA is for creating a MetricsSystem in 
> YarnShuffleService similarly to how ExternalShuffleService already does it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-25538) incorrect row counts after distinct()

2018-10-01 Thread Thomas Graves (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Graves updated SPARK-25538:
--
Priority: Blocker  (was: Major)

> incorrect row counts after distinct()
> -
>
> Key: SPARK-25538
> URL: https://issues.apache.org/jira/browse/SPARK-25538
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
> Environment: Reproduced on a Centos7 VM and from source in Intellij 
> on OS X.
>Reporter: Steven Rand
>Priority: Blocker
>  Labels: correctness
> Attachments: SPARK-25538-repro.tgz
>
>
> It appears that {{df.distinct.count}} can return incorrect values after 
> SPARK-23713. It's possible that other operations are affected as well; 
> {{distinct}} just happens to be the one that we noticed. I believe that this 
> issue was introduced by SPARK-23713 because I can't reproduce it until that 
> commit, and I've been able to reproduce it after that commit as well as with 
> {{tags/v2.4.0-rc1}}. 
> Below are example spark-shell sessions to illustrate the problem. 
> Unfortunately the data used in these examples can't be uploaded to this Jira 
> ticket. I'll try to create test data which also reproduces the issue, and 
> will upload that if I'm able to do so.
> Example from Spark 2.3.1, which behaves correctly:
> {code}
> scala> val df = spark.read.parquet("hdfs:///data")
> df: org.apache.spark.sql.DataFrame = []
> scala> df.count
> res0: Long = 123
> scala> df.distinct.count
> res1: Long = 115
> {code}
> Example from Spark 2.4.0-rc1, which returns different output:
> {code}
> scala> val df = spark.read.parquet("hdfs:///data")
> df: org.apache.spark.sql.DataFrame = []
> scala> df.count
> res0: Long = 123
> scala> df.distinct.count
> res1: Long = 116
> scala> df.sort("col_0").distinct.count
> res2: Long = 123
> scala> df.withColumnRenamed("col_0", "newName").distinct.count
> res3: Long = 115
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-24355) Improve Spark shuffle server responsiveness to non-ChunkFetch requests

2018-09-21 Thread Thomas Graves (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Graves reassigned SPARK-24355:
-

Assignee: Sanket Chintapalli

> Improve Spark shuffle server responsiveness to non-ChunkFetch requests
> --
>
> Key: SPARK-24355
> URL: https://issues.apache.org/jira/browse/SPARK-24355
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle
>Affects Versions: 2.3.0
> Environment: Hadoop-2.7.4
> Spark-2.3.0
>Reporter: Min Shen
>Assignee: Sanket Chintapalli
>Priority: Major
> Fix For: 2.5.0
>
>
> We run Spark on YARN, and deploy Spark external shuffle service as part of 
> YARN NM aux service.
> One issue we saw with the Spark external shuffle service is the various timeouts 
> experienced by clients when either registering an executor with the local shuffle 
> server or establishing a connection to a remote shuffle server.
> Example of a timeout for establishing connection with remote shuffle server:
> {code:java}
> java.lang.RuntimeException: java.util.concurrent.TimeoutException: Timeout 
> waiting for task.
>   at 
> org.spark_project.guava.base.Throwables.propagate(Throwables.java:160)
>   at 
> org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:288)
>   at 
> org.apache.spark.network.sasl.SaslClientBootstrap.doBootstrap(SaslClientBootstrap.java:80)
>   at 
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:248)
>   at 
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:187)
>   at 
> org.apache.spark.network.shuffle.ExternalShuffleClient$1.createAndStart(ExternalShuffleClient.java:106)
>   at 
> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
>   at 
> org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
>   at 
> org.apache.spark.network.shuffle.ExternalShuffleClient.fetchBlocks(ExternalShuffleClient.java:115)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:182)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.org$apache$spark$storage$ShuffleBlockFetcherIterator$$send$1(ShuffleBlockFetcherIterator.scala:396)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:391)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:345)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:57)
> {code}
> Example of a timeout for registering executor with local shuffle server:
> {code:java}
> java.lang.RuntimeException: java.util.concurrent.TimeoutException: Timeout 
> waiting for task.
>   at 
> org.spark-project.guava.base.Throwables.propagate(Throwables.java:160)
>   at 
> org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:278)
>   at 
> org.apache.spark.network.sasl.SaslClientBootstrap.doBootstrap(SaslClientBootstrap.java:80)
>   at 
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
>   at 
> org.apache.spark.network.client.TransportClientFactory.createUnmanagedClient(TransportClientFactory.java:181)
>   at 
> org.apache.spark.network.shuffle.ExternalShuffleClient.registerWithShuffleServer(ExternalShuffleClient.java:141)
>   at 
> org.apache.spark.storage.BlockManager$$anonfun$registerWithExternalShuffleServer$1.apply$mcVI$sp(BlockManager.scala:218)
> {code}
> While patches such as SPARK-20640 and config parameters such as 
> spark.shuffle.registration.timeout and spark.shuffle.sasl.timeout (when 
> spark.authenticate is set to true) could help to alleviate this type of 
> problems, it does not solve the fundamental issue.
> We have observed that, when the shuffle workload gets very busy in peak 
> hours, the client requests could timeout even after configuring these 
> parameters to very high values. Further investigating this issue revealed the 
> following issue:
> Right now, the default server side netty handler threads is 2 * # cores, and 
> can be further configured with parameter spark.shuffle.io.serverThreads.
> In order to process a client request, it would require one available server 
> netty handler thread.
> However, when the server netty handler threads start to process 
> ChunkFetchRequests, they will be blocked on disk I/O, mostly due to disk 
> contentions from the random read operations initiated by all the 
> ChunkFetchRequests received from clients.
> As a result, when the shuffle server is servi

[jira] [Commented] (SPARK-24355) Improve Spark shuffle server responsiveness to non-ChunkFetch requests

2018-09-21 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16623660#comment-16623660
 ] 

Thomas Graves commented on SPARK-24355:
---

The PR that got merged didn't get linked properly: 
https://github.com/apache/spark/pull/22173

> Improve Spark shuffle server responsiveness to non-ChunkFetch requests
> --
>
> Key: SPARK-24355
> URL: https://issues.apache.org/jira/browse/SPARK-24355
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle
>Affects Versions: 2.3.0
> Environment: Hadoop-2.7.4
> Spark-2.3.0
>Reporter: Min Shen
>Priority: Major
> Fix For: 2.5.0
>
>
> We run Spark on YARN, and deploy Spark external shuffle service as part of 
> YARN NM aux service.
> One issue we saw with the Spark external shuffle service is the various timeouts 
> experienced by clients when either registering an executor with the local shuffle 
> server or establishing a connection to a remote shuffle server.
> Example of a timeout for establishing connection with remote shuffle server:
> {code:java}
> java.lang.RuntimeException: java.util.concurrent.TimeoutException: Timeout 
> waiting for task.
>   at 
> org.spark_project.guava.base.Throwables.propagate(Throwables.java:160)
>   at 
> org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:288)
>   at 
> org.apache.spark.network.sasl.SaslClientBootstrap.doBootstrap(SaslClientBootstrap.java:80)
>   at 
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:248)
>   at 
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:187)
>   at 
> org.apache.spark.network.shuffle.ExternalShuffleClient$1.createAndStart(ExternalShuffleClient.java:106)
>   at 
> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
>   at 
> org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
>   at 
> org.apache.spark.network.shuffle.ExternalShuffleClient.fetchBlocks(ExternalShuffleClient.java:115)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:182)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.org$apache$spark$storage$ShuffleBlockFetcherIterator$$send$1(ShuffleBlockFetcherIterator.scala:396)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:391)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:345)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:57)
> {code}
> Example of a timeout for registering executor with local shuffle server:
> {code:java}
> java.lang.RuntimeException: java.util.concurrent.TimeoutException: Timeout 
> waiting for task.
>   at 
> org.spark-project.guava.base.Throwables.propagate(Throwables.java:160)
>   at 
> org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:278)
>   at 
> org.apache.spark.network.sasl.SaslClientBootstrap.doBootstrap(SaslClientBootstrap.java:80)
>   at 
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
>   at 
> org.apache.spark.network.client.TransportClientFactory.createUnmanagedClient(TransportClientFactory.java:181)
>   at 
> org.apache.spark.network.shuffle.ExternalShuffleClient.registerWithShuffleServer(ExternalShuffleClient.java:141)
>   at 
> org.apache.spark.storage.BlockManager$$anonfun$registerWithExternalShuffleServer$1.apply$mcVI$sp(BlockManager.scala:218)
> {code}
> While patches such as SPARK-20640 and config parameters such as 
> spark.shuffle.registration.timeout and spark.shuffle.sasl.timeout (when 
> spark.authenticate is set to true) could help to alleviate this type of 
> problems, it does not solve the fundamental issue.
> We have observed that, when the shuffle workload gets very busy in peak 
> hours, the client requests could timeout even after configuring these 
> parameters to very high values. Further investigating this issue revealed the 
> following issue:
> Right now, the default server side netty handler threads is 2 * # cores, and 
> can be further configured with parameter spark.shuffle.io.serverThreads.
> In order to process a client request, it would require one available server 
> netty handler thread.
> However, when the server netty handler threads start to process 
> ChunkFetchRequests, they will be blocked on disk I/O, mostly due to disk 
> contentions from the random read operations initiated by all the 
> ChunkFetchRequests rec

[jira] [Resolved] (SPARK-24355) Improve Spark shuffle server responsiveness to non-ChunkFetch requests

2018-09-21 Thread Thomas Graves (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Graves resolved SPARK-24355.
---
   Resolution: Fixed
Fix Version/s: 2.5.0

> Improve Spark shuffle server responsiveness to non-ChunkFetch requests
> --
>
> Key: SPARK-24355
> URL: https://issues.apache.org/jira/browse/SPARK-24355
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle
>Affects Versions: 2.3.0
> Environment: Hadoop-2.7.4
> Spark-2.3.0
>Reporter: Min Shen
>Priority: Major
> Fix For: 2.5.0
>
>
> We run Spark on YARN, and deploy Spark external shuffle service as part of 
> YARN NM aux service.
> One issue we saw with the Spark external shuffle service is the various timeouts 
> experienced by clients when either registering an executor with the local shuffle 
> server or establishing a connection to a remote shuffle server.
> Example of a timeout for establishing connection with remote shuffle server:
> {code:java}
> java.lang.RuntimeException: java.util.concurrent.TimeoutException: Timeout 
> waiting for task.
>   at 
> org.spark_project.guava.base.Throwables.propagate(Throwables.java:160)
>   at 
> org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:288)
>   at 
> org.apache.spark.network.sasl.SaslClientBootstrap.doBootstrap(SaslClientBootstrap.java:80)
>   at 
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:248)
>   at 
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:187)
>   at 
> org.apache.spark.network.shuffle.ExternalShuffleClient$1.createAndStart(ExternalShuffleClient.java:106)
>   at 
> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
>   at 
> org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
>   at 
> org.apache.spark.network.shuffle.ExternalShuffleClient.fetchBlocks(ExternalShuffleClient.java:115)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:182)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.org$apache$spark$storage$ShuffleBlockFetcherIterator$$send$1(ShuffleBlockFetcherIterator.scala:396)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:391)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:345)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:57)
> {code}
> Example of a timeout for registering executor with local shuffle server:
> {code:java}
> java.lang.RuntimeException: java.util.concurrent.TimeoutException: Timeout 
> waiting for task.
>   at 
> org.spark-project.guava.base.Throwables.propagate(Throwables.java:160)
>   at 
> org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:278)
>   at 
> org.apache.spark.network.sasl.SaslClientBootstrap.doBootstrap(SaslClientBootstrap.java:80)
>   at 
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
>   at 
> org.apache.spark.network.client.TransportClientFactory.createUnmanagedClient(TransportClientFactory.java:181)
>   at 
> org.apache.spark.network.shuffle.ExternalShuffleClient.registerWithShuffleServer(ExternalShuffleClient.java:141)
>   at 
> org.apache.spark.storage.BlockManager$$anonfun$registerWithExternalShuffleServer$1.apply$mcVI$sp(BlockManager.scala:218)
> {code}
> While patches such as SPARK-20640 and config parameters such as 
> spark.shuffle.registration.timeout and spark.shuffle.sasl.timeout (when 
> spark.authenticate is set to true) could help to alleviate this type of 
> problems, it does not solve the fundamental issue.
> We have observed that, when the shuffle workload gets very busy in peak 
> hours, the client requests could timeout even after configuring these 
> parameters to very high values. Further investigating this issue revealed the 
> following issue:
> Right now, the default server side netty handler threads is 2 * # cores, and 
> can be further configured with parameter spark.shuffle.io.serverThreads.
> In order to process a client request, it would require one available server 
> netty handler thread.
> However, when the server netty handler threads start to process 
> ChunkFetchRequests, they will be blocked on disk I/O, mostly due to disk 
> contentions from the random read operations initiated by all the 
> ChunkFetchRequests received from clients.
> As a result, when the shuffle server is serving many concurrent 
> ChunkFet

[jira] [Updated] (SPARK-24415) Stage page aggregated executor metrics wrong when failures

2018-09-07 Thread Thomas Graves (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24415?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Graves updated SPARK-24415:
--
Fix Version/s: 2.3.2

> Stage page aggregated executor metrics wrong when failures 
> ---
>
> Key: SPARK-24415
> URL: https://issues.apache.org/jira/browse/SPARK-24415
> Project: Spark
>  Issue Type: Bug
>  Components: Web UI
>Affects Versions: 2.3.0
>Reporter: Thomas Graves
>Assignee: Ankur Gupta
>Priority: Critical
> Fix For: 2.3.2, 2.4.0
>
> Attachments: Screen Shot 2018-05-29 at 2.15.38 PM.png
>
>
> Running with Spark 2.3 on YARN with task failures and blacklisting, the 
> aggregated metrics by executor are not correct.  In my example it should show 
> 2 failed tasks but it only shows one.  Note I tested with the master branch to 
> verify it's not fixed there.
> I will attach a screenshot.
> To reproduce:
> $SPARK_HOME/bin/spark-shell --master yarn --deploy-mode client 
> --executor-memory=2G --num-executors=1 --conf "spark.blacklist.enabled=true" 
> --conf "spark.blacklist.stage.maxFailedTasksPerExecutor=1" --conf 
> "spark.blacklist.stage.maxFailedExecutorsPerNode=1"  --conf 
> "spark.blacklist.application.maxFailedTasksPerExecutor=2" --conf 
> "spark.blacklist.killBlacklistedExecutors=true"
> import org.apache.spark.SparkEnv 
> sc.parallelize(1 to 1, 10).map { x => if (SparkEnv.get.executorId.toInt 
> >= 1 && SparkEnv.get.executorId.toInt <= 4) throw new RuntimeException("Bad 
> executor") else (x % 3, x) }.reduceByKey((a, b) => a + b).collect()



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-25231) Running a Large Job with Speculation On Causes Executor Heartbeats to Time Out on Driver

2018-09-05 Thread Thomas Graves (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Graves resolved SPARK-25231.
---
   Resolution: Fixed
 Assignee: Parth Gandhi
Fix Version/s: 2.4.0
   2.3.2

> Running a Large Job with Speculation On Causes Executor Heartbeats to Time 
> Out on Driver
> 
>
> Key: SPARK-25231
> URL: https://issues.apache.org/jira/browse/SPARK-25231
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler, Spark Core
>Affects Versions: 2.3.1
>Reporter: Parth Gandhi
>Assignee: Parth Gandhi
>Priority: Major
> Fix For: 2.3.2, 2.4.0
>
>
> Running a large Spark job with speculation turned on was causing executor 
> heartbeats to time out on the driver end after some time and eventually, after 
> hitting the max number of executor failures, the job would fail. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24909) Spark scheduler can hang when fetch failures, executor lost, task running on lost executor, and multiple stage attempts

2018-08-30 Thread Thomas Graves (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Graves updated SPARK-24909:
--
Fix Version/s: 2.3.2

> Spark scheduler can hang when fetch failures, executor lost, task running on 
> lost executor, and multiple stage attempts
> ---
>
> Key: SPARK-24909
> URL: https://issues.apache.org/jira/browse/SPARK-24909
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler
>Affects Versions: 2.3.1
>Reporter: Thomas Graves
>Assignee: Thomas Graves
>Priority: Critical
> Fix For: 2.3.2, 2.4.0
>
>
> The DAGScheduler can hang if an executor is lost (due to a fetch failure) and 
> all the tasks in the task sets are marked as completed. 
> ([https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1265)]
> It never creates new task attempts in the task scheduler, but the DAG 
> scheduler still has pendingPartitions.
> {code:java}
> 8/07/22 08:30:00 INFO scheduler.TaskSetManager: Starting task 55769.0 in 
> stage 44.0 (TID 970752, host1.com, executor 33, partition 55769, 
> PROCESS_LOCAL, 7874 bytes)
> 18/07/22 08:30:29 INFO scheduler.DAGScheduler: Marking ShuffleMapStage 44 
> (repartition at Lift.scala:191) as failed due to a fetch failure from 
> ShuffleMapStage 42 (map at foo.scala:27)
> 18/07/22 08:30:29 INFO scheduler.DAGScheduler: Resubmitting ShuffleMapStage 
> 42 (map at foo.scala:27) and ShuffleMapStage 44 (repartition at 
> bar.scala:191) due to fetch failure
> 
> 18/07/22 08:30:56 INFO scheduler.DAGScheduler: Executor lost: 33 (epoch 18)
> 18/07/22 08:30:56 INFO schedulerDAGScheduler: Shuffle files lost for 
> executor: 33 (epoch 18)
> 18/07/22 08:31:20 INFO scheduler.DAGScheduler: Submitting ShuffleMapStage 44 
> (MapPartitionsRDD[70] at repartition at bar.scala:191), which has no missing 
> parents
> 18/07/22 08:31:21 INFO cluster.YarnClusterScheduler: Adding task set 44.1 
> with 59955 tasks
> 18/07/22 08:31:41 INFO scheduler.TaskSetManager: Finished task 55769.0 in 
> stage 44.0 (TID 970752) in 101505 ms on host1.com (executor 33) (15081/73320)
> 8/07/22 08:31:41 INFO scheduler.DAGScheduler: Ignoring possibly bogus 
> ShuffleMapTask(44, 55769) completion from executor 33{code}
>  
> In the logs above you will see that task 55769.0 finished after the executor 
> was lost and a new task set was started.  The DAG scheduler says "Ignoring 
> possibly bogus...", but on the TaskSetManager side those tasks are marked as 
> completed for all stage attempts. The DAGScheduler gets hung here.  I did a 
> heap dump on the process and can see that 55769 is still in the DAGScheduler 
> pendingPartitions list, but the TaskSetManagers are all complete.
> Note: to reproduce this, you need a situation where you have a ShuffleMapTask 
> (call it task1) fetching data from an executor that also has other 
> ShuffleMapTasks (call it task2) running (fetching from other hosts). Task1 
> has to hit a FetchFailure, which causes the stage to fail and 
> the executor to be marked as lost due to the fetch failure.  A new task set is 
> started for the new stage attempt, and then task2, which 
> was running on the executor that was marked lost, finishes.  The scheduler 
> ignores that completion event ("Ignoring possibly bogus ...").  This results in a 
> hang because at this point the TaskSetManager has already marked all tasks 
> for all attempts of that stage as completed.
>  
> Configs needed to be on:
> spark.blacklist.application.fetchFailure.enabled=true
> spark.files.fetchFailure.unRegisterOutputOnHost=true
> spark.shuffle.service.enabled=true



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org


