[jira] [Comment Edited] (SPARK-6050) Spark on YARN does not work when --executor-cores is specified

2015-02-26 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14339686#comment-14339686
 ] 

Mridul Muralidharan edited comment on SPARK-6050 at 2/27/15 3:50 AM:
-

Thanks to [~tgraves] for helping investigate this.

There are multiple issues in the codebase, and not all of them are fully 
understood yet.

a) For some reason, either YARN returns an incorrect response to an allocate 
request or we are not setting the right param.
See the snippet in [1] for details.
(I can't share the logs unfortunately, but Tom has access to them and it should 
be trivial for others to reproduce the issue.)

b) Whatever the reason (a) happens, we do not recover from it.
All subsequent heartbeat requests DO NOT contain pending allocation 
requests (and we have rejected/de-allocated whatever YARN just sent us due to 
(a)).

To elaborate: updateResourceRequests sees missing == 0 since it relies on 
getNumPendingAllocate(), which DOES NOT do the right thing in our context. 
Note: the 'ask' list in the superclass was cleared as part of the previous 
allocate() call.
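
A hypothetical sketch (not the actual YarnAllocator code) of the defensive 
bookkeeping this points at: track outstanding container requests ourselves 
instead of trusting AMRMClient's getNumPendingAllocate() once the 'ask' list 
has been cleared, so that rejected containers make 'missing' positive again.
{code}
class AllocatorBookkeeping(targetNumExecutors: Int) {
  private var outstandingRequests = 0  // container requests issued but not yet satisfied
  private var executorsRunning = 0

  // missing > 0 means the next heartbeat must carry new container requests
  def missing: Int = targetNumExecutors - outstandingRequests - executorsRunning

  def onRequestsIssued(n: Int): Unit = outstandingRequests += n
  def onContainerAccepted(): Unit = { outstandingRequests -= 1; executorsRunning += 1 }
  // e.g. a container released because its vCores did not match, as in [1]
  def onContainerRejected(): Unit = outstandingRequests -= 1
}
{code}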


Essentially we were defending against this sort of corner case in our code 
earlier - but the move to depend on AMRMClientImpl and the subsequent changes 
to it from under us have caused these problems for Spark, IMO. We should be more 
careful in the future and depend only on interfaces, not implementations, when 
it is relatively straightforward for us to own that aspect.


Fixing (a) will mask (b) - but IMO we should address (b) as soon as possible too.




[1] Note the vCore in the response, and the subsequent ignoring of all 
containers.
15/02/27 01:40:30 INFO YarnAllocator: Will request 1000 executor containers, 
each with 8 cores and 38912 MB memory including 10240 MB overhead
15/02/27 01:40:30 INFO YarnAllocator: Container request (host: Any, capability: 
)
15/02/27 01:40:30 INFO ApplicationMaster: Started progress reporter thread - 
sleep time : 5000
15/02/27 01:40:30 DEBUG ApplicationMaster: Sending progress
15/02/27 01:40:30 INFO YarnAllocator: missing = 0, targetNumExecutors = 1000, 
numPendingAllocate = 1000, numExecutorsRunning = 0
15/02/27 01:40:35 DEBUG ApplicationMaster: Sending progress
15/02/27 01:40:35 INFO YarnAllocator: missing = 0, targetNumExecutors = 1000, 
numPendingAllocate = 1000, numExecutorsRunning = 0
15/02/27 01:40:36 DEBUG YarnAllocator: Allocated containers: 1000. Current 
executor count: 0. Cluster resources: .
15/02/27 01:40:36 DEBUG YarnAllocator: Releasing 1000 unneeded containers that 
were allocated to us
15/02/27 01:40:36 INFO YarnAllocator: Received 1000 containers from YARN, 
launching executors on 0 of them.


was (Author: mridulm80):

Thanks to [~tgraves] for helping investigate this.

There are multiple issues in the codebase, and not all of them are fully 
understood yet.

a) For some reason, either YARN returns an incorrect response to an allocate 
request or we are not setting the right param.
See the snippet in [1] for details.
(I can't share the logs unfortunately, but Tom has access to them and it should 
be trivial for others to reproduce the issue.)

b) Whatever the reason (a) happens, we do not recover from it.
All subsequent heartbeat requests DO NOT contain pending allocation 
requests (and we have rejected/de-allocated whatever YARN just sent us due to 
(a)).

To elaborate: updateResourceRequests sees missing == 0 since it relies on 
getNumPendingAllocate(), which DOES NOT do the right thing in our context. 
Note: the 'ask' list in the superclass was cleared as part of the previous 
allocate() call.


Essentially we were defending against this sort of corner case in our code 
earlier - but the move to depend on AMRMClientImpl and the subsequent changes 
to it from under us have caused these problems for Spark.


Fixing (a) will mask (b) - but IMO we should address (b) as soon as possible too.




[1] Note the vCore in the response, and the subsequent ignoring of all 
containers.
15/02/27 01:40:30 INFO YarnAllocator: Will request 1000 executor containers, 
each with 8 cores and 38912 MB memory including 10240 MB overhead
15/02/27 01:40:30 INFO YarnAllocator: Container request (host: Any, capability: 
)
15/02/27 01:40:30 INFO ApplicationMaster: Started progress reporter thread - 
sleep time : 5000
15/02/27 01:40:30 DEBUG ApplicationMaster: Sending progress
15/02/27 01:40:30 INFO YarnAllocator: missing = 0, targetNumExecutors = 1000, 
numPendingAllocate = 1000, numExecutorsRunning = 0
15/02/27 01:40:35 DEBUG ApplicationMaster: Sending progress
15/02/27 01:40:35 INFO YarnAllocator: missing = 0, targetNumExecutors = 1000, 
numPendingAllocate = 1000, numExecutorsRunning = 0
15/02/27 01:40:36 DEBUG YarnAllocator: Allocated containers: 1000. Current 
executor count: 0. Cluster resources: .
15/02/27 01:40:36 DEBUG YarnAllocator:

[jira] [Commented] (SPARK-6050) Spark on YARN does not work when --executor-cores is specified

2015-02-26 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14339686#comment-14339686
 ] 

Mridul Muralidharan commented on SPARK-6050:



Thanks to [~tgraves] for helping investigate this.

There are multiple issues in the codebase, and not all of them are fully 
understood yet.

a) For some reason, either YARN returns an incorrect response to an allocate 
request or we are not setting the right param.
See the snippet in [1] for details.
(I can't share the logs unfortunately, but Tom has access to them and it should 
be trivial for others to reproduce the issue.)

b) Whatever the reason (a) happens, we do not recover from it.
All subsequent heartbeat requests DO NOT contain pending allocation 
requests (and we have rejected/de-allocated whatever YARN just sent us due to 
(a)).

To elaborate: updateResourceRequests sees missing == 0 since it relies on 
getNumPendingAllocate(), which DOES NOT do the right thing in our context. 
Note: the 'ask' list in the superclass was cleared as part of the previous 
allocate() call.


Essentially we were defending against this sort of corner case in our code 
earlier - but the move to depend on AMRMClientImpl and the subsequent changes 
to it from under us have caused these problems for Spark.


Fixing (a) will mask (b) - but IMO we should address (b) as soon as possible too.




[1] Note the vCore in the response, and the subsequent ignoring of all 
containers.
15/02/27 01:40:30 INFO YarnAllocator: Will request 1000 executor containers, 
each with 8 cores and 38912 MB memory including 10240 MB overhead
15/02/27 01:40:30 INFO YarnAllocator: Container request (host: Any, capability: 
)
15/02/27 01:40:30 INFO ApplicationMaster: Started progress reporter thread - 
sleep time : 5000
15/02/27 01:40:30 DEBUG ApplicationMaster: Sending progress
15/02/27 01:40:30 INFO YarnAllocator: missing = 0, targetNumExecutors = 1000, 
numPendingAllocate = 1000, numExecutorsRunning = 0
15/02/27 01:40:35 DEBUG ApplicationMaster: Sending progress
15/02/27 01:40:35 INFO YarnAllocator: missing = 0, targetNumExecutors = 1000, 
numPendingAllocate = 1000, numExecutorsRunning = 0
15/02/27 01:40:36 DEBUG YarnAllocator: Allocated containers: 1000. Current 
executor count: 0. Cluster resources: .
15/02/27 01:40:36 DEBUG YarnAllocator: Releasing 1000 unneeded containers that 
were allocated to us
15/02/27 01:40:36 INFO YarnAllocator: Received 1000 containers from YARN, 
launching executors on 0 of them.

> Spark on YARN does not work when --executor-cores is specified
> -
>
> Key: SPARK-6050
> URL: https://issues.apache.org/jira/browse/SPARK-6050
> Project: Spark
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.3.0
> Environment: 2.5 based YARN cluster.
>Reporter: Mridul Muralidharan
>Priority: Blocker
>
> There are multiple issues here (which I will detail as comments), but to 
> reproduce: running the following ALWAYS hangs in our cluster with the 1.3 RC
> ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master 
> yarn-cluster --executor-cores 8 --num-executors 15 --driver-memory 4g 
> --executor-memory 2g --queue webmap lib/spark-examples*.jar 10



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-6050) Spark on YARN does not work when --executor-cores is specified

2015-02-26 Thread Mridul Muralidharan (JIRA)
Mridul Muralidharan created SPARK-6050:
--

 Summary: Spark on YARN does not work when --executor-cores is specified
 Key: SPARK-6050
 URL: https://issues.apache.org/jira/browse/SPARK-6050
 Project: Spark
  Issue Type: Bug
  Components: YARN
Affects Versions: 1.3.0
 Environment: 
2.5 based YARN cluster.
Reporter: Mridul Muralidharan
Priority: Blocker



There are multiple issues here (which I will detail as comments), but to 
reproduce: running the following ALWAYS hangs in our cluster with the 1.3 RC

./bin/spark-submit --class org.apache.spark.examples.SparkPi --master 
yarn-cluster --executor-cores 8 --num-executors 15 --driver-memory 4g 
--executor-memory 2g --queue webmap lib/spark-examples*.jar 10



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



Re: 2GB limit for partitions?

2015-02-04 Thread Mridul Muralidharan
That work is from more than a year back and is not maintained anymore,
since we do not use it in-house now.
Also note that there have been quite a lot of changes in Spark ...
including some which break assumptions made in the patch, so its value is
very low - having said that, do feel free to work on the JIRA and/or use
the patch if it helps!

Regards
Mridul
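
As a minimal, self-contained illustration of the caching-side limit discussed
in the quoted thread below (hypothetical file path, not from the thread):
DiskStore.getBytes memory-maps large blocks, and FileChannel.map rejects any
size above Integer.MAX_VALUE, which is exactly the exception in Michael's
stack trace further down.

import java.io.RandomAccessFile
import java.nio.channels.FileChannel.MapMode

// Hypothetical on-disk file standing in for a cached block.
val channel = new RandomAccessFile("/tmp/some-block.data", "r").getChannel
val length = channel.size()
// Throws java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
// whenever length > Int.MaxValue, hence the 2GB ceiling for a single cached block.
val buffer = channel.map(MapMode.READ_ONLY, 0, length)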

On Wednesday, February 4, 2015, Imran Rashid  wrote:

> Hi Mridul,
>
>
> do you think you'll keep working on this, or should this get picked up by
> others?  Looks like there was a lot of work put into LargeByteBuffer, seems
> promising.
>
> thanks,
> Imran
>
> On Tue, Feb 3, 2015 at 7:32 PM, Mridul Muralidharan  > wrote:
>
>> That is fairly out of date (we used to run some of our jobs on it ... But
>> that is forked off 1.1 actually).
>>
>> Regards
>> Mridul
>>
>>
>> On Tuesday, February 3, 2015, Imran Rashid > > wrote:
>>
>>> Thanks for the explanations, makes sense.  For the record looks like this
>>> was worked on a while back (and maybe the work is even close to a
>>> solution?)
>>>
>>> https://issues.apache.org/jira/browse/SPARK-1476
>>>
>>> and perhaps an independent solution was worked on here?
>>>
>>> https://issues.apache.org/jira/browse/SPARK-1391
>>>
>>>
>>> On Tue, Feb 3, 2015 at 5:20 PM, Reynold Xin  wrote:
>>>
>>> > cc dev list
>>> >
>>> >
>>> > How are you saving the data? There are two relevant 2GB limits:
>>> >
>>> > 1. Caching
>>> >
>>> > 2. Shuffle
>>> >
>>> >
>>> > For caching, a partition is turned into a single block.
>>> >
>>> > For shuffle, each map partition is partitioned into R blocks, where R =
>>> > number of reduce tasks. It is unlikely a shuffle block > 2G, although
>>> it
>>> > can still happen.
>>> >
>>> > I think the 2nd problem is easier to fix than the 1st, because we can
>>> > handle that in the network transport layer. It'd require us to divide
>>> the
>>> > transfer of a very large block into multiple smaller blocks.
>>> >
>>> >
>>> >
>>> > On Tue, Feb 3, 2015 at 3:00 PM, Imran Rashid 
>>> wrote:
>>> >
>>> >> Michael,
>>> >>
>>> >> you are right, there is definitely some limit at 2GB.  Here is a
>>> trivial
>>> >> example to demonstrate it:
>>> >>
>>> >> import org.apache.spark.storage.StorageLevel
>>> >> val d = sc.parallelize(1 to 1e6.toInt, 1).map{i => new
>>> >> Array[Byte](5e3.toInt)}.persist(StorageLevel.DISK_ONLY)
>>> >> d.count()
>>> >>
>>> >> It gives the same error you are observing.  I was under the same
>>> >> impression as Sean about the limits only being on blocks, not
>>> partitions --
>>> >> but clearly that isn't the case here.
>>> >>
>>> >> I don't know the whole story yet, but I just wanted to at least let
>>> you
>>> >> know you aren't crazy :)
>>> >> At the very least this suggests that you might need to make smaller
>>> >> partitions for now.
>>> >>
>>> >> Imran
>>> >>
>>> >>
>>> >> On Tue, Feb 3, 2015 at 4:58 AM, Michael Albert <
>>> >> m_albert...@yahoo.com.invalid> wrote:
>>> >>
>>> >>> Greetings!
>>> >>>
>>> >>> Thanks for the response.
>>> >>>
>>> >>> Below is an example of the exception I saw.
>>> >>> I'd rather not post code at the moment, so I realize it is completely
>>> >>> unreasonable to ask for a diagnosis.
>>> >>> However, I will say that adding a "partitionBy()" was the last change
>>> >>> before this error was created.
>>> >>>
>>> >>>
>>> >>> Thanks for your time and any thoughts you might have.
>>> >>>
>>> >>> Sincerely,
>>> >>>  Mike
>>> >>>
>>> >>>
>>> >>>
>>> >>> Exception in thread "main" org.apache.spark.SparkException: Job
>>> aborted
>>> >>> due to stage failure: Task 4 in stage 5.0 failed 4 times, most 


Re: 2GB limit for partitions?

2015-02-03 Thread Mridul Muralidharan
That is fairly out of date (we used to run some of our jobs on it ... But
that is forked off 1.1 actually).

Regards
Mridul

On Tuesday, February 3, 2015, Imran Rashid  wrote:

> Thanks for the explanations, makes sense.  For the record looks like this
> was worked on a while back (and maybe the work is even close to a
> solution?)
>
> https://issues.apache.org/jira/browse/SPARK-1476
>
> and perhaps an independent solution was worked on here?
>
> https://issues.apache.org/jira/browse/SPARK-1391
>
>
> On Tue, Feb 3, 2015 at 5:20 PM, Reynold Xin  > wrote:
>
> > cc dev list
> >
> >
> > How are you saving the data? There are two relevant 2GB limits:
> >
> > 1. Caching
> >
> > 2. Shuffle
> >
> >
> > For caching, a partition is turned into a single block.
> >
> > For shuffle, each map partition is partitioned into R blocks, where R =
> > number of reduce tasks. It is unlikely a shuffle block > 2G, although it
> > can still happen.
> >
> > I think the 2nd problem is easier to fix than the 1st, because we can
> > handle that in the network transport layer. It'd require us to divide the
> > transfer of a very large block into multiple smaller blocks.
> >
> >
> >
> > On Tue, Feb 3, 2015 at 3:00 PM, Imran Rashid  > wrote:
> >
> >> Michael,
> >>
> >> you are right, there is definitely some limit at 2GB.  Here is a trivial
> >> example to demonstrate it:
> >>
> >> import org.apache.spark.storage.StorageLevel
> >> val d = sc.parallelize(1 to 1e6.toInt, 1).map{i => new
> >> Array[Byte](5e3.toInt)}.persist(StorageLevel.DISK_ONLY)
> >> d.count()
> >>
> >> It gives the same error you are observing.  I was under the same
> >> impression as Sean about the limits only being on blocks, not
> partitions --
> >> but clearly that isn't the case here.
> >>
> >> I don't know the whole story yet, but I just wanted to at least let you
> >> know you aren't crazy :)
> >> At the very least this suggests that you might need to make smaller
> >> partitions for now.
> >>
> >> Imran
> >>
> >>
> >> On Tue, Feb 3, 2015 at 4:58 AM, Michael Albert <
> >> m_albert...@yahoo.com.invalid> wrote:
> >>
> >>> Greetings!
> >>>
> >>> Thanks for the response.
> >>>
> >>> Below is an example of the exception I saw.
> >>> I'd rather not post code at the moment, so I realize it is completely
> >>> unreasonable to ask for a diagnosis.
> >>> However, I will say that adding a "partitionBy()" was the last change
> >>> before this error was created.
> >>>
> >>>
> >>> Thanks for your time and any thoughts you might have.
> >>>
> >>> Sincerely,
> >>>  Mike
> >>>
> >>>
> >>>
> >>> Exception in thread "main" org.apache.spark.SparkException: Job aborted
> >>> due to stage failure: Task 4 in stage 5.0 failed 4 times, most recent
> >>> failure: Lost task 4.3 in stage 5.0 (TID 6012,
> >>> ip-10-171-0-31.ec2.internal): java.lang.RuntimeException:
> >>> java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
> >>> at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
> >>> at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
> >>> at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
> >>> at
> >>>
> org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
> >>> at
> >>>
> org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307)
> >>> at
> >>>
> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
> >>> at
> >>>
> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
> >>> at
> >>>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> >>> at
> >>>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> >>> at
> >>>
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> >>>   at
> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> >>> at
> >>> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> >>> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
> >>> at
> >>>
> org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
> >>>
> >>>
> >>>   --
> >>>  *From:* Sean Owen >
> >>> *To:* Michael Albert >
> >>> *Cc:* "u...@spark.apache.org "  >
> >>> *Sent:* Monday, February 2, 2015 10:13 PM
> >>> *Subject:* Re: 2GB limit for partitions?
> >>>
> >>> The limit is on blocks, not partitions. Partitions have many blocks.
> >>>
> >>> It sounds like you are creating very large values in memory, but I'm
> >>> not sure given your description. You will run into problems if a
> >>> single object is more than 2GB, of course. More of the stack trace
> >>> might show what is mapping that much memory.
> >>>
> >>> If you simply want data into 1000 files it's a lot simpler. Just
> >>> repartition into 1000 partitions and save the data. If you need more
> >>> control over what go


Re: Welcoming three new committers

2015-02-03 Thread Mridul Muralidharan
Congratulations !
Keep up the good work :-)

Regards
Mridul


On Tuesday, February 3, 2015, Matei Zaharia  wrote:

> Hi all,
>
> The PMC recently voted to add three new committers: Cheng Lian, Joseph
> Bradley and Sean Owen. All three have been major contributors to Spark in
> the past year: Cheng on Spark SQL, Joseph on MLlib, and Sean on ML and many
> pieces throughout Spark Core. Join me in welcoming them as committers!
>
> Matei
> -
> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org 
> For additional commands, e-mail: dev-h...@spark.apache.org 
>
>


Re: keeping PR titles / descriptions up to date

2014-12-02 Thread Mridul Muralidharan
I second that !
Would also be great if the JIRA was updated accordingly too.

Regards,
Mridul


On Wed, Dec 3, 2014 at 1:53 AM, Kay Ousterhout  wrote:
> Hi all,
>
> I've noticed a bunch of times lately where a pull request changes to be
> pretty different from the original pull request, and the title /
> description never get updated.  Because the pull request title and
> description are used as the commit message, the incorrect description lives
> on forever, making it harder to understand the reason behind a particular
> commit without going back and reading the entire conversation on the pull
> request.  If folks could try to keep these up to date (and committers, try
> to remember to verify that the title and description are correct before
> merging pull requests), that would be awesome.
>
> -Kay

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: Problems with spark.locality.wait

2014-11-13 Thread Mridul Muralidharan
In the specific example stated, the user had two tasksets if I
understood right ... the first taskset reads off the db (dfs in your
example), does some filter, etc., and caches it.
The second works off the cached data (which is, now, process-local
locality level aware) to do map, group, etc.

The taskset(s) which work off the cached data would be sensitive to the
PROCESS_LOCAL locality level.
But for the initial taskset (which loaded off hdfs/a database, etc.) no
tasks can be process local - since we do not have a way to specify
that in Spark (which, IMO, is a limitation).

Given this, the requirement seemed to be to relax the locality level for
the initial load taskset - since not scheduling on rack-local or other
nodes hurts utilization and latency when no node-local
executors are available.
But for tasksets which have process-local tasks, the user wants to ensure
that node/rack-local scheduling does not happen (based on the timeouts
and perf numbers).

Hence my suggestion on setting the individual locality level timeouts
- of course, my suggestion was highly specific to the problem as stated
:-)
It is, by no means, a generalization - and I do agree we definitely
need to address the larger scheduling issue.

Regards,
Mridul



On Fri, Nov 14, 2014 at 2:05 AM, Kay Ousterhout  wrote:
> Hi Mridul,
>
> In the case Shivaram and I saw, and based on my understanding of Ma chong's
> description, I don't think that completely fixes the problem.
>
> To be very concrete, suppose your job has two tasks, t1 and t2, and they
> each have input data (in HDFS) on h1 and h2, respectively, and that h1 and
> h2 are on the same rack. Suppose your Spark job gets allocated two
> executors, one on h1 and another on h3 (a different host with no input
> data).  When the job gets submitted to the task set manager (TSM),
> TSM.computeValidLocalityLevels will determine that the valid levels are
> NODE_LOCAL (because t1 could be run on the NODE_LOCAL executor on h1),
> RACK_LOCAL, ANY.  As a result, the TSM will not schedule t2 until
> spark.locality.wait.NODE_LOCAL expires, even though t2 has no hope of being
> scheduled on a NODE_LOCAL machine (because the job wasn't given any
> executors on h2).  You could set spark.locality.wait.NODE_LOCAL to be low,
> but then it might cause t1 (or more generally, in a larger job, other tasks
> that have NODE_LOCAL executors where they can be scheduled) to get scheduled
> on h3 (and not on h1).
>
> Is there a way you were thinking of configuring things that avoids this
> problem?
>
> I'm pretty sure we could fix this problem by tracking more information about
> each task in the TSM -- for example, the TSM has enough information to know
> that there are no NODE_LOCAL executors where t2 could be scheduled in the
> above example (and that the best possible locality level for t2 is
> RACK_LOCAL), and could schedule t2 right away on a RACK_LOCAL machine.  Of
> course, this would add a bunch of complexity to the TSM, hence the earlier
> decision that the added complexity may not be worth it.
>
> -Kay
>
> On Thu, Nov 13, 2014 at 12:11 PM, Mridul Muralidharan 
> wrote:
>>
>> Instead of setting spark.locality.wait, try setting the individual
>> locality waits specifically.
>>
>> Namely, set spark.locality.wait.PROCESS_LOCAL to a high value (so that
>> process-local tasks are always scheduled in case the task set has
>> process-local tasks).
>> Set spark.locality.wait.NODE_LOCAL and spark.locality.wait.RACK_LOCAL
>> to a low value - so that in case the task set has no process-local tasks,
>> both node-local and rack-local tasks are scheduled asap.
>>
>> From your description, this will alleviate the problem you mentioned.
>>
>>
>> Kay's comment, IMO, is slightly general in nature - and I suspect that
>> unless we overhaul how preferred locality is specified, and allow for
>> taskset-specific hints for scheduling, we can't resolve that.
>>
>>
>> Regards,
>> Mridul
>>
>>
>>
>> On Thu, Nov 13, 2014 at 1:25 PM, MaChong  wrote:
>> > Hi,
>> >
>> > We are running a time-sensitive application with 70 partitions and 800MB
>> > per partition. The application first loads data from a database in a
>> > different cluster, then applies a filter, caches the filtered data, then
>> > applies a map and a reduce, and finally collects results.
>> > The application finishes in 20 seconds if we set
>> > spark.locality.wait to a large value (30 minutes), and takes 100
>> > seconds if we set spark.locality.wait to a small value (less than 10 seconds).
>> > We have analysed the driver log and found lots of NODE_LOCAL and
>> > RACK_LOCAL level tasks, normally a PROCESS_LOCAL task on

Re: Problems with spark.locality.wait

2014-11-13 Thread Mridul Muralidharan
Instead of setting spark.locality.wait, try setting the individual
locality waits specifically.

Namely, set spark.locality.wait.PROCESS_LOCAL to a high value (so that
process-local tasks are always scheduled in case the task set has
process-local tasks).
Set spark.locality.wait.NODE_LOCAL and spark.locality.wait.RACK_LOCAL
to a low value - so that in case the task set has no process-local tasks,
both node-local and rack-local tasks are scheduled asap.

From your description, this will alleviate the problem you mentioned.
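
As a concrete sketch of the settings above (the config keys Spark actually
reads are the lowercase spark.locality.wait.process / .node / .rack; the
values below are milliseconds):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.locality.wait.process", "1800000") // 30 min: effectively always wait for process-local slots
  .set("spark.locality.wait.node", "3000")       // 3 s: fall back to node-local quickly
  .set("spark.locality.wait.rack", "3000")       // 3 s: and to rack-local quickly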


Kay's comment, IMO, is slightly general in nature - and I suspect that
unless we overhaul how preferred locality is specified, and allow for
taskset-specific hints for scheduling, we can't resolve that.


Regards,
Mridul



On Thu, Nov 13, 2014 at 1:25 PM, MaChong  wrote:
> Hi,
>
> We are running a time-sensitive application with 70 partitions and 800MB per 
> partition. The application first loads data from a database in a different 
> cluster, then applies a filter, caches the filtered data, then applies a map 
> and a reduce, and finally collects results.
> The application finishes in 20 seconds if we set spark.locality.wait to a 
> large value (30 minutes), and takes 100 seconds if we set spark.locality.wait 
> to a small value (less than 10 seconds).
> We have analysed the driver log and found lots of NODE_LOCAL and RACK_LOCAL 
> level tasks; normally a PROCESS_LOCAL task only takes 15 seconds, but 
> NODE_LOCAL or RACK_LOCAL tasks will take 70 seconds.
>
> So I think we'd better set spark.locality.wait to a large value (30 minutes), 
> until we met this problem:
>
> Now our application will load data from HDFS in the same Spark cluster; it 
> will get NODE_LOCAL and RACK_LOCAL level tasks during the loading stage. If 
> the tasks in the loading stage have the same locality level, either NODE_LOCAL 
> or RACK_LOCAL, it works fine.
> But if the tasks in the loading stage get mixed locality levels, such as 3 
> NODE_LOCAL tasks and 2 RACK_LOCAL tasks, then the TaskSetManager of the loading 
> stage will submit the 3 NODE_LOCAL tasks as soon as resources are offered, 
> then wait for spark.locality.wait.node, which was set to 30 minutes; the 2 
> RACK_LOCAL tasks will wait 30 minutes even though resources are available.
>
>
> Does any one have met this problem? Do you have a nice solution?
>
>
> Thanks
>
>
>
>
> Ma chong

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



[jira] [Commented] (SPARK-4030) `destroy` method in Broadcast should be public

2014-10-21 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14178162#comment-14178162
 ] 

Mridul Muralidharan commented on SPARK-4030:


We have also needed to use destroy - and private[spark] prevented that. Having 
said that, I do agree that making it public might cause more subtle issues than 
it solves ... and probably is not a very good API change in itself (similar to 
clean on direct buffers, for example).
We worked around it with a shim in the Spark namespace in our code - power users 
who are very clear about the tradeoff can do that - while most users are better 
off not using it directly.
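
A hypothetical sketch of that kind of shim (assuming the private[spark] 
destroy(blocking: Boolean) signature at the line linked below): a small helper 
placed under the org.apache.spark package tree can reach the method, so 
application code that understands the tradeoffs can call it.
{code}
package org.apache.spark.util

import org.apache.spark.broadcast.Broadcast

// Hypothetical helper, not part of Spark: it compiles only because it lives in
// the org.apache.spark package tree, where private[spark] members are visible.
object BroadcastDestroyShim {
  def destroy[T](bc: Broadcast[T], blocking: Boolean = true): Unit = bc.destroy(blocking)
}
{code}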

> `destroy` method in Broadcast should be public
> --
>
> Key: SPARK-4030
> URL: https://issues.apache.org/jira/browse/SPARK-4030
> Project: Spark
>  Issue Type: Improvement
>  Components: Block Manager, Spark Core
>Affects Versions: 1.1.0, 1.2.0
>Reporter: Shivaram Venkataraman
>
> The destroy method in Broadcast.scala 
> [https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala#L91]
>  is right now marked as private[spark]
> This prevents long-running applications from cleaning up memory used by 
> broadcast variables on the driver.  Also as broadcast variables are always 
> created with persistence MEMORY_DISK, this slows down jobs when old broadcast 
> variables are flushed to disk. 
> Making `destroy` public can help applications control the lifetime.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions

2014-10-16 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14173489#comment-14173489
 ] 

Mridul Muralidharan edited comment on SPARK-3948 at 10/16/14 7:39 AM:
--

Damn, this sucks: transferTo is not using the position of the channel to write 
the output to ... while it does when append is true (which effectively sets the 
position to the end of the file on the call to getChannel).
The state of the channel, based on what we see above, is the same in both cases 
- since we can see the position is updated - and is persisted and returned when 
we call getChannel in the next invocation of copyStreams. Not sure if sync() will 
help ...
So there is some other set of issues at play which we might not be able to work 
around from the JVM.


Given this, I think we should 
a) add a logError when initialPosition == finalPosition and inChannel.size > 0, 
asking users to upgrade to a newer Linux kernel
b) of course use append = true, to work around the immediate issue.

(a) will ensure that developers and users/admins are notified in case other code 
paths (currently or in the future) hit the same issue.
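
As a small illustration of why (b) sidesteps the issue (hypothetical file name, 
not one of the shuffle outputs above): opening the FileOutputStream with 
append = true makes getChannel() start at end-of-file, so subsequent writes and 
transfers land after the data already written.
{code}
import java.io.FileOutputStream

// append = true positions the channel at the current end of the file.
val out = new FileOutputStream("some-partition.data", /* append = */ true)
val channel = out.getChannel
assert(channel.position() == channel.size())
{code}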




was (Author: mridulm80):

Damn, this sucks: transferTo is not using the position of the channel to write 
the output to ... while it does when append is true (which effectively sets the 
position to the end of the file on the call to getChannel).
The state of the channel, based on what we see above, is the same in both cases 
- since we can see the position is updated - and is persisted and returned when 
we call getChannel in the next invocation of copyStreams.
So there is some other set of issues at play which we might not be able to work 
around from the JVM.


Given this, I think we should 
a) add a logError when initialPosition == finalPosition and inChannel.size > 0, 
asking users to upgrade to a newer Linux kernel
b) of course use append = true, to work around the immediate issue.

(a) will ensure that developers and users/admins are notified in case other code 
paths (currently or in the future) hit the same issue.



> Sort-based shuffle can lead to assorted stream-corruption exceptions
> 
>
> Key: SPARK-3948
> URL: https://issues.apache.org/jira/browse/SPARK-3948
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 1.2.0
>Reporter: Saisai Shao
>Assignee: Saisai Shao
>
> Several exceptions occurred when running TPC-DS queries against latest master 
> branch with sort-based shuffle enable, like PARSING_ERROR(2) in snappy, 
> deserializing error in Kryo and offset out-range in FileManagedBuffer, all 
> these exceptions are gone when we changed to hash-based shuffle.
> With deep investigation, we found that some shuffle output file is 
> unexpectedly smaller than the others, as the log shows:
> {noformat}
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
> shuffle_6_9_11, offset: 3055635, length: 236708, file length: 47274167
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
> shuffle_6_10_11, offset: 2986484, length: 222755, file length: 47174539
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
> shuffle_6_11_11, offset: 2995341, length: 259871, file length: 383405
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
> shuffle_6_12_11, offset: 2991030, length: 268191, file length: 47478892
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
> shuffle_6_13_11, offset: 3016292, length: 230694, file length: 47420826
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
> shuffle_6_14_11, offset: 3061400, length: 241136, file length: 47395509
> {noformat}
> As you can see the total file length of shuffle_6_11_11 is much smaller than 
> other same stage map output results.
> And we also dump the map outputs in map side to see if this small size output 
> is correct or not, below is the log:
> {noformat}
>  In bypass merge sort, file name: /mnt/DP_disk1/animal/spark/spark-local-
> 20141014182142-8345/22/shuffle_6_11_0.data, file length: 383405length:
>  274722 262597 291290 272902 264941 270358 291005 295285 252482 
> 287142 232617 259871 233734 241439 228897 234282 253834 235619 
> 233803 255532 270739 253825 262087 266404 234273 250120 262983 
> 257024 255947 254971 258908 247862 221613 258566 245399 251684 
> 274843 226150 264278 245279 225656 235084 239466 212851 242245 
> 218781 222191 215500 211548 234256 208601 204113 191923 217895 
> 227020 215331 212313 223725 250876 256875 239276 266777 235520 
> 237462 234063 242270 2

[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions

2014-10-16 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14173489#comment-14173489
 ] 

Mridul Muralidharan commented on SPARK-3948:



Damn, this sucks: transferTo is not using the position of the channel to write 
the output to ... while it does when append is true (which effectively sets the 
position to the end of the file on the call to getChannel).
The state of the channel, based on what we see above, is the same in both cases 
- since we can see the position is updated - and is persisted and returned when 
we call getChannel in the next invocation of copyStreams.
So there is some other set of issues at play which we might not be able to work 
around from the JVM.


Given this, I think we should 
a) add a logError when initialPosition == finalPosition and inChannel.size > 0, 
asking users to upgrade to a newer Linux kernel
b) of course use append = true, to work around the immediate issue.

(a) will ensure that developers and users/admins are notified in case other code 
paths (currently or in the future) hit the same issue.



> Sort-based shuffle can lead to assorted stream-corruption exceptions
> 
>
> Key: SPARK-3948
> URL: https://issues.apache.org/jira/browse/SPARK-3948
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 1.2.0
>Reporter: Saisai Shao
>Assignee: Saisai Shao
>
> Several exceptions occurred when running TPC-DS queries against latest master 
> branch with sort-based shuffle enable, like PARSING_ERROR(2) in snappy, 
> deserializing error in Kryo and offset out-range in FileManagedBuffer, all 
> these exceptions are gone when we changed to hash-based shuffle.
> With deep investigation, we found that some shuffle output file is 
> unexpectedly smaller than the others, as the log shows:
> {noformat}
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
> shuffle_6_9_11, offset: 3055635, length: 236708, file length: 47274167
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
> shuffle_6_10_11, offset: 2986484, length: 222755, file length: 47174539
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
> shuffle_6_11_11, offset: 2995341, length: 259871, file length: 383405
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
> shuffle_6_12_11, offset: 2991030, length: 268191, file length: 47478892
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
> shuffle_6_13_11, offset: 3016292, length: 230694, file length: 47420826
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
> shuffle_6_14_11, offset: 3061400, length: 241136, file length: 47395509
> {noformat}
> As you can see the total file length of shuffle_6_11_11 is much smaller than 
> other same stage map output results.
> And we also dump the map outputs in map side to see if this small size output 
> is correct or not, below is the log:
> {noformat}
>  In bypass merge sort, file name: /mnt/DP_disk1/animal/spark/spark-local-
> 20141014182142-8345/22/shuffle_6_11_0.data, file length: 383405length:
>  274722 262597 291290 272902 264941 270358 291005 295285 252482 
> 287142 232617 259871 233734 241439 228897 234282 253834 235619 
> 233803 255532 270739 253825 262087 266404 234273 250120 262983 
> 257024 255947 254971 258908 247862 221613 258566 245399 251684 
> 274843 226150 264278 245279 225656 235084 239466 212851 242245 
> 218781 222191 215500 211548 234256 208601 204113 191923 217895 
> 227020 215331 212313 223725 250876 256875 239276 266777 235520 
> 237462 234063 242270 246825 255888 235937 236956 233099 264508 
> 260303 233294 239061 254856 257475 230105 246553 260412 210355 
> 211201 219572 206636 226866 209937 226618 218208 206255 248069 
> 221717 222112 215734 248088 239207 246125 239056 241133 253091 
> 246738 233128 242794 231606 255737 221123 252115 247286 229688 
> 251087 250047 237579 263079 256251 238214 208641 201120 204009 
> 200825 211965 200600 194492 226471 194887 226975 215072 206008 
> 233288 222132 208860 219064 218162 237126 220465 201343 225711 
> 232178 233786 212767 211462 213671 215853 227822 233782 214727 
> 247001 228968 247413 222674 214241 184122 215643 207665 219079 
> 215185 207718 212723 201613 216600 212591 208174 204195 208099 
> 229079 230274 223373 214999 256626 228895 231821 383405 229646 
> 220212 245495 245960 227556 213266 237203 203805 240509 239306 
> 242365 218416 238487 219397 240026 251011 258369 255365 259811 
> 283313 248450 264286 264562 257485 279459 249187 257609 274964 
> 292369 27382

[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions

2014-10-15 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14173413#comment-14173413
 ] 

Mridul Muralidharan commented on SPARK-3948:


Not exactly - what I was suggesting was:

a) At the beginning of the transferTo method, add
{code: java}
  val initialPos = channel.position()
{code}
b) At the bottom of the transferTo method, before returning size, add
{code: java}
  val finalPos = channel.position()
  // only treat an unchanged position as the kernel bug when something was copied
  if (finalPos == initialPos && size > 0) {
    logWarning("Hit kernel bug, upgrade kernel. Attempting workaround")
    channel.position(initialPos + size)
  } else {
    assert(finalPos == initialPos + size)
  }
{code}


From what I understand of the javadoc, this should alleviate the problem: 
of course, it will need verification on the setup you have where it is currently 
failing!
Note that the reason I would prefer this to append is simple: the 
method is a generic method to copy streams - and it might be used (currently, or 
in the future) in scenarios where append is not true. So it would be good to be 
defensive about the final state.

> Sort-based shuffle can lead to assorted stream-corruption exceptions
> 
>
> Key: SPARK-3948
> URL: https://issues.apache.org/jira/browse/SPARK-3948
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 1.2.0
>Reporter: Saisai Shao
>Assignee: Saisai Shao
>
> Several exceptions occurred when running TPC-DS queries against latest master 
> branch with sort-based shuffle enable, like PARSING_ERROR(2) in snappy, 
> deserializing error in Kryo and offset out-range in FileManagedBuffer, all 
> these exceptions are gone when we changed to hash-based shuffle.
> With deep investigation, we found that some shuffle output file is 
> unexpectedly smaller than the others, as the log shows:
> {noformat}
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
> shuffle_6_9_11, offset: 3055635, length: 236708, file length: 47274167
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
> shuffle_6_10_11, offset: 2986484, length: 222755, file length: 47174539
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
> shuffle_6_11_11, offset: 2995341, length: 259871, file length: 383405
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
> shuffle_6_12_11, offset: 2991030, length: 268191, file length: 47478892
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
> shuffle_6_13_11, offset: 3016292, length: 230694, file length: 47420826
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
> shuffle_6_14_11, offset: 3061400, length: 241136, file length: 47395509
> {noformat}
> As you can see the total file length of shuffle_6_11_11 is much smaller than 
> other same stage map output results.
> And we also dump the map outputs in map side to see if this small size output 
> is correct or not, below is the log:
> {noformat}
>  In bypass merge sort, file name: /mnt/DP_disk1/animal/spark/spark-local-
> 20141014182142-8345/22/shuffle_6_11_0.data, file length: 383405length:
>  274722 262597 291290 272902 264941 270358 291005 295285 252482 
> 287142 232617 259871 233734 241439 228897 234282 253834 235619 
> 233803 255532 270739 253825 262087 266404 234273 250120 262983 
> 257024 255947 254971 258908 247862 221613 258566 245399 251684 
> 274843 226150 264278 245279 225656 235084 239466 212851 242245 
> 218781 222191 215500 211548 234256 208601 204113 191923 217895 
> 227020 215331 212313 223725 250876 256875 239276 266777 235520 
> 237462 234063 242270 246825 255888 235937 236956 233099 264508 
> 260303 233294 239061 254856 257475 230105 246553 260412 210355 
> 211201 219572 206636 226866 209937 226618 218208 206255 248069 
> 221717 222112 215734 248088 239207 246125 239056 241133 253091 
> 246738 233128 242794 231606 255737 221123 252115 247286 229688 
> 251087 250047 237579 263079 256251 238214 208641 201120 204009 
> 200825 211965 200600 194492 226471 194887 226975 215072 206008 
> 233288 222132 208860 219064 218162 237126 220465 201343 225711 
> 232178 233786 212767 211462 213671 215853 227822 233782 214727 
> 247001 228968 247413 222674 214241 184122 215643 207665 219079 
> 215185 207718 212723 201613 216600 212591 208174 204195 208099 
> 229079 230274 223373 214999 256626 228895 231821 383405 229646 
> 220212 245495 245960 227556 213266 237203 203805 240509 239306 
> 242365 218416 238487 219397 240026 251011 258369 255365 259811 
> 283313 248450 264286 264562 257485 279459 249187 257609 274964 
> 292369 273826
> {nof

[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions

2014-10-15 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14172585#comment-14172585
 ] 

Mridul Muralidharan commented on SPARK-3948:


[~jerryshao] great work!
I agree, append might be a workaround to consider (given the semantics of 
getChannel when the stream is opened with append).
On the other hand, since this piece of code might be used in a general context 
as well (the copyStreams) - what about logging a warning in case position != 
initialPosition + size at the end of the transferTo loop? Warning users that 
they should upgrade the kernel? (and explicitly modifying the position as a 
workaround)



> Sort-based shuffle can lead to assorted stream-corruption exceptions
> 
>
> Key: SPARK-3948
> URL: https://issues.apache.org/jira/browse/SPARK-3948
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 1.2.0
>Reporter: Saisai Shao
>Assignee: Saisai Shao
>
> Several exceptions occurred when running TPC-DS queries against latest master 
> branch with sort-based shuffle enable, like PARSING_ERROR(2) in snappy, 
> deserializing error in Kryo and offset out-range in FileManagedBuffer, all 
> these exceptions are gone when we changed to hash-based shuffle.
> With deep investigation, we found that some shuffle output file is 
> unexpectedly smaller than the others, as the log shows:
> {noformat}
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
> shuffle_6_9_11, offset: 3055635, length: 236708, file length: 47274167
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
> shuffle_6_10_11, offset: 2986484, length: 222755, file length: 47174539
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
> shuffle_6_11_11, offset: 2995341, length: 259871, file length: 383405
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
> shuffle_6_12_11, offset: 2991030, length: 268191, file length: 47478892
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
> shuffle_6_13_11, offset: 3016292, length: 230694, file length: 47420826
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
> shuffle_6_14_11, offset: 3061400, length: 241136, file length: 47395509
> {noformat}
> As you can see the total file length of shuffle_6_11_11 is much smaller than 
> other same stage map output results.
> And we also dump the map outputs in map side to see if this small size output 
> is correct or not, below is the log:
> {noformat}
>  In bypass merge sort, file name: /mnt/DP_disk1/animal/spark/spark-local-
> 20141014182142-8345/22/shuffle_6_11_0.data, file length: 383405length:
>  274722 262597 291290 272902 264941 270358 291005 295285 252482 
> 287142 232617 259871 233734 241439 228897 234282 253834 235619 
> 233803 255532 270739 253825 262087 266404 234273 250120 262983 
> 257024 255947 254971 258908 247862 221613 258566 245399 251684 
> 274843 226150 264278 245279 225656 235084 239466 212851 242245 
> 218781 222191 215500 211548 234256 208601 204113 191923 217895 
> 227020 215331 212313 223725 250876 256875 239276 266777 235520 
> 237462 234063 242270 246825 255888 235937 236956 233099 264508 
> 260303 233294 239061 254856 257475 230105 246553 260412 210355 
> 211201 219572 206636 226866 209937 226618 218208 206255 248069 
> 221717 222112 215734 248088 239207 246125 239056 241133 253091 
> 246738 233128 242794 231606 255737 221123 252115 247286 229688 
> 251087 250047 237579 263079 256251 238214 208641 201120 204009 
> 200825 211965 200600 194492 226471 194887 226975 215072 206008 
> 233288 222132 208860 219064 218162 237126 220465 201343 225711 
> 232178 233786 212767 211462 213671 215853 227822 233782 214727 
> 247001 228968 247413 222674 214241 184122 215643 207665 219079 
> 215185 207718 212723 201613 216600 212591 208174 204195 208099 
> 229079 230274 223373 214999 256626 228895 231821 383405 229646 
> 220212 245495 245960 227556 213266 237203 203805 240509 239306 
> 242365 218416 238487 219397 240026 251011 258369 255365 259811 
> 283313 248450 264286 264562 257485 279459 249187 257609 274964 
> 292369 273826
> {noformat}
> Here I dump the file name, length and each partition's length, obviously the 
> sum of all partition lengths is not equal to file length. So I think there 
> may be a situation paritionWriter in ExternalSorter not always append to the 
> end of previous written file, the file's content is overwritten in some 
> parts, and this lead to the exceptions I mentioned before.
> Also I changed the code of copyStream by 

[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions

2014-10-15 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14172226#comment-14172226
 ] 

Mridul Muralidharan commented on SPARK-3948:


Note, "t" is just some file with a few strings in it - simply generate 
something locally.
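For instance, a quick way to create such a file from the same interpreter session (purely illustrative; any small text file works):

{code}
// Write a small throwaway input file named "t" in the current directory.
val pw = new java.io.PrintWriter("t")
(1 to 1000).foreach(i => pw.println("line " + i))
pw.close()
{code}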

> Sort-based shuffle can lead to assorted stream-corruption exceptions
> 
>
> Key: SPARK-3948
> URL: https://issues.apache.org/jira/browse/SPARK-3948
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 1.2.0
>Reporter: Saisai Shao
>Assignee: Saisai Shao
>
> Several exceptions occurred when running TPC-DS queries against latest master 
> branch with sort-based shuffle enable, like PARSING_ERROR(2) in snappy, 
> deserializing error in Kryo and offset out-range in FileManagedBuffer, all 
> these exceptions are gone when we changed to hash-based shuffle.
> With deep investigation, we found that some shuffle output file is 
> unexpectedly smaller than the others, as the log shows:
> {noformat}
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
> shuffle_6_9_11, offset: 3055635, length: 236708, file length: 47274167
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
> shuffle_6_10_11, offset: 2986484, length: 222755, file length: 47174539
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
> shuffle_6_11_11, offset: 2995341, length: 259871, file length: 383405
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
> shuffle_6_12_11, offset: 2991030, length: 268191, file length: 47478892
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
> shuffle_6_13_11, offset: 3016292, length: 230694, file length: 47420826
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
> shuffle_6_14_11, offset: 3061400, length: 241136, file length: 47395509
> {noformat}
> As you can see the total file length of shuffle_6_11_11 is much smaller than 
> other same stage map output results.
> And we also dump the map outputs in map side to see if this small size output 
> is correct or not, below is the log:
> {noformat}
>  In bypass merge sort, file name: /mnt/DP_disk1/animal/spark/spark-local-
> 20141014182142-8345/22/shuffle_6_11_0.data, file length: 383405length:
>  274722 262597 291290 272902 264941 270358 291005 295285 252482 
> 287142 232617 259871 233734 241439 228897 234282 253834 235619 
> 233803 255532 270739 253825 262087 266404 234273 250120 262983 
> 257024 255947 254971 258908 247862 221613 258566 245399 251684 
> 274843 226150 264278 245279 225656 235084 239466 212851 242245 
> 218781 222191 215500 211548 234256 208601 204113 191923 217895 
> 227020 215331 212313 223725 250876 256875 239276 266777 235520 
> 237462 234063 242270 246825 255888 235937 236956 233099 264508 
> 260303 233294 239061 254856 257475 230105 246553 260412 210355 
> 211201 219572 206636 226866 209937 226618 218208 206255 248069 
> 221717 222112 215734 248088 239207 246125 239056 241133 253091 
> 246738 233128 242794 231606 255737 221123 252115 247286 229688 
> 251087 250047 237579 263079 256251 238214 208641 201120 204009 
> 200825 211965 200600 194492 226471 194887 226975 215072 206008 
> 233288 222132 208860 219064 218162 237126 220465 201343 225711 
> 232178 233786 212767 211462 213671 215853 227822 233782 214727 
> 247001 228968 247413 222674 214241 184122 215643 207665 219079 
> 215185 207718 212723 201613 216600 212591 208174 204195 208099 
> 229079 230274 223373 214999 256626 228895 231821 383405 229646 
> 220212 245495 245960 227556 213266 237203 203805 240509 239306 
> 242365 218416 238487 219397 240026 251011 258369 255365 259811 
> 283313 248450 264286 264562 257485 279459 249187 257609 274964 
> 292369 273826
> {noformat}
> Here I dump the file name, length and each partition's length, obviously the 
> sum of all partition lengths is not equal to file length. So I think there 
> may be a situation paritionWriter in ExternalSorter not always append to the 
> end of previous written file, the file's content is overwritten in some 
> parts, and this lead to the exceptions I mentioned before.
> Also I changed the code of copyStream by disable transferTo, use the previous 
> one, all the issues are gone. So I think there maybe some flushing problems 
> in transferTo when processed data is large.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions

2014-10-15 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14172225#comment-14172225
 ] 

Mridul Muralidharan commented on SPARK-3948:


That is weird; I tried a stripped-down version to test just this, and it seems 
to be working fine.

In the Scala interpreter, this works as expected.
{code:java}

import java.io._
import java.nio.ByteBuffer
import java.nio._
import java.nio.channels._

def copyStream(in: InputStream,
               out: OutputStream,
               closeStreams: Boolean = false): Long = {
  var count = 0L
  try {
    if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]) {
      // When both streams are File streams, use transferTo to improve copy performance.
      val inChannel = in.asInstanceOf[FileInputStream].getChannel()
      val outChannel = out.asInstanceOf[FileOutputStream].getChannel()
      println("size = " + outChannel.size)
      println("position = " + outChannel.position)
      val size = inChannel.size()
      // In case transferTo method transferred less data than we have required.
      while (count < size) {
        count += inChannel.transferTo(count, size - count, outChannel)
      }
    } else {
      val buf = new Array[Byte](8192)
      var n = 0
      while (n != -1) {
        n = in.read(buf)
        if (n != -1) {
          out.write(buf, 0, n)
          count += n
        }
      }
    }
    count
  } finally {
    if (closeStreams) {
      try {
        in.close()
      } finally {
        out.close()
      }
    }
  }
}

val out = new FileOutputStream("output")
for (i <- 0 until 10) {
  val in = new FileInputStream("t")
  val size = copyStream(in, out, false)
  println("size = " + size + " for i = " + i)
  in.close()
}
out.close()

{code}


Scenarios tried:
a) No "output" file.
b) Empty "output" file.
c) Non-empty "output" file.

It seemed to work fine (and as expected) in all cases.
Can you try this at your end? I want to eliminate any potential environment issues.
I tried this with JDK 1.7.0_55 and 1.8.0_25 ...

> Sort-based shuffle can lead to assorted stream-corruption exceptions
> 
>
> Key: SPARK-3948
> URL: https://issues.apache.org/jira/browse/SPARK-3948
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 1.2.0
>Reporter: Saisai Shao
>Assignee: Saisai Shao
>
> Several exceptions occurred when running TPC-DS queries against latest master 
> branch with sort-based shuffle enable, like PARSING_ERROR(2) in snappy, 
> deserializing error in Kryo and offset out-range in FileManagedBuffer, all 
> these exceptions are gone when we changed to hash-based shuffle.
> With deep investigation, we found that some shuffle output file is 
> unexpectedly smaller than the others, as the log shows:
> {noformat}
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
> shuffle_6_9_11, offset: 3055635, length: 236708, file length: 47274167
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
> shuffle_6_10_11, offset: 2986484, length: 222755, file length: 47174539
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
> shuffle_6_11_11, offset: 2995341, length: 259871, file length: 383405
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
> shuffle_6_12_11, offset: 2991030, length: 268191, file length: 47478892
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
> shuffle_6_13_11, offset: 3016292, length: 230694, file length: 47420826
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
> shuffle_6_14_11, offset: 3061400, length: 241136, file length: 47395509
> {noformat}
> As you can see the total file length of shuffle_6_11_11 is much smaller than 
> other same stage map output results.
> And we also dump the map outputs in map side to see if this small size output 
> is correct or not, below is the log:
> {noformat}
>  In bypass merge sort, file name: /mnt/DP_disk1/animal/spark/spark-local-
> 20141014182142-8345/22/shuffle_6_11_0.data, file length: 383405length:
>  274722 262597 291290 272902 264941 270358 291005 295285 252482 
> 287142 232617 259871 233734 241439 228897 234282 253834 235619 
> 233803 255532 270739 253825 262087 266404 234273 250120 262983 
> 257024 255947 254971 258908 247862 221613 258566 245399 251684 
> 274843 226150 264278 245279 225656 235084 239466 212851 242245 
> 218781 222191 215500 211548 234256 208601 204113

[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions

2014-10-15 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14172134#comment-14172134
 ] 

Mridul Muralidharan commented on SPARK-3948:


[~jerryshao] Just to clarify, what exactly is the behavior you are observing?
- Is it that getChannel is returning a channel whose position == 0 after writing bytes to the stream? (size > 0)
If yes, what channel length are you observing in that case?

Also, how "large" are the file sizes?

The documentation of getChannel and transferTo is fairly unambiguous ... so our code, as written, conforms to it. Of course, it is always possible we are hitting some bug in some scenarios!
What is the environment you are running this on, btw? OS/JVM version? Thanks.
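For reference, a minimal way to check that question outside Spark (plain java.io/java.nio; the temp file name is arbitrary). Bytes written through a FileOutputStream advance the position of the channel returned by getChannel(), so position should equal the number of bytes written:

{code}
import java.io.{File, FileOutputStream}

// Write some bytes through the stream, then inspect the channel it exposes.
val f = File.createTempFile("channel-check", ".bin")
val out = new FileOutputStream(f)
out.write(new Array[Byte](1024))
out.flush()
val ch = out.getChannel
// Expected: position = 1024, size = 1024 (a position of 0 here would be surprising).
println("position = " + ch.position() + ", size = " + ch.size())
out.close()
{code}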


> Sort-based shuffle can lead to assorted stream-corruption exceptions
> 
>
> Key: SPARK-3948
> URL: https://issues.apache.org/jira/browse/SPARK-3948
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 1.2.0
>Reporter: Saisai Shao
>Assignee: Saisai Shao
>
> Several exceptions occurred when running TPC-DS queries against latest master 
> branch with sort-based shuffle enable, like PARSING_ERROR(2) in snappy, 
> deserializing error in Kryo and offset out-range in FileManagedBuffer, all 
> these exceptions are gone when we changed to hash-based shuffle.
> With deep investigation, we found that some shuffle output file is 
> unexpectedly smaller than the others, as the log shows:
> {noformat}
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
> shuffle_6_9_11, offset: 3055635, length: 236708, file length: 47274167
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
> shuffle_6_10_11, offset: 2986484, length: 222755, file length: 47174539
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
> shuffle_6_11_11, offset: 2995341, length: 259871, file length: 383405
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
> shuffle_6_12_11, offset: 2991030, length: 268191, file length: 47478892
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
> shuffle_6_13_11, offset: 3016292, length: 230694, file length: 47420826
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
> shuffle_6_14_11, offset: 3061400, length: 241136, file length: 47395509
> {noformat}
> As you can see the total file length of shuffle_6_11_11 is much smaller than 
> other same stage map output results.
> And we also dump the map outputs in map side to see if this small size output 
> is correct or not, below is the log:
> {noformat}
>  In bypass merge sort, file name: /mnt/DP_disk1/animal/spark/spark-local-
> 20141014182142-8345/22/shuffle_6_11_0.data, file length: 383405length:
>  274722 262597 291290 272902 264941 270358 291005 295285 252482 
> 287142 232617 259871 233734 241439 228897 234282 253834 235619 
> 233803 255532 270739 253825 262087 266404 234273 250120 262983 
> 257024 255947 254971 258908 247862 221613 258566 245399 251684 
> 274843 226150 264278 245279 225656 235084 239466 212851 242245 
> 218781 222191 215500 211548 234256 208601 204113 191923 217895 
> 227020 215331 212313 223725 250876 256875 239276 266777 235520 
> 237462 234063 242270 246825 255888 235937 236956 233099 264508 
> 260303 233294 239061 254856 257475 230105 246553 260412 210355 
> 211201 219572 206636 226866 209937 226618 218208 206255 248069 
> 221717 222112 215734 248088 239207 246125 239056 241133 253091 
> 246738 233128 242794 231606 255737 221123 252115 247286 229688 
> 251087 250047 237579 263079 256251 238214 208641 201120 204009 
> 200825 211965 200600 194492 226471 194887 226975 215072 206008 
> 233288 222132 208860 219064 218162 237126 220465 201343 225711 
> 232178 233786 212767 211462 213671 215853 227822 233782 214727 
> 247001 228968 247413 222674 214241 184122 215643 207665 219079 
> 215185 207718 212723 201613 216600 212591 208174 204195 208099 
> 229079 230274 223373 214999 256626 228895 231821 383405 229646 
> 220212 245495 245960 227556 213266 237203 203805 240509 239306 
> 242365 218416 238487 219397 240026 251011 258369 255365 259811 
> 283313 248450 264286 264562 257485 279459 249187 257609 274964 
> 292369 273826
> {noformat}
> Here I dump the file name, length and each partition's length, obviously the 
> sum of all partition lengths is not equal to file length. So I think there 
> may be a situation paritionWriter in ExternalSorter not always append to the 
> end of previous written file, the f

[jira] [Commented] (SPARK-3948) Sort-based shuffle can lead to assorted stream-corruption exceptions

2014-10-15 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14172116#comment-14172116
 ] 

Mridul Muralidharan commented on SPARK-3948:


[~joshrosen] Assuming there are no VM bugs being hit in inChannel.size(), or some other concurrent writes to these files, I don't see any issues with the code - as you elaborated.
On the other hand, the external sort code is slightly loose w.r.t. its use of the file API - not sure if that is what is causing the observed problems: for example, the use of skip() in SortShuffleManager.scala.
We will need to investigate in detail whether some of these are responsible.
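To make the skip() concern concrete: InputStream.skip(n) is allowed to skip fewer than n bytes, so a single unchecked call can silently leave the stream at the wrong offset. A minimal sketch of a defensive variant (illustrative only - not the actual Spark code):

{code}
import java.io.{EOFException, InputStream}

// skip(n) may return less than n (or even 0 without being at EOF);
// loop until the requested offset is reached, or fail loudly.
def skipFully(in: InputStream, n: Long): Unit = {
  var remaining = n
  while (remaining > 0) {
    val skipped = in.skip(remaining)
    if (skipped > 0) {
      remaining -= skipped
    } else if (in.read() >= 0) {
      remaining -= 1
    } else {
      throw new EOFException("Reached EOF with " + remaining + " bytes left to skip")
    }
  }
}
{code}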

> Sort-based shuffle can lead to assorted stream-corruption exceptions
> 
>
> Key: SPARK-3948
> URL: https://issues.apache.org/jira/browse/SPARK-3948
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 1.2.0
>Reporter: Saisai Shao
>Assignee: Saisai Shao
>
> Several exceptions occurred when running TPC-DS queries against latest master 
> branch with sort-based shuffle enable, like PARSING_ERROR(2) in snappy, 
> deserializing error in Kryo and offset out-range in FileManagedBuffer, all 
> these exceptions are gone when we changed to hash-based shuffle.
> With deep investigation, we found that some shuffle output file is 
> unexpectedly smaller than the others, as the log shows:
> {noformat}
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
> shuffle_6_9_11, offset: 3055635, length: 236708, file length: 47274167
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
> shuffle_6_10_11, offset: 2986484, length: 222755, file length: 47174539
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
> shuffle_6_11_11, offset: 2995341, length: 259871, file length: 383405
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
> shuffle_6_12_11, offset: 2991030, length: 268191, file length: 47478892
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
> shuffle_6_13_11, offset: 3016292, length: 230694, file length: 47420826
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: 
> shuffle_6_14_11, offset: 3061400, length: 241136, file length: 47395509
> {noformat}
> As you can see the total file length of shuffle_6_11_11 is much smaller than 
> other same stage map output results.
> And we also dump the map outputs in map side to see if this small size output 
> is correct or not, below is the log:
> {noformat}
>  In bypass merge sort, file name: /mnt/DP_disk1/animal/spark/spark-local-
> 20141014182142-8345/22/shuffle_6_11_0.data, file length: 383405length:
>  274722 262597 291290 272902 264941 270358 291005 295285 252482 
> 287142 232617 259871 233734 241439 228897 234282 253834 235619 
> 233803 255532 270739 253825 262087 266404 234273 250120 262983 
> 257024 255947 254971 258908 247862 221613 258566 245399 251684 
> 274843 226150 264278 245279 225656 235084 239466 212851 242245 
> 218781 222191 215500 211548 234256 208601 204113 191923 217895 
> 227020 215331 212313 223725 250876 256875 239276 266777 235520 
> 237462 234063 242270 246825 255888 235937 236956 233099 264508 
> 260303 233294 239061 254856 257475 230105 246553 260412 210355 
> 211201 219572 206636 226866 209937 226618 218208 206255 248069 
> 221717 222112 215734 248088 239207 246125 239056 241133 253091 
> 246738 233128 242794 231606 255737 221123 252115 247286 229688 
> 251087 250047 237579 263079 256251 238214 208641 201120 204009 
> 200825 211965 200600 194492 226471 194887 226975 215072 206008 
> 233288 222132 208860 219064 218162 237126 220465 201343 225711 
> 232178 233786 212767 211462 213671 215853 227822 233782 214727 
> 247001 228968 247413 222674 214241 184122 215643 207665 219079 
> 215185 207718 212723 201613 216600 212591 208174 204195 208099 
> 229079 230274 223373 214999 256626 228895 231821 383405 229646 
> 220212 245495 245960 227556 213266 237203 203805 240509 239306 
> 242365 218416 238487 219397 240026 251011 258369 255365 259811 
> 283313 248450 264286 264562 257485 279459 249187 257609 274964 
> 292369 273826
> {noformat}
> Here I dump the file name, length and each partition's length, obviously the 
> sum of all partition lengths is not equal to file length. So I think there 
> may be a situation paritionWriter in ExternalSorter not always append to the 
> end of previous written file, the file's content is overwritten in some 
> parts, and this lead to the exceptions I mentioned before.
> Also I changed the code of copyStream by 

[jira] [Commented] (SPARK-3889) JVM dies with SIGBUS, resulting in ConnectionManager failed ACK

2014-10-10 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14167303#comment-14167303
 ] 

Mridul Muralidharan commented on SPARK-3889:


The status says fixed - what was done to resolve this ? I did not see a PR ...

> JVM dies with SIGBUS, resulting in ConnectionManager failed ACK
> ---
>
> Key: SPARK-3889
> URL: https://issues.apache.org/jira/browse/SPARK-3889
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.1.0
>Reporter: Aaron Davidson
>Assignee: Aaron Davidson
>Priority: Critical
> Fix For: 1.2.0
>
>
> Here's the first part of the core dump, possibly caused by a job which 
> shuffles a lot of very small partitions.
> {code}
> #
> # A fatal error has been detected by the Java Runtime Environment:
> #
> #  SIGBUS (0x7) at pc=0x7fa5885fcdb0, pid=488, tid=140343502632704
> #
> # JRE version: 7.0_25-b30
> # Java VM: OpenJDK 64-Bit Server VM (23.7-b01 mixed mode linux-amd64 
> compressed oops)
> # Problematic frame:
> # v  ~StubRoutines::jbyte_disjoint_arraycopy
> #
> # Failed to write core dump. Core dumps have been disabled. To enable core 
> dumping, try "ulimit -c unlimited" before starting Java again
> #
> # If you would like to submit a bug report, please include
> # instructions on how to reproduce the bug and visit:
> #   https://bugs.launchpad.net/ubuntu/+source/openjdk-7/
> #
> ---  T H R E A D  ---
> Current thread (0x7fa4b0631000):  JavaThread "Executor task launch 
> worker-170" daemon [_thread_in_Java, id=6783, 
> stack(0x7fa4448ef000,0x7fa4449f)]
> siginfo:si_signo=SIGBUS: si_errno=0, si_code=2 (BUS_ADRERR), 
> si_addr=0x7fa428f79000
> {code}
> Here is the only useful content I can find related to JVM and SIGBUS from 
> Google: https://bugzilla.redhat.com/show_bug.cgi?format=multiple&id=976664
> It appears it may be related to disposing byte buffers, which we do in the 
> ConnectionManager -- we mmap shuffle files via ManagedBuffer and dispose of 
> them in BufferMessage.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



Re: Breaking the previous large-scale sort record with Spark

2014-10-10 Thread Mridul Muralidharan
Brilliant stuff ! Congrats all :-)
This is indeed really heartening news !

Regards,
Mridul


On Fri, Oct 10, 2014 at 8:24 PM, Matei Zaharia  wrote:
> Hi folks,
>
> I interrupt your regularly scheduled user / dev list to bring you some pretty 
> cool news for the project, which is that we've been able to use Spark to 
> break MapReduce's 100 TB and 1 PB sort records, sorting data 3x faster on 10x 
> fewer nodes. There's a detailed writeup at 
> http://databricks.com/blog/2014/10/10/spark-breaks-previous-large-scale-sort-record.html.
>  Summary: while Hadoop MapReduce held last year's 100 TB world record by 
> sorting 100 TB in 72 minutes on 2100 nodes, we sorted it in 23 minutes on 206 
> nodes; and we also scaled up to sort 1 PB in 234 minutes.
>
> I want to thank Reynold Xin for leading this effort over the past few weeks, 
> along with Parviz Deyhim, Xiangrui Meng, Aaron Davidson and Ali Ghodsi. In 
> addition, we'd really like to thank Amazon's EC2 team for providing the 
> machines to make this possible. Finally, this result would of course not be 
> possible without the many many other contributions, testing and feature 
> requests from throughout the community.
>
> For an engine to scale from these multi-hour petabyte batch jobs down to 
> 100-millisecond streaming and interactive queries is quite uncommon, and it's 
> thanks to all of you folks that we are able to make this happen.
>
> Matei
> -
> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
> For additional commands, e-mail: dev-h...@spark.apache.org
>

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: Breaking the previous large-scale sort record with Spark

2014-10-10 Thread Mridul Muralidharan
Brilliant stuff ! Congrats all :-)
This is indeed really heartening news !

Regards,
Mridul


On Fri, Oct 10, 2014 at 8:24 PM, Matei Zaharia  wrote:
> Hi folks,
>
> I interrupt your regularly scheduled user / dev list to bring you some pretty 
> cool news for the project, which is that we've been able to use Spark to 
> break MapReduce's 100 TB and 1 PB sort records, sorting data 3x faster on 10x 
> fewer nodes. There's a detailed writeup at 
> http://databricks.com/blog/2014/10/10/spark-breaks-previous-large-scale-sort-record.html.
>  Summary: while Hadoop MapReduce held last year's 100 TB world record by 
> sorting 100 TB in 72 minutes on 2100 nodes, we sorted it in 23 minutes on 206 
> nodes; and we also scaled up to sort 1 PB in 234 minutes.
>
> I want to thank Reynold Xin for leading this effort over the past few weeks, 
> along with Parviz Deyhim, Xiangrui Meng, Aaron Davidson and Ali Ghodsi. In 
> addition, we'd really like to thank Amazon's EC2 team for providing the 
> machines to make this possible. Finally, this result would of course not be 
> possible without the many many other contributions, testing and feature 
> requests from throughout the community.
>
> For an engine to scale from these multi-hour petabyte batch jobs down to 
> 100-millisecond streaming and interactive queries is quite uncommon, and it's 
> thanks to all of you folks that we are able to make this happen.
>
> Matei
> -
> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
> For additional commands, e-mail: dev-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



[jira] [Commented] (SPARK-3847) Enum.hashCode is only consistent within the same JVM

2014-10-08 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14164775#comment-14164775
 ] 

Mridul Muralidharan commented on SPARK-3847:


[~joshrosen] array hashCode might be understandable - but enums messing up hashCode is unexpected, to say the least :-)
You are right, we need to add something similar.

Maybe have a blacklist of key types which are unstable when used in a distributed setting - I have a feeling we might end up with a longer list than just array and enum.
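A minimal illustration of the workaround side of this (assuming sc is a SparkContext; any Java enum shows the point). Enum.hashCode() is identity-based and differs across JVMs, while name() and ordinal() are stable, so keying by the name gives consistent results across executors:

{code}
import java.util.concurrent.TimeUnit

// countByValue keyed by the enum itself relies on Enum.hashCode (unstable across JVMs);
// mapping to name() first keys by a String, whose hashCode is stable everywhere.
val rdd = sc.parallelize(Seq(TimeUnit.SECONDS, TimeUnit.SECONDS, TimeUnit.DAYS))
val byEnum = rdd.countByValue()                  // can miscount on a multi-JVM cluster
val byName = rdd.map(_.name()).countByValue()    // stable keys: Map(SECONDS -> 2, DAYS -> 1)
{code}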

> Enum.hashCode is only consistent within the same JVM
> 
>
> Key: SPARK-3847
> URL: https://issues.apache.org/jira/browse/SPARK-3847
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.1.0
> Environment: Oracle JDK 7u51 64bit on Ubuntu 12.04
>Reporter: Nathan Bijnens
>  Labels: enum
>
> When using java Enum's as key in some operations the results will be very 
> unexpected. The issue is that the Java Enum.hashCode returns the 
> memoryposition, which is different on each JVM. 
> {code}
> messages.filter(_.getHeader.getKind == Kind.EVENT).count
> >> 503650
> val tmp = messages.filter(_.getHeader.getKind == Kind.EVENT)
> tmp.map(_.getHeader.getKind).countByValue
> >> Map(EVENT -> 1389)
> {code}
> Because it's actually a JVM issue we either should reject with an error enums 
> as key or implement a workaround.
> A good writeup of the issue can be found here (and a workaround):
> http://dev.bizo.com/2014/02/beware-enums-in-spark.html
> Somewhat more on the hash codes and Enum's:
> https://stackoverflow.com/questions/4885095/what-is-the-reason-behind-enum-hashcode
> And some issues (most of them rejected) at the Oracle Bug Java database:
> - http://bugs.java.com/bugdatabase/view_bug.do?bug_id=8050217
> - http://bugs.java.com/bugdatabase/view_bug.do?bug_id=7190798



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3561) Allow for pluggable execution contexts in Spark

2014-10-08 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14164095#comment-14164095
 ] 

Mridul Muralidharan commented on SPARK-3561:



I agree with [~pwendell] that it does not help Spark to introduce a dependency on Tez in core.
[~ozhurakousky] is Tez available on all YARN clusters? Or is it an additional runtime dependency?

If it is available by default, we could make it a runtime switch to use Tez for jobs running in yarn-standalone and yarn-client mode.
But before that ...


While better multi-tenancy would be a likely benefit, my specific interest in this patch is more to do with the much better shuffle performance that Tez offers :-) Specifically for ETL jobs, I can see other benefits which might be relevant - one of our collaborative filtering implementations, though not ETL, comes fairly close to ETL in job characteristics and suffers due to some of our shuffle issues ...


As I alluded to, I do not think we should have an open-ended extension point - where any class name can be provided which extends functionality in arbitrary ways - for example, like the SPI we have for compression codecs.
As Patrick mentioned, this gives the impression that the approach is blessed by Spark developers, even if tagged as Experimental.
Particularly with core internals, I would be very wary of exposing them via an SPI - simply because we need the freedom to evolve them for performance or functionality reasons.


On the other hand, I am in favour of exploring this option to see what sort of benefits we get out of it, assuming it has been prototyped already - which I thought was the case here, though I am yet to see a PR with that (not sure if I missed it!).
Given that Tez is supposed to be reasonably mature - if there is a Spark + Tez version, I want to see what benefits (if any) are observed as a result of this effort.
I had discussed Spark + Tez integration about a year or so back with Matei - but at that time, Tez was probably not that mature - maybe this is a better time!

[~ozhurakousky] Do you have a Spark-on-Tez prototype done already? Or is this an experiment you are yet to complete? If complete, what sort of performance difference do you see? What metrics are you using?


If there are significant benefits, I would want to take a closer look at the final proposed patch ... I would be interested in it making it into Spark in some form.

As [~nchammas] mentioned - if it is possible to address this in Spark directly, nothing like it - particularly since that would benefit all modes of execution and not just the YARN + Tez combination.
If the gap can't be narrowed, and the benefits are significant (for some, as of now undefined, definition of "benefits" and "significant"), then we can consider a Tez dependency in the yarn module.

Of course, all these questions are moot until we have a better quantitative judgement of what the expected gains are and what the experimental results show.

> Allow for pluggable execution contexts in Spark
> ---
>
> Key: SPARK-3561
> URL: https://issues.apache.org/jira/browse/SPARK-3561
> Project: Spark
>  Issue Type: New Feature
>  Components: Spark Core
>Affects Versions: 1.1.0
>Reporter: Oleg Zhurakousky
>  Labels: features
> Fix For: 1.2.0
>
> Attachments: SPARK-3561.pdf
>
>
> Currently Spark provides integration with external resource-managers such as 
> Apache Hadoop YARN, Mesos etc. Specifically in the context of YARN, the 
> current architecture of Spark-on-YARN can be enhanced to provide 
> significantly better utilization of cluster resources for large scale, batch 
> and/or ETL applications when run alongside other applications (Spark and 
> others) and services in YARN. 
> Proposal: 
> The proposed approach would introduce a pluggable JobExecutionContext (trait) 
> - a gateway and a delegate to Hadoop execution environment - as a non-public 
> api (@DeveloperAPI) not exposed to end users of Spark. 
> The trait will define 4 only operations: 
> * hadoopFile 
> * newAPIHadoopFile 
> * broadcast 
> * runJob 
> Each method directly maps to the corresponding methods in current version of 
> SparkContext. JobExecutionContext implementation will be accessed by 
> SparkContext via master URL as 
> "execution-context:foo.bar.MyJobExecutionContext" with default implementation 
> containing the existing code from SparkContext, thus allowing current 
> (corresponding) methods of SparkContext to delegate to such implementation. 
> An integrator will now have an option to provide custom implementation of 
> DefaultExecutionContext by either 

[jira] [Commented] (SPARK-3847) Enum.hashCode is only consistent within the same JVM

2014-10-08 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14163407#comment-14163407
 ] 

Mridul Muralidharan commented on SPARK-3847:


Wow, nice bug ! This is unexpected - thanks for reporting this.

> Enum.hashCode is only consistent within the same JVM
> 
>
> Key: SPARK-3847
> URL: https://issues.apache.org/jira/browse/SPARK-3847
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.1.0
> Environment: Oracle JDK 7u51 64bit on Ubuntu 12.04
>Reporter: Nathan Bijnens
>  Labels: enum
>
> When using java Enum's as key in some operations the results will be very 
> unexpected. The issue is that the Java Enum.hashCode returns the 
> memoryposition, which is different on each JVM. 
> {code}
> messages.filter(_.getHeader.getKind == Kind.EVENT).count
> >> 503650
> val tmp = messages.filter(_.getHeader.getKind == Kind.EVENT)
> tmp.map(_.getHeader.getKind).countByValue
> >> Map(EVENT -> 1389)
> {code}
> Because it's actually a JVM issue we either should reject with an error enums 
> as key or implement a workaround.
> A good writeup of the issue can be found here (and a workaround):
> http://dev.bizo.com/2014/02/beware-enums-in-spark.html
> Somewhat more on the hash codes and Enum's:
> https://stackoverflow.com/questions/4885095/what-is-the-reason-behind-enum-hashcode
> And some issues (most of them rejected) at the Oracle Bug Java database:
> - http://bugs.java.com/bugdatabase/view_bug.do?bug_id=8050217
> - http://bugs.java.com/bugdatabase/view_bug.do?bug_id=7190798



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3561) Allow for pluggable execution contexts in Spark

2014-10-08 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14163376#comment-14163376
 ] 

Mridul Muralidharan commented on SPARK-3561:


[~ozhurakousky] I think the disconnect here is that the interfaces proposed do not show much value in terms of the functionality to be exposed to users. A follow-up PR showing how these interfaces are used in the context of Tez would show why this change is relevant to Spark.

The disconnect, if I am not wrong, is that we do not want to expose SPIs which we would then need to maintain in Spark core - while unknown implementations extend them in non-standard ways, causing issues for our end users.


For example, even though TaskScheduler is an SPI and can in theory be extended in arbitrary ways - all the SPI implementations currently 'live' within Spark and are in harmony with the rest of the code - and with changes which occur within Spark core (when functionality is added or extended).
This allows us to decouple the actual TaskScheduler implementation from Spark code, while still keeping them in sync and maintainable while adding functionality independent of other pieces: case in point, YARN support has significantly evolved from when I initially added it - to the point where it probably does not share even a single line of code I initially wrote :) - and yet this has been done pretty much independently of changes to core, while at the same time ensuring that it stays compatible with changes in Spark core and vice versa.


The next step, IMO, would be a PR which shows how these interfaces are used for a non-trivial use case: Tez in this case.
The default implementation provided in the PR can be removed (since it should not be used/exposed to users).

Once that is done, we can evaluate the proposed interface in the context of the functionality it exposes, and see how it fits with the rest of Spark; a rough sketch of its shape is included below for reference.
> Allow for pluggable execution contexts in Spark
> ---
>
> Key: SPARK-3561
> URL: https://issues.apache.org/jira/browse/SPARK-3561
> Project: Spark
>  Issue Type: New Feature
>  Components: Spark Core
>Affects Versions: 1.1.0
>Reporter: Oleg Zhurakousky
>  Labels: features
> Fix For: 1.2.0
>
> Attachments: SPARK-3561.pdf
>
>
> Currently Spark provides integration with external resource-managers such as 
> Apache Hadoop YARN, Mesos etc. Specifically in the context of YARN, the 
> current architecture of Spark-on-YARN can be enhanced to provide 
> significantly better utilization of cluster resources for large scale, batch 
> and/or ETL applications when run alongside other applications (Spark and 
> others) and services in YARN. 
> Proposal: 
> The proposed approach would introduce a pluggable JobExecutionContext (trait) 
> - a gateway and a delegate to Hadoop execution environment - as a non-public 
> api (@DeveloperAPI) not exposed to end users of Spark. 
> The trait will define 4 only operations: 
> * hadoopFile 
> * newAPIHadoopFile 
> * broadcast 
> * runJob 
> Each method directly maps to the corresponding methods in current version of 
> SparkContext. JobExecutionContext implementation will be accessed by 
> SparkContext via master URL as 
> "execution-context:foo.bar.MyJobExecutionContext" with default implementation 
> containing the existing code from SparkContext, thus allowing current 
> (corresponding) methods of SparkContext to delegate to such implementation. 
> An integrator will now have an option to provide custom implementation of 
> DefaultExecutionContext by either implementing it from scratch or extending 
> form DefaultExecutionContext. 
> Please see the attached design doc for more details. 
> Pull Request will be posted shortly as well



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3561) Allow for pluggable execution contexts in Spark

2014-10-08 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14163351#comment-14163351
 ] 

Mridul Muralidharan commented on SPARK-3561:


[~pwendell] If I understood the proposal and the initial PR submitted, the intent of this JIRA, as initially proposed by [~ozhurakousky], is fairly different from the other efforts referenced, if I am not wrong.
The focus of this change seems to be to completely bypass the Spark execution engine and substitute an alternative: only the current API (and so DAG creation from the Spark program) and the user-facing interfaces in Spark remain - the block management, execution engine, execution state management, etc. would all be replaced under the covers by what Tez (or something else in future) provides.

If I am not wrong, the changes would be:
a) Applies only to YARN mode - where the specified execution environment can be run.
b) The current Spark AM would no longer request any executors.
c) The Spark block manager would no longer be required (other than possibly for hosting broadcast via HTTP, I guess?).
d) The actual DAG execution would be taken up by the overridden execution engine - Spark's task manager and DAG scheduler become no-ops.

I might be missing things which Oleg can elaborate on.


This functionality, IMO, is fundamentally different from what is being explored in the other JIRAs - and so has value to be pursued independently of the other efforts.
Obviously this does not work in all use cases where Spark is run - but it handles a subset of use cases where other execution engines might do much better than Spark currently does, simply because of better code maturity and the specialized use cases they target.

> Allow for pluggable execution contexts in Spark
> ---
>
> Key: SPARK-3561
> URL: https://issues.apache.org/jira/browse/SPARK-3561
> Project: Spark
>  Issue Type: New Feature
>  Components: Spark Core
>Affects Versions: 1.1.0
>Reporter: Oleg Zhurakousky
>  Labels: features
> Fix For: 1.2.0
>
> Attachments: SPARK-3561.pdf
>
>
> Currently Spark provides integration with external resource-managers such as 
> Apache Hadoop YARN, Mesos etc. Specifically in the context of YARN, the 
> current architecture of Spark-on-YARN can be enhanced to provide 
> significantly better utilization of cluster resources for large scale, batch 
> and/or ETL applications when run alongside other applications (Spark and 
> others) and services in YARN. 
> Proposal: 
> The proposed approach would introduce a pluggable JobExecutionContext (trait) 
> - a gateway and a delegate to Hadoop execution environment - as a non-public 
> api (@DeveloperAPI) not exposed to end users of Spark. 
> The trait will define 4 only operations: 
> * hadoopFile 
> * newAPIHadoopFile 
> * broadcast 
> * runJob 
> Each method directly maps to the corresponding methods in current version of 
> SparkContext. JobExecutionContext implementation will be accessed by 
> SparkContext via master URL as 
> "execution-context:foo.bar.MyJobExecutionContext" with default implementation 
> containing the existing code from SparkContext, thus allowing current 
> (corresponding) methods of SparkContext to delegate to such implementation. 
> An integrator will now have an option to provide custom implementation of 
> DefaultExecutionContext by either implementing it from scratch or extending 
> form DefaultExecutionContext. 
> Please see the attached design doc for more details. 
> Pull Request will be posted shortly as well



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3785) Support off-loading computations to a GPU

2014-10-08 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14163326#comment-14163326
 ] 

Mridul Muralidharan commented on SPARK-3785:


[~sowen] We had prototyped a solution for doing just this - the way we did it was to add a new StorageLevel - which was "higher" than ProcessLevel - and maintain information about which card hosted the data in an executor, plus the other handles/metadata required to offload computation to the accelerator card (so not just GPU).
And we set appropriate level delays so that if the RDD had GPU data, no other computation level is allowed (unless there is a loss of executor).

It was a prototype - so we did not solve all the issues which arise - including how to expose eviction of data from the GPU back to main memory/disk in case of memory pressure, more efficient failure modes, moving data between GPUs to balance the memory and computational load/RDD replication between GPUs in an executor, multi-tenancy, etc.: our initial target use cases just required running various (expensive) closures on the data, with the result pulled off the card (which was fairly 'small') - so not all of these needed to be solved, at least for the prototype :-)
I can't get into the gory details or the actual benchmark numbers though, apologies.

In general, is it worth it? For very specific cases, I would say it is phenomenal - allowing a 2-3 order-of-magnitude performance boost vertically!
But for cases where it is not a good fit, it is terrible - even for cases where it was intuitively supposed to work, the inefficiencies we incur make it terrible at times.

> Support off-loading computations to a GPU
> -
>
> Key: SPARK-3785
> URL: https://issues.apache.org/jira/browse/SPARK-3785
> Project: Spark
>  Issue Type: Brainstorming
>  Components: MLlib
>Reporter: Thomas Darimont
>Priority: Minor
>
> Are there any plans to adding support for off-loading computations to the 
> GPU, e.g. via an open-cl binding? 
> http://www.jocl.org/
> https://code.google.com/p/javacl/
> http://lwjgl.org/wiki/index.php?title=OpenCL_in_LWJGL



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3714) Spark workflow scheduler

2014-09-29 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14151486#comment-14151486
 ] 

Mridul Muralidharan commented on SPARK-3714:



Most of the drawbacks mentioned are not severe IMO - at best, they reflect unfamiliarity with the Oozie platform (points 2, 3, 4, 5).
Point 1 is interesting (sharing a Spark context) - though from a fault tolerance point of view, supporting it is challenging; of course Oozie was probably not designed with something like Spark in mind - so there might be changes to Oozie which would benefit Spark; we could engage with Oozie dev for that.

But discarding it to reinvent something when Oozie already does everything mentioned in the requirements section seems counterintuitive.


I have seen multiple attempts to 'simplify' workflow management, and at production scale almost everything ends up being similar ...
Note that most production jobs have to depend on a variety of jobs - not just Spark or MR - so you will end up converging on a variant of Oozie anyway :-)

Having said that, if you want to take a crack at solving this with Spark-specific idioms in mind, it would be interesting to see the result - I don't want to dissuade you from doing so!
We might end up with something quite interesting.

> Spark workflow scheduler
> 
>
> Key: SPARK-3714
> URL: https://issues.apache.org/jira/browse/SPARK-3714
> Project: Spark
>  Issue Type: New Feature
>  Components: Project Infra
>Reporter: Egor Pakhomov
>Priority: Minor
>
> [Design doc | 
> https://docs.google.com/document/d/1q2Q8Ux-6uAkH7wtLJpc3jz-GfrDEjlbWlXtf20hvguk/edit?usp=sharing]
> Spark stack currently hard to use in the production processes due to the lack 
> of next features:
> * Scheduling spark jobs
> * Retrying failed spark job in big pipeline
> * Share context among jobs in pipeline
> * Queue jobs
> Typical usecase for such platform would be - wait for new data, process new 
> data, learn ML models on new data, compare model with previous one, in case 
> of success - rewrite model in HDFS directory for current production model 
> with new one.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3714) Spark workflow scheduler

2014-09-28 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14151211#comment-14151211
 ] 

Mridul Muralidharan commented on SPARK-3714:


Have you tried using oozie for this ?
IIRC Tom has already gotten this working here quite a while back
/CC [~tgraves] 

> Spark workflow scheduler
> 
>
> Key: SPARK-3714
> URL: https://issues.apache.org/jira/browse/SPARK-3714
> Project: Spark
>  Issue Type: New Feature
>  Components: Project Infra
>Reporter: Egor Pakhomov
>Priority: Minor
>
> [Design doc | 
> https://docs.google.com/document/d/1q2Q8Ux-6uAkH7wtLJpc3jz-GfrDEjlbWlXtf20hvguk/edit?usp=sharing]
> Spark stack currently hard to use in the production processes due to the lack 
> of next features:
> * Scheduling spark jobs
> * Retrying failed spark job in big pipeline
> * Share context among jobs in pipeline
> * Queue jobs
> Typical usecase for such platform would be - wait for new data, process new 
> data, learn ML models on new data, compare model with previous one, in case 
> of success - rewrite model in HDFS directory for current production model 
> with new one.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-1956) Enable shuffle consolidation by default

2014-09-07 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14125027#comment-14125027
 ] 

Mridul Muralidharan commented on SPARK-1956:


The recent changes to BlockObjectWriter have introduced bugs again ... I don't know how badly they affect the codebase, but it would not be prudent to enable it by default until they are fixed and the changes are properly analyzed.

> Enable shuffle consolidation by default
> ---
>
> Key: SPARK-1956
> URL: https://issues.apache.org/jira/browse/SPARK-1956
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 1.0.0
>Reporter: Sandy Ryza
>
> The only drawbacks are on ext3, and most everyone has ext4 at this point.  I 
> think it's better to aim the default at the common case.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-1476) 2GB limit in spark for blocks

2014-09-02 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14119385#comment-14119385
 ] 

Mridul Muralidharan commented on SPARK-1476:


WIP version pushed to https://github.com/mridulm/spark/tree/2g_fix - about 2 weeks before the feature freeze in 1.1, IIRC.

Note that the 2G fixes are functionally complete, but this branch also includes a large number of other fixes.
Some of these have been pushed to master, while others have not yet been: primarily fixes for alleviating memory pressure and plugging resource leaks.

This branch has been shared for reference purposes - it is not meant to be actively worked on for merging into master.
We will need to cherry-pick the changes and do that manually.
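For context, the 2 GB ceiling described in the issue below comes straight from the java.nio API surface; a standalone illustration (not Spark code):

{code}
import java.io.RandomAccessFile
import java.nio.channels.FileChannel

// ByteBuffer capacities are Ints, and FileChannel.map rejects regions larger than
// Integer.MAX_VALUE - so any block modelled as a single (possibly mapped) ByteBuffer
// cannot exceed ~2 GB, no matter how large the underlying file is.
val f = java.io.File.createTempFile("block", ".bin")
val channel = new RandomAccessFile(f, "rw").getChannel
try {
  channel.map(FileChannel.MapMode.READ_WRITE, 0, Integer.MAX_VALUE.toLong + 1)
} catch {
  case e: IllegalArgumentException => println("map() refused: " + e.getMessage)
} finally {
  channel.close()
  f.delete()
}
{code}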

> 2GB limit in spark for blocks
> -
>
> Key: SPARK-1476
> URL: https://issues.apache.org/jira/browse/SPARK-1476
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
> Environment: all
>    Reporter: Mridul Muralidharan
>Assignee: Mridul Muralidharan
>Priority: Critical
> Attachments: 2g_fix_proposal.pdf
>
>
> The underlying abstraction for blocks in spark is a ByteBuffer : which limits 
> the size of the block to 2GB.
> This has implication not just for managed blocks in use, but also for shuffle 
> blocks (memory mapped blocks are limited to 2gig, even though the api allows 
> for long), ser-deser via byte array backed outstreams (SPARK-1391), etc.
> This is a severe limitation for use of spark when used on non trivial 
> datasets.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3019) Pluggable block transfer (data plane communication) interface

2014-09-02 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14119365#comment-14119365
 ] 

Mridul Muralidharan commented on SPARK-3019:


I will try to push the version we last worked on for the 2G fix (a pre-1.1 fork) to git later today/this week - and we can take a look at it.
It might require some effort to rebase it to 1.1 since it is slightly dated, but that can be done if required: the main reason for the push is to illustrate why the interfaces in SPARK-1476 exist and how they are used, so that there is a better understanding of the required functional change.

> Pluggable block transfer (data plane communication) interface
> -
>
> Key: SPARK-3019
> URL: https://issues.apache.org/jira/browse/SPARK-3019
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Reporter: Reynold Xin
>Assignee: Reynold Xin
> Attachments: PluggableBlockTransferServiceProposalforSpark - draft 
> 1.pdf
>
>
> The attached design doc proposes a standard interface for block transferring, 
> which will make future engineering of this functionality easier, allowing the 
> Spark community to provide alternative implementations.
> Block transferring is a critical function in Spark. All of the following 
> depend on it:
> * shuffle
> * torrent broadcast
> * block replication in BlockManager
> * remote block reads for tasks scheduled without locality



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3019) Pluggable block transfer (data plane communication) interface

2014-09-02 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14119362#comment-14119362
 ] 

Mridul Muralidharan commented on SPARK-3019:


Just went over the proposal in some detail.
[~rxin] did you take a look at the proposal in SPARK-1476?
The ManagedBuffer detailed in this document does not satisfy most of the interface or functional requirements in 1476 - which would require us to redesign this interface when we need to support blocks larger than 2 GB in Spark: unless I missed something.
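Purely as an illustration of the gap being pointed out (this is not the SPARK-1476 or SPARK-3019 interface): an abstraction that wants to support blocks larger than 2 GB has to expose the data as something other than a single ByteBuffer, for example as bounded chunks:

{code}
import java.nio.ByteBuffer

// Hypothetical shape only: the total size is a Long, while the payload is surfaced as
// an iterator of chunks that each stay within the single-ByteBuffer (Int) limit.
trait LargeBlockData {
  def size: Long                      // may exceed Integer.MAX_VALUE
  def chunks: Iterator[ByteBuffer]    // each individual chunk is well under 2 GB
}
{code}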

> Pluggable block transfer (data plane communication) interface
> -
>
> Key: SPARK-3019
> URL: https://issues.apache.org/jira/browse/SPARK-3019
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Reporter: Reynold Xin
>Assignee: Reynold Xin
> Attachments: PluggableBlockTransferServiceProposalforSpark - draft 
> 1.pdf
>
>
> The attached design doc proposes a standard interface for block transferring, 
> which will make future engineering of this functionality easier, allowing the 
> Spark community to provide alternative implementations.
> Block transferring is a critical function in Spark. All of the following 
> depend on it:
> * shuffle
> * torrent broadcast
> * block replication in BlockManager
> * remote block reads for tasks scheduled without locality



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



Re: [VOTE] Release Apache Spark 1.1.0 (RC1)

2014-08-28 Thread Mridul Muralidharan
Thanks for being on top of this Patrick ! And apologies for not being able
to help more.

Regards,
Mridul
On Aug 29, 2014 1:30 AM, "Patrick Wendell"  wrote:

> Mridul - thanks for sending this along and for the debugging comments
> on the JIRA. I think we have a handle on the issue and we'll patch it
> and spin a new RC. We can also update the test coverage to cover LZ4.
>
> - Patrick
>
> On Thu, Aug 28, 2014 at 9:27 AM, Mridul Muralidharan 
> wrote:
> > Is SPARK-3277 applicable to 1.1 ?
> > If yes, until it is fixed, I am -1 on the release (I am on break, so
> can't
> > verify or help fix, sorry).
> >
> > Regards
> > Mridul
> >
> > On 28-Aug-2014 9:33 pm, "Patrick Wendell"  wrote:
> >>
> >> Please vote on releasing the following candidate as Apache Spark version
> >> 1.1.0!
> >>
> >> The tag to be voted on is v1.1.0-rc1 (commit f0718324):
> >>
> >>
> https://git-wip-us.apache.org/repos/asf?p=spark.git;a=commit;h=f07183249b74dd857069028bf7d570b35f265585
> >>
> >> The release files, including signatures, digests, etc. can be found at:
> >> http://people.apache.org/~pwendell/spark-1.1.0-rc1/
> >>
> >> Release artifacts are signed with the following key:
> >> https://people.apache.org/keys/committer/pwendell.asc
> >>
> >> The staging repository for this release can be found at:
> >> https://repository.apache.org/content/repositories/orgapachespark-1028/
> >>
> >> The documentation corresponding to this release can be found at:
> >> http://people.apache.org/~pwendell/spark-1.1.0-rc1-docs/
> >>
> >> Please vote on releasing this package as Apache Spark 1.1.0!
> >>
> >> The vote is open until Sunday, August 31, at 17:00 UTC and passes if
> >> a majority of at least 3 +1 PMC votes are cast.
> >>
> >> [ ] +1 Release this package as Apache Spark 1.1.0
> >> [ ] -1 Do not release this package because ...
> >>
> >> To learn more about Apache Spark, please see
> >> http://spark.apache.org/
> >>
> >> == What justifies a -1 vote for this release? ==
> >> This vote is happening very late into the QA period compared with
> >> previous votes, so -1 votes should only occur for significant
> >> regressions from 1.0.2. Bugs already present in 1.0.X will not block
> >> this release.
> >>
> >> == What default changes should I be aware of? ==
> >> 1. The default value of "spark.io.compression.codec" is now "snappy"
> >> --> Old behavior can be restored by switching to "lzf"
> >>
> >> 2. PySpark now performs external spilling during aggregations.
> >> --> Old behavior can be restored by setting "spark.shuffle.spill" to
> >> "false".
> >>
> >> I'll send a bit more later today with feature information for the
> >> release. In the mean time I want to put this out there for
> >> consideration.
> >>
> >> - Patrick
> >>
> >> -
> >> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
> >> For additional commands, e-mail: dev-h...@spark.apache.org
> >>
> >
>


[jira] [Commented] (SPARK-3277) LZ4 compression cause the the ExternalSort exception

2014-08-28 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14114484#comment-14114484
 ] 

Mridul Muralidharan commented on SPARK-3277:


Sounds great, thanks!
I suspect it is because for LZF we configure the codec to write out a block on
flush (a partial block if there is insufficient data to fill one), whereas for
LZ4 either such a configuration does not exist or we do not use it.
As a result, flush becomes a no-op when the data in the current block is
insufficient to produce a compressed block, while close forces the partial
block to be written out.

Which is why the assertion lists all sizes as 0.
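
Below is a minimal editorial sketch, not Spark's or any codec library's actual
code, of the failure mode described above: a block-buffering output stream
whose flush() emits nothing until a full block has accumulated, while close()
forces the partial block out. Any size measured at flush() time then reads as
0, matching the assertion failure. All names here are hypothetical.

{code}
import java.io.{ByteArrayOutputStream, OutputStream}

// Hypothetical block-buffering stream used only to illustrate the behavior.
class BlockBufferingStream(out: OutputStream, blockSize: Int) extends OutputStream {
  private val buf = new ByteArrayOutputStream()
  override def write(b: Int): Unit = {
    buf.write(b)
    if (buf.size() >= blockSize) emit()            // only full blocks are written eagerly
  }
  override def flush(): Unit = out.flush()         // note: the partial block is NOT written here
  override def close(): Unit = { emit(); out.close() }  // partial block written only on close
  private def emit(): Unit = { buf.writeTo(out); buf.reset() }
}

val sink = new ByteArrayOutputStream()
val s = new BlockBufferingStream(sink, blockSize = 1 << 16)
s.write(Array.fill[Byte](100)(1.toByte))           // less than one block of data
s.flush()
println(sink.size())                               // 0 - nothing has reached the sink yet
s.close()
println(sink.size())                               // 100 - the partial block appears only after close()
{code}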


> LZ4 compression cause the the ExternalSort exception
> 
>
> Key: SPARK-3277
> URL: https://issues.apache.org/jira/browse/SPARK-3277
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.0.2, 1.1.0, 1.2.0
>Reporter: hzw
>Assignee: Andrew Or
>Priority: Blocker
> Attachments: test_lz4_bug.patch
>
>
> I tested the LZ4 compression,and it come up with such problem.(with wordcount)
> Also I tested the snappy and LZF,and they were OK.
> At last I set the  "spark.shuffle.spill" as false to avoid such exeception, 
> but once open this "switch", this error would come.
> It seems that if num of the words is few, wordcount will go through,but if 
> it is a complex text ,this problem will show
> Exeception Info as follow:
> {code}
> java.lang.AssertionError: assertion failed
> at scala.Predef$.assert(Predef.scala:165)
> at 
> org.apache.spark.util.collection.ExternalAppendOnlyMap$DiskMapIterator.(ExternalAppendOnlyMap.scala:416)
> at 
> org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:235)
> at 
> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:150)
> at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
> at 
> org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:55)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:54)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
> at java.lang.Thread.run(Thread.java:722)
> {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3277) LZ4 compression cause the the ExternalSort exception

2014-08-28 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14114026#comment-14114026
 ] 

Mridul Muralidharan commented on SPARK-3277:


[~hzw] Did you notice this against 1.0.2?
I did not think the changes for consolidated shuffle were backported to that 
branch; [~mateiz] can comment more, though.

> LZ4 compression cause the the ExternalSort exception
> 
>
> Key: SPARK-3277
> URL: https://issues.apache.org/jira/browse/SPARK-3277
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.0.2, 1.1.0, 1.2.0
>Reporter: hzw
>Priority: Blocker
> Fix For: 1.1.0
>
> Attachments: test_lz4_bug.patch
>
>
> I tested the LZ4 compression,and it come up with such problem.(with wordcount)
> Also I tested the snappy and LZF,and they were OK.
> At last I set the  "spark.shuffle.spill" as false to avoid such exeception, 
> but once open this "switch", this error would come.
> It seems that if num of the words is few, wordcount will go through,but if it 
> is a complex text ,this problem will show
> Exeception Info as follow:
> java.lang.AssertionError: assertion failed
> at scala.Predef$.assert(Predef.scala:165)
> at 
> org.apache.spark.util.collection.ExternalAppendOnlyMap$DiskMapIterator.(ExternalAppendOnlyMap.scala:416)
> at 
> org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:235)
> at 
> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:150)
> at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
> at 
> org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:55)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:54)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
> at java.lang.Thread.run(Thread.java:722)



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-3277) LZ4 compression cause the the ExternalSort exception

2014-08-28 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14114022#comment-14114022
 ] 

Mridul Muralidharan edited comment on SPARK-3277 at 8/28/14 5:37 PM:
-

The attached patch is against master, though I noticed similar changes in 1.1 
as well; I have not yet verified that branch.


was (Author: mridulm80):
Against master, though I noticed similar changes in 1.1 also : but not yet 
verified.

> LZ4 compression cause the the ExternalSort exception
> 
>
> Key: SPARK-3277
> URL: https://issues.apache.org/jira/browse/SPARK-3277
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.0.2, 1.1.0, 1.2.0
>Reporter: hzw
>Priority: Blocker
> Fix For: 1.1.0
>
> Attachments: test_lz4_bug.patch
>
>
> I tested the LZ4 compression,and it come up with such problem.(with wordcount)
> Also I tested the snappy and LZF,and they were OK.
> At last I set the  "spark.shuffle.spill" as false to avoid such exeception, 
> but once open this "switch", this error would come.
> It seems that if num of the words is few, wordcount will go through,but if it 
> is a complex text ,this problem will show
> Exeception Info as follow:
> java.lang.AssertionError: assertion failed
> at scala.Predef$.assert(Predef.scala:165)
> at 
> org.apache.spark.util.collection.ExternalAppendOnlyMap$DiskMapIterator.(ExternalAppendOnlyMap.scala:416)
> at 
> org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:235)
> at 
> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:150)
> at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
> at 
> org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:55)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:54)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
> at java.lang.Thread.run(Thread.java:722)



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-3277) LZ4 compression cause the the ExternalSort exception

2014-08-28 Thread Mridul Muralidharan (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-3277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan updated SPARK-3277:
---

Attachment: test_lz4_bug.patch

Against master, though I noticed similar changes in 1.1 also : but not yet 
verified.

> LZ4 compression cause the the ExternalSort exception
> 
>
> Key: SPARK-3277
> URL: https://issues.apache.org/jira/browse/SPARK-3277
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.0.2, 1.1.0, 1.2.0
>Reporter: hzw
>Priority: Blocker
> Fix For: 1.1.0
>
> Attachments: test_lz4_bug.patch
>
>
> I tested the LZ4 compression,and it come up with such problem.(with wordcount)
> Also I tested the snappy and LZF,and they were OK.
> At last I set the  "spark.shuffle.spill" as false to avoid such exeception, 
> but once open this "switch", this error would come.
> It seems that if num of the words is few, wordcount will go through,but if it 
> is a complex text ,this problem will show
> Exeception Info as follow:
> java.lang.AssertionError: assertion failed
> at scala.Predef$.assert(Predef.scala:165)
> at 
> org.apache.spark.util.collection.ExternalAppendOnlyMap$DiskMapIterator.(ExternalAppendOnlyMap.scala:416)
> at 
> org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:235)
> at 
> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:150)
> at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
> at 
> org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:55)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:54)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
> at java.lang.Thread.run(Thread.java:722)



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3277) LZ4 compression cause the the ExternalSort exception

2014-08-28 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14114014#comment-14114014
 ] 

Mridul Muralidharan commented on SPARK-3277:


[~matei] Attaching a patch which reproduces the bug consistently.
I suspect the issue is more serious than what I detailed above - spill to disk 
seems completely broken, if I understood the assertion message correctly.
Unfortunately, this is based on a few minutes of free time I could grab, so a 
more principled debugging session is definitely warranted!



> LZ4 compression cause the the ExternalSort exception
> 
>
> Key: SPARK-3277
> URL: https://issues.apache.org/jira/browse/SPARK-3277
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.0.2, 1.1.0, 1.2.0
>Reporter: hzw
>Priority: Blocker
> Fix For: 1.1.0
>
>
> I tested the LZ4 compression,and it come up with such problem.(with wordcount)
> Also I tested the snappy and LZF,and they were OK.
> At last I set the  "spark.shuffle.spill" as false to avoid such exeception, 
> but once open this "switch", this error would come.
> It seems that if num of the words is few, wordcount will go through,but if it 
> is a complex text ,this problem will show
> Exeception Info as follow:
> java.lang.AssertionError: assertion failed
> at scala.Predef$.assert(Predef.scala:165)
> at 
> org.apache.spark.util.collection.ExternalAppendOnlyMap$DiskMapIterator.(ExternalAppendOnlyMap.scala:416)
> at 
> org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:235)
> at 
> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:150)
> at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
> at 
> org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:55)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:54)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
> at java.lang.Thread.run(Thread.java:722)



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-3277) LZ4 compression cause the the ExternalSort exception

2014-08-28 Thread Mridul Muralidharan (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-3277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan updated SPARK-3277:
---

Priority: Blocker  (was: Major)

> LZ4 compression cause the the ExternalSort exception
> 
>
> Key: SPARK-3277
> URL: https://issues.apache.org/jira/browse/SPARK-3277
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.0.2, 1.1.0, 1.2.0
>Reporter: hzw
>Priority: Blocker
> Fix For: 1.1.0
>
>
> I tested the LZ4 compression,and it come up with such problem.(with wordcount)
> Also I tested the snappy and LZF,and they were OK.
> At last I set the  "spark.shuffle.spill" as false to avoid such exeception, 
> but once open this "switch", this error would come.
> It seems that if num of the words is few, wordcount will go through,but if it 
> is a complex text ,this problem will show
> Exeception Info as follow:
> java.lang.AssertionError: assertion failed
> at scala.Predef$.assert(Predef.scala:165)
> at 
> org.apache.spark.util.collection.ExternalAppendOnlyMap$DiskMapIterator.(ExternalAppendOnlyMap.scala:416)
> at 
> org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:235)
> at 
> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:150)
> at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
> at 
> org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:55)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:54)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
> at java.lang.Thread.run(Thread.java:722)



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-3277) LZ4 compression cause the the ExternalSort exception

2014-08-28 Thread Mridul Muralidharan (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-3277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan updated SPARK-3277:
---

Affects Version/s: 1.2.0
   1.1.0

> LZ4 compression cause the the ExternalSort exception
> 
>
> Key: SPARK-3277
> URL: https://issues.apache.org/jira/browse/SPARK-3277
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.0.2, 1.1.0, 1.2.0
>Reporter: hzw
>Priority: Blocker
> Fix For: 1.1.0
>
>
> I tested the LZ4 compression,and it come up with such problem.(with wordcount)
> Also I tested the snappy and LZF,and they were OK.
> At last I set the  "spark.shuffle.spill" as false to avoid such exeception, 
> but once open this "switch", this error would come.
> It seems that if num of the words is few, wordcount will go through,but if it 
> is a complex text ,this problem will show
> Exeception Info as follow:
> java.lang.AssertionError: assertion failed
> at scala.Predef$.assert(Predef.scala:165)
> at 
> org.apache.spark.util.collection.ExternalAppendOnlyMap$DiskMapIterator.(ExternalAppendOnlyMap.scala:416)
> at 
> org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:235)
> at 
> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:150)
> at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
> at 
> org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:55)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:54)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
> at java.lang.Thread.run(Thread.java:722)



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



Re: [VOTE] Release Apache Spark 1.1.0 (RC1)

2014-08-28 Thread Mridul Muralidharan
Is SPARK-3277 applicable to 1.1 ?
If yes, until it is fixed, I am -1 on the release (I am on break, so can't
verify or help fix, sorry).

Regards
Mridul
 On 28-Aug-2014 9:33 pm, "Patrick Wendell"  wrote:

> Please vote on releasing the following candidate as Apache Spark version
> 1.1.0!
>
> The tag to be voted on is v1.1.0-rc1 (commit f0718324):
>
> https://git-wip-us.apache.org/repos/asf?p=spark.git;a=commit;h=f07183249b74dd857069028bf7d570b35f265585
>
> The release files, including signatures, digests, etc. can be found at:
> http://people.apache.org/~pwendell/spark-1.1.0-rc1/
>
> Release artifacts are signed with the following key:
> https://people.apache.org/keys/committer/pwendell.asc
>
> The staging repository for this release can be found at:
> https://repository.apache.org/content/repositories/orgapachespark-1028/
>
> The documentation corresponding to this release can be found at:
> http://people.apache.org/~pwendell/spark-1.1.0-rc1-docs/
>
> Please vote on releasing this package as Apache Spark 1.1.0!
>
> The vote is open until Sunday, August 31, at 17:00 UTC and passes if
> a majority of at least 3 +1 PMC votes are cast.
>
> [ ] +1 Release this package as Apache Spark 1.1.0
> [ ] -1 Do not release this package because ...
>
> To learn more about Apache Spark, please see
> http://spark.apache.org/
>
> == What justifies a -1 vote for this release? ==
> This vote is happening very late into the QA period compared with
> previous votes, so -1 votes should only occur for significant
> regressions from 1.0.2. Bugs already present in 1.0.X will not block
> this release.
>
> == What default changes should I be aware of? ==
> 1. The default value of "spark.io.compression.codec" is now "snappy"
> --> Old behavior can be restored by switching to "lzf"
>
> 2. PySpark now performs external spilling during aggregations.
> --> Old behavior can be restored by setting "spark.shuffle.spill" to
> "false".
>
> I'll send a bit more later today with feature information for the
> release. In the mean time I want to put this out there for
> consideration.
>
> - Patrick
>
> -
> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
> For additional commands, e-mail: dev-h...@spark.apache.org
>
>


[jira] [Commented] (SPARK-3277) LZ4 compression cause the the ExternalSort exception

2014-08-28 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14113741#comment-14113741
 ] 

Mridul Muralidharan commented on SPARK-3277:


This looks like unrelated changes pushed to BlockObjectWriter as part of the 
introduction of ShuffleWriteMetrics.
I had introduced checks and also documented that we must not infer size from 
the position of the stream after flush, since close can write data to the 
streams (and a flush can generate more data which need not itself be flushed 
to the streams).

Apparently this logic was modified subsequently, causing this bug.
The solution is to revert the change that updates shuffleBytesWritten before 
the stream is closed: it must be updated after close, based on file.length.
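
As an editorial sketch of that pattern (assumed names, not Spark's actual
BlockObjectWriter), the reliable place to read the byte count is the file
length after the compressed stream has been closed; java.util.zip's
GZIPOutputStream stands in here for any compressor that can hold back a
partial block.

{code}
import java.io.{File, FileOutputStream}
import java.util.zip.GZIPOutputStream

// Writes records through a compressor and returns the bytes actually on disk.
def writeAndMeasure(file: File, records: Iterator[Array[Byte]]): Long = {
  val out = new GZIPOutputStream(new FileOutputStream(file))
  try {
    records.foreach(r => out.write(r))
    out.flush()
    // Reading file.length() here can under-count: the compressor may still be
    // holding a partial block that is only emitted by close().
  } finally {
    out.close()
  }
  file.length()   // reliable only once the stream has been closed
}
{code}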

> LZ4 compression cause the the ExternalSort exception
> 
>
> Key: SPARK-3277
> URL: https://issues.apache.org/jira/browse/SPARK-3277
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.0.2
>Reporter: hzw
> Fix For: 1.1.0
>
>
> I tested the LZ4 compression,and it come up with such problem.(with wordcount)
> Also I tested the snappy and LZF,and they were OK.
> At last I set the  "spark.shuffle.spill" as false to avoid such exeception, 
> but once open this "switch", this error would come.
> Exeception Info as follow:
> java.lang.AssertionError: assertion failed
> at scala.Predef$.assert(Predef.scala:165)
> at 
> org.apache.spark.util.collection.ExternalAppendOnlyMap$DiskMapIterator.(ExternalAppendOnlyMap.scala:416)
> at 
> org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:235)
> at 
> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:150)
> at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
> at 
> org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:55)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:54)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
> at java.lang.Thread.run(Thread.java:722)



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3175) Branch-1.1 SBT build failed for Yarn-Alpha

2014-08-23 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14107923#comment-14107923
 ] 

Mridul Muralidharan commented on SPARK-3175:


Please add more information on why they need to be out of sync.
As of now, the only way to build for yarn-alpha is to manually update the pom.xml.

> Branch-1.1 SBT build failed for Yarn-Alpha
> --
>
> Key: SPARK-3175
> URL: https://issues.apache.org/jira/browse/SPARK-3175
> Project: Spark
>  Issue Type: Bug
>  Components: Build
>Affects Versions: 1.1.1
>Reporter: Chester
>  Labels: build
> Fix For: 1.1.1
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> When trying to build yarn-alpha on branch-1.1
> |branch-1.1|$  sbt/sbt -Pyarn-alpha -Dhadoop.version=2.0.5-alpha projects
> [info] Loading project definition from /Users/chester/projects/spark/project
> org.apache.maven.model.building.ModelBuildingException: 1 problem was 
> encountered while building the effective model for 
> org.apache.spark:spark-yarn-alpha_2.10:1.1.0
> [FATAL] Non-resolvable parent POM: Could not find artifact 
> org.apache.spark:yarn-parent_2.10:pom:1.1.0 in central ( 
> http://repo.maven.apache.org/maven2) and 'parent.relativePath' points at 
> wrong local POM @ line 20, column 11



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



Re: is Branch-1.1 SBT build broken for yarn-alpha ?

2014-08-21 Thread Mridul Muralidharan
Weird that Patrick did not face this while creating the RC.
Essentially the yarn-alpha pom.xml has not been updated properly in
the 1.1 branch.

Just change the version to '1.1.1-SNAPSHOT' in yarn/alpha/pom.xml (to
make it the same as every other pom).


Regards,
Mridul


On Thu, Aug 21, 2014 at 5:09 AM, Chester Chen  wrote:
> I just updated today's build and tried branch-1.1 for both yarn and
> yarn-alpha.
>
> For yarn build, this command seem to work fine.
>
> sbt/sbt -Pyarn -Dhadoop.version=2.3.0-cdh5.0.1 projects
>
> for yarn-alpha
>
> sbt/sbt -Pyarn-alpha -Dhadoop.version=2.0.5-alpha projects
>
> I got the following
>
> Any ideas
>
>
> Chester
>
> |branch-1.1|$  *sbt/sbt -Pyarn-alpha -Dhadoop.version=2.0.5-alpha
> projects*
>
> Using /Library/Java/JavaVirtualMachines/1.6.0_51-b11-457.jdk/Contents/Home
> as default JAVA_HOME.
>
> Note, this will be overridden by -java-home if it is set.
>
> [info] Loading project definition from
> /Users/chester/projects/spark/project/project
>
> [info] Loading project definition from
> /Users/chester/.sbt/0.13/staging/ec3aa8f39111944cc5f2/sbt-pom-reader/project
>
> [warn] Multiple resolvers having different access mechanism configured with
> same name 'sbt-plugin-releases'. To avoid conflict, Remove duplicate
> project resolvers (`resolvers`) or rename publishing resolver (`publishTo`).
>
> [info] Loading project definition from /Users/chester/projects/spark/project
>
> org.apache.maven.model.building.ModelBuildingException: 1 problem was
> encountered while building the effective model for
> org.apache.spark:spark-yarn-alpha_2.10:1.1.0
>
> *[FATAL] Non-resolvable parent POM: Could not find artifact
> org.apache.spark:yarn-parent_2.10:pom:1.1.0 in central (
> http://repo.maven.apache.org/maven2 )
> and 'parent.relativePath' points at wrong local POM @ line 20, column 11*
>
>
>  at
> org.apache.maven.model.building.DefaultModelProblemCollector.newModelBuildingException(DefaultModelProblemCollector.java:195)
>
> at
> org.apache.maven.model.building.DefaultModelBuilder.readParentExternally(DefaultModelBuilder.java:841)
>
> at
> org.apache.maven.model.building.DefaultModelBuilder.readParent(DefaultModelBuilder.java:664)
>
> at
> org.apache.maven.model.building.DefaultModelBuilder.build(DefaultModelBuilder.java:310)
>
> at
> org.apache.maven.model.building.DefaultModelBuilder.build(DefaultModelBuilder.java:232)
>
> at
> com.typesafe.sbt.pom.MvnPomResolver.loadEffectivePom(MavenPomResolver.scala:61)
>
> at com.typesafe.sbt.pom.package$.loadEffectivePom(package.scala:41)
>
> at
> com.typesafe.sbt.pom.MavenProjectHelper$.makeProjectTree(MavenProjectHelper.scala:128)
>
> at
> com.typesafe.sbt.pom.MavenProjectHelper$$anonfun$12.apply(MavenProjectHelper.scala:129)
>
> at
> com.typesafe.sbt.pom.MavenProjectHelper$$anonfun$12.apply(MavenProjectHelper.scala:129)
>
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>
> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>
> at
> com.typesafe.sbt.pom.MavenProjectHelper$.makeProjectTree(MavenProjectHelper.scala:129)
>
> at
> com.typesafe.sbt.pom.MavenProjectHelper$$anonfun$12.apply(MavenProjectHelper.scala:129)
>
> at
> com.typesafe.sbt.pom.MavenProjectHelper$$anonfun$12.apply(MavenProjectHelper.scala:129)
>
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>
> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>
> at
> com.typesafe.sbt.pom.MavenProjectHelper$.makeProjectTree(MavenProjectHelper.scala:129)
>
> at
> com.typesafe.sbt.pom.MavenProjectHelper$.makeReactorProject(MavenProjectHelper.scala:49)
>
> at com.typesafe.sbt.pom.PomBuild$class.projectDefinitions(PomBuild.scala:28)
>
> at SparkBuild$.projectDefinitions(SparkBuild.scala:165)
>
> at sbt.Load$.sbt$Load$$projectsFromBuild(Load.scala:458)
>
> at sbt.Load$$anonfun$24.apply(Load.scala:415)
>
> at sbt.Load$$anonfun$24.apply(Load.scala:415)
>
> at scala.collection.immutable.Stream.flatMap(Stream.scala:442)
>
> at sbt.Load$.loadUnit(Load.scala:415)
>
> at sbt.Load$$anonfun$15$$anonfun$apply$11.apply(Load.scala:256)
>
> at sbt.Load$$anonfun$15$$anonfun$apply$11.apply(Load.scala:256)
>
> at
> sbt.BuildLoader$$anonfun$componentLoader$1$$anonfun$apply$4$$anonfun$apply$5$$anonfun$apply$6.apply(BuildLoader

[jira] [Commented] (SPARK-3115) Improve task broadcast latency for small tasks

2014-08-19 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14102142#comment-14102142
 ] 

Mridul Muralidharan commented on SPARK-3115:


I had a tab open with pretty much the exact same bug report ready to be filed :-)

> Improve task broadcast latency for small tasks
> --
>
> Key: SPARK-3115
> URL: https://issues.apache.org/jira/browse/SPARK-3115
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 1.1.0
>Reporter: Shivaram Venkataraman
>Assignee: Reynold Xin
>
> Broadcasting the task information helps reduce the amount of data transferred 
> for large tasks. However we've seen that this adds more latency for small 
> tasks. It'll be great to profile and fix this.



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3019) Pluggable block transfer (data plane communication) interface

2014-08-17 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14100043#comment-14100043
 ] 

Mridul Muralidharan commented on SPARK-3019:


Unfortunately, I never dug into how MR does its shuffle - though I was supposed 
to look into this with Tom in the Q1-Q2 timeframe - so hopefully I am not way 
off base here!

bq. It's true that we can't start on a function which requires a full view of 
the coming in for a particular key, but we can start merging and combining.

In the case of Spark, unlike MR, we cannot start merging/combining until all 
blocks are fetched.
Well, technically we can - but we would end up repeating the merge/combine for 
each new map output fetched, which would be very suboptimal since we would read 
from disk many more times (hope I did not get this wrong /CC [~matei]).


bq. MapReduce makes this assessment. Each reducer has a pool of memory for 
fetching data into, and avoids fetching more data than can fit into this pool. 
I was under the impression that Spark does something similar.

In the case of hash-based shuffle, this is obviously not possible.
In the case of sort-based shuffle, I can see this being possible, but it is not 
supported (IIRC /CC [~matei]).


> Pluggable block transfer (data plane communication) interface
> -
>
> Key: SPARK-3019
> URL: https://issues.apache.org/jira/browse/SPARK-3019
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Reporter: Reynold Xin
>Assignee: Reynold Xin
> Attachments: PluggableBlockTransferServiceProposalforSpark - draft 
> 1.pdf
>
>
> The attached design doc proposes a standard interface for block transferring, 
> which will make future engineering of this functionality easier, allowing the 
> Spark community to provide alternative implementations.
> Block transferring is a critical function in Spark. All of the following 
> depend on it:
> * shuffle
> * torrent broadcast
> * block replication in BlockManager
> * remote block reads for tasks scheduled without locality



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3019) Pluggable block transfer (data plane communication) interface

2014-08-17 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14099910#comment-14099910
 ] 

Mridul Muralidharan commented on SPARK-3019:


Btw, can we do something about block replication when the replication factor is > 1?
Currently we silently lose replicas, and the block placement strategy for 
replication is fairly non-existent.

> Pluggable block transfer (data plane communication) interface
> -
>
> Key: SPARK-3019
> URL: https://issues.apache.org/jira/browse/SPARK-3019
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Reporter: Reynold Xin
>Assignee: Reynold Xin
> Attachments: PluggableBlockTransferServiceProposalforSpark - draft 
> 1.pdf
>
>
> The attached design doc proposes a standard interface for block transferring, 
> which will make future engineering of this functionality easier, allowing the 
> Spark community to provide alternative implementations.
> Block transferring is a critical function in Spark. All of the following 
> depend on it:
> * shuffle
> * torrent broadcast
> * block replication in BlockManager
> * remote block reads for tasks scheduled without locality



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3019) Pluggable block transfer (data plane communication) interface

2014-08-17 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14099909#comment-14099909
 ] 

Mridul Muralidharan commented on SPARK-3019:



I am yet to go through the proposal in detail, so I will defer comments on 
that for later; but to get some clarity on the discussion around Sandy's point 
(see the sketch after this list):

- Until we read from all mappers, the shuffle cannot actually start.
Even if a single mapper's output is small enough to fit into memory (which it 
need not be), num_mappers * avg_size_of_map_output_per_reducer could be larger 
than the available memory by orders of magnitude. (This is fairly common for 
us, for example.)
This was the reason we actually worked on the 2G fix, btw - individual blocks 
in a mapper, and also the data per reducer for a mapper, were larger than 2G :-)

- While reading data off the network, we cannot assess whether the data being 
read can fit into memory or not (since there are other parallel read requests 
pending for this and other cores in the same executor).
So spooling intermediate data to disk would become necessary on both the mapper 
side (which it already does) and the reducer side (which we don't do currently 
- we assume that a block fits into reducer memory as part of doing a remote 
fetch). This becomes more relevant when we want to target bigger blocks of data 
and tackle skew in the data (for shuffle).
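
As a back-of-the-envelope illustration of the first point above (the numbers
here are made up, not taken from the workload being discussed), even a modest
per-mapper contribution to a single reducer multiplies out across many mappers:

{code}
val numMappers = 10000
val bytesPerReducerFromOneMapper = 8L * 1024 * 1024      // assume ~8 MiB from each mapper
val totalBytesForOneReducer = numMappers * bytesPerReducerFromOneMapper
println(s"~${totalBytesForOneReducer / (1L << 30)} GiB fetched by one reducer")  // ~78 GiB
{code}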

> Pluggable block transfer (data plane communication) interface
> -
>
> Key: SPARK-3019
> URL: https://issues.apache.org/jira/browse/SPARK-3019
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Reporter: Reynold Xin
>Assignee: Reynold Xin
> Attachments: PluggableBlockTransferServiceProposalforSpark - draft 
> 1.pdf
>
>
> The attached design doc proposes a standard interface for block transferring, 
> which will make future engineering of this functionality easier, allowing the 
> Spark community to provide alternative implementations.
> Block transferring is a critical function in Spark. All of the following 
> depend on it:
> * shuffle
> * torrent broadcast
> * block replication in BlockManager
> * remote block reads for tasks scheduled without locality



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2089) With YARN, preferredNodeLocalityData isn't honored

2014-08-15 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14099115#comment-14099115
 ] 

Mridul Muralidharan commented on SPARK-2089:



For the general case, won't InputFormats have customizations applied to them 
for creation and/or initialization before they can be used to get splits 
(other than file names, I mean)?


> With YARN, preferredNodeLocalityData isn't honored 
> ---
>
> Key: SPARK-2089
> URL: https://issues.apache.org/jira/browse/SPARK-2089
> Project: Spark
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.0.0
>Reporter: Sandy Ryza
>Assignee: Sandy Ryza
>Priority: Critical
>
> When running in YARN cluster mode, apps can pass preferred locality data when 
> constructing a Spark context that will dictate where to request executor 
> containers.
> This is currently broken because of a race condition.  The Spark-YARN code 
> runs the user class and waits for it to start up a SparkContext.  During its 
> initialization, the SparkContext will create a YarnClusterScheduler, which 
> notifies a monitor in the Spark-YARN code that .  The Spark-Yarn code then 
> immediately fetches the preferredNodeLocationData from the SparkContext and 
> uses it to start requesting containers.
> But in the SparkContext constructor that takes the preferredNodeLocationData, 
> setting preferredNodeLocationData comes after the rest of the initialization, 
> so, if the Spark-YARN code comes around quickly enough after being notified, 
> the data that's fetched is the empty unset version.  The occurred during all 
> of my runs.



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-1476) 2GB limit in spark for blocks

2014-08-15 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14099072#comment-14099072
 ] 

Mridul Muralidharan commented on SPARK-1476:


Based on discussions we had with others, apparently 1.1 was not a good vehicle 
for this proposal.
Further, since there was no interest in this JIRA or comments on the proposal, 
we put the effort on the back burner.

We plan to push at least some of the bugs fixed as part of this effort - 
consolidated shuffle did get resolved in 1.1, and a few more might be 
contributed back in 1.2, time permitting (disk-backed map output tracking, for 
example, looks like a good candidate).
But the bulk of the change is pervasive, at times a bit invasive, and at odds 
with some of the other changes (for example, zero-copy); shepherding it might 
be a bit time consuming for me given other deliverables.

If there is renewed interest in getting this integrated into a Spark release, 
I can try to push for it to be resurrected and submitted.

> 2GB limit in spark for blocks
> -
>
> Key: SPARK-1476
> URL: https://issues.apache.org/jira/browse/SPARK-1476
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
> Environment: all
>    Reporter: Mridul Muralidharan
>Assignee: Mridul Muralidharan
>Priority: Critical
> Attachments: 2g_fix_proposal.pdf
>
>
> The underlying abstraction for blocks in spark is a ByteBuffer : which limits 
> the size of the block to 2GB.
> This has implication not just for managed blocks in use, but also for shuffle 
> blocks (memory mapped blocks are limited to 2gig, even though the api allows 
> for long), ser-deser via byte array backed outstreams (SPARK-1391), etc.
> This is a severe limitation for use of spark when used on non trivial 
> datasets.
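
As an editorial illustration of the ceiling described in the issue above:
java.nio buffers are indexed by Int, so any block backed by a single ByteBuffer
(or a single memory-mapped region) tops out just under 2 GiB.

{code}
// A 3 GiB block cannot even be expressed as a ByteBuffer capacity.
val threeGiB: Long = 3L * 1024 * 1024 * 1024
assert(threeGiB > Int.MaxValue)

// java.nio.ByteBuffer.allocate(capacity: Int) and MappedByteBuffer are
// Int-indexed, so larger blocks have to be chunked - which is what the
// attached proposal is about.
println(s"max single-buffer size = ${Int.MaxValue} bytes (~${Int.MaxValue / (1 << 20)} MiB)")
{code}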



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2089) With YARN, preferredNodeLocalityData isn't honored

2014-08-13 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14095247#comment-14095247
 ] 

Mridul Muralidharan commented on SPARK-2089:


Since I am not maintaining the code anymore, I don't have a strong preference 
either way.
I am not sure what the format means, btw - I see multiple nodes and racks 
mentioned in the same group ...

In general though, I am not convinced it is a good direction to take.
1) It is a workaround for a design issue and has non-trivial performance 
implications (serializing into this form only to immediately deserialize it is 
expensive for large inputs; not to mention, it gets shipped to executors for 
no reason).
2) It locks us into a format which provides inadequate information - the number 
of blocks per node, size per block, etc. is lost (or maybe I just did not 
understand what the format is!).
3) We are currently investigating evolving in the opposite direction - adding 
more information so that we can be more specific about where to allocate executors.
For example, I can see a fairly near-term need to associate executors with 
accelerator cards (and break the implicit OFF_HEAP -> Tachyon assumption).
A string representation makes this fragile to evolve.

As I mentioned before, the current YARN allocation model in Spark is a very 
naive implementation - which I did not expect to survive this long: it came 
directly from our prototype.
We really should be modifying it to consider the cost of data transfer and 
prioritize allocation that way (number of blocks on a node/rack, size of 
blocks, number of replicas available, etc.).
For small datasets on small enough clusters this is not relevant, but it has 
implications as we grow along both axes.

> With YARN, preferredNodeLocalityData isn't honored 
> ---
>
> Key: SPARK-2089
> URL: https://issues.apache.org/jira/browse/SPARK-2089
> Project: Spark
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.0.0
>Reporter: Sandy Ryza
>Assignee: Sandy Ryza
>Priority: Critical
>
> When running in YARN cluster mode, apps can pass preferred locality data when 
> constructing a Spark context that will dictate where to request executor 
> containers.
> This is currently broken because of a race condition.  The Spark-YARN code 
> runs the user class and waits for it to start up a SparkContext.  During its 
> initialization, the SparkContext will create a YarnClusterScheduler, which 
> notifies a monitor in the Spark-YARN code that .  The Spark-Yarn code then 
> immediately fetches the preferredNodeLocationData from the SparkContext and 
> uses it to start requesting containers.
> But in the SparkContext constructor that takes the preferredNodeLocationData, 
> setting preferredNodeLocationData comes after the rest of the initialization, 
> so, if the Spark-YARN code comes around quickly enough after being notified, 
> the data that's fetched is the empty unset version.  The occurred during all 
> of my runs.



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2962) Suboptimal scheduling in spark

2014-08-11 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14092807#comment-14092807
 ] 

Mridul Muralidharan commented on SPARK-2962:


On further investigation:

a) The primary issue is a combination of SPARK-2089 and the current scheduler 
behavior for pendingTasksWithNoPrefs.
SPARK-2089 leads to very bad allocation of nodes - it particularly has an 
impact on bigger clusters.
It leads to a lot of blocks having no data-local or rack-local executors, 
causing them to end up in pendingTasksWithNoPrefs.

While loading data off DFS, when an executor is being scheduled, even though 
there might be rack-local schedules available for it (or, after waiting a 
while, data-local ones too - see (b) below), because of the current scheduler 
behavior, tasks from pendingTasksWithNoPrefs get scheduled, causing a large 
number of ANY tasks to be scheduled at the very onset.

The combination of these, with the lack of marginal alleviation via (b), is 
what caused the performance impact.

b) spark.scheduler.minRegisteredExecutorsRatio was not yet being used in the 
workload - so that might alleviate some of the non-deterministic waiting and 
ensure adequate executors are allocated! Thanks [~lirui]



> Suboptimal scheduling in spark
> --
>
> Key: SPARK-2962
> URL: https://issues.apache.org/jira/browse/SPARK-2962
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.1.0
> Environment: All
>Reporter: Mridul Muralidharan
>
> In findTask, irrespective of 'locality' specified, pendingTasksWithNoPrefs 
> are always scheduled with PROCESS_LOCAL
> pendingTasksWithNoPrefs contains tasks which currently do not have any alive 
> locations - but which could come in 'later' : particularly relevant when 
> spark app is just coming up and containers are still being added.
> This causes a large number of non node local tasks to be scheduled incurring 
> significant network transfers in the cluster when running with non trivial 
> datasets.
> The comment "// Look for no-pref tasks after rack-local tasks since they can 
> run anywhere." is misleading in the method code : locality levels start from 
> process_local down to any, and so no prefs get scheduled much before rack.
> Also note that, currentLocalityIndex is reset to the taskLocality returned by 
> this method - so returning PROCESS_LOCAL as the level will trigger wait times 
> again. (Was relevant before recent change to scheduler, and might be again 
> based on resolution of this issue).
> Found as part of writing test for SPARK-2931
>  



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-2962) Suboptimal scheduling in spark

2014-08-10 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14092427#comment-14092427
 ] 

Mridul Muralidharan edited comment on SPARK-2962 at 8/11/14 4:35 AM:
-

To give more context:

a) Our jobs start by loading data from DFS, so this is the first stage that 
gets executed.

b) We sleep for 1 minute before starting the jobs (in case the cluster is 
busy, etc.) - unfortunately, this is not sufficient, and IIRC there is no 
programmatic way to wait more deterministically for X% of the nodes (was 
something added to alleviate this? I did see some discussion).

c) This becomes more of a problem because Spark does not honour preferred 
locations anymore while running on YARN - see SPARK-2089, due to the 1.0 
interface changes.
[ Practically, if we are using a large enough number of nodes (with replication 
of 3 or higher), we usually do end up with quite a lot of data-local tasks 
eventually - so (c) is not an immediate concern for our current jobs assuming 
(b) is not an issue, though it is suboptimal in the general case ]




was (Author: mridulm80):
To give more context; 

a) Our jobs start with load data from dfs as starting point : and so this is 
the first stage that gets executed.

b) We are sleeping for 1 minute before starting the jobs (in case cluster is 
busy, etc) - unfortunately, this is not sufficient and iirc there is no 
programmatic way to wait more deterministically for X% of node (was something 
added to alleviate this ? I did see some discussion)

c) This becomes more of a problem because spark does not honour preferred 
location anymore while running in yarn. See SPARK-208 - due to 1.0 interface 
changes.
[ Practically, if we are using large enough number of nodes (with replication 
of 3 or higher), usually we do end up with quite of lot of data local tasks 
eventually - so (c) is not an immediate concern for our current jobs assuming 
(b) is not an issue, though it is suboptimal in general case ]



> Suboptimal scheduling in spark
> --
>
> Key: SPARK-2962
> URL: https://issues.apache.org/jira/browse/SPARK-2962
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.1.0
> Environment: All
>Reporter: Mridul Muralidharan
>
> In findTask, irrespective of 'locality' specified, pendingTasksWithNoPrefs 
> are always scheduled with PROCESS_LOCAL
> pendingTasksWithNoPrefs contains tasks which currently do not have any alive 
> locations - but which could come in 'later' : particularly relevant when 
> spark app is just coming up and containers are still being added.
> This causes a large number of non node local tasks to be scheduled incurring 
> significant network transfers in the cluster when running with non trivial 
> datasets.
> The comment "// Look for no-pref tasks after rack-local tasks since they can 
> run anywhere." is misleading in the method code : locality levels start from 
> process_local down to any, and so no prefs get scheduled much before rack.
> Also note that, currentLocalityIndex is reset to the taskLocality returned by 
> this method - so returning PROCESS_LOCAL as the level will trigger wait times 
> again. (Was relevant before recent change to scheduler, and might be again 
> based on resolution of this issue).
> Found as part of writing test for SPARK-2931
>  



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2962) Suboptimal scheduling in spark

2014-08-10 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14092431#comment-14092431
 ] 

Mridul Muralidharan commented on SPARK-2962:


Note, I don't think this is a regression in 1.1; it probably existed much 
earlier too.
Other issues (like SPARK-2089) are making us notice it now - we moved to 1.1 
from 0.9 recently.

> Suboptimal scheduling in spark
> --
>
> Key: SPARK-2962
> URL: https://issues.apache.org/jira/browse/SPARK-2962
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.1.0
> Environment: All
>Reporter: Mridul Muralidharan
>
> In findTask, irrespective of 'locality' specified, pendingTasksWithNoPrefs 
> are always scheduled with PROCESS_LOCAL
> pendingTasksWithNoPrefs contains tasks which currently do not have any alive 
> locations - but which could come in 'later' : particularly relevant when 
> spark app is just coming up and containers are still being added.
> This causes a large number of non node local tasks to be scheduled incurring 
> significant network transfers in the cluster when running with non trivial 
> datasets.
> The comment "// Look for no-pref tasks after rack-local tasks since they can 
> run anywhere." is misleading in the method code : locality levels start from 
> process_local down to any, and so no prefs get scheduled much before rack.
> Also note that, currentLocalityIndex is reset to the taskLocality returned by 
> this method - so returning PROCESS_LOCAL as the level will trigger wait times 
> again. (Was relevant before recent change to scheduler, and might be again 
> based on resolution of this issue).
> Found as part of writing test for SPARK-2931
>  



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2962) Suboptimal scheduling in spark

2014-08-10 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14092430#comment-14092430
 ] 

Mridul Muralidharan commented on SPARK-2962:


Hi [~matei],

  I am referencing the latest code (as of yesterday night).

pendingTasksWithNoPrefs currently contains both tasks which truly have no 
preference and tasks whose preferred locations are unavailable - and the 
latter is what is triggering this, since that can change during the execution 
of the stage.
Hope I am not missing something?

Thanks,
Mridul

> Suboptimal scheduling in spark
> --
>
> Key: SPARK-2962
> URL: https://issues.apache.org/jira/browse/SPARK-2962
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.1.0
> Environment: All
>Reporter: Mridul Muralidharan
>
> In findTask, irrespective of 'locality' specified, pendingTasksWithNoPrefs 
> are always scheduled with PROCESS_LOCAL
> pendingTasksWithNoPrefs contains tasks which currently do not have any alive 
> locations - but which could come in 'later' : particularly relevant when 
> spark app is just coming up and containers are still being added.
> This causes a large number of non node local tasks to be scheduled incurring 
> significant network transfers in the cluster when running with non trivial 
> datasets.
> The comment "// Look for no-pref tasks after rack-local tasks since they can 
> run anywhere." is misleading in the method code : locality levels start from 
> process_local down to any, and so no prefs get scheduled much before rack.
> Also note that, currentLocalityIndex is reset to the taskLocality returned by 
> this method - so returning PROCESS_LOCAL as the level will trigger wait times 
> again. (Was relevant before recent change to scheduler, and might be again 
> based on resolution of this issue).
> Found as part of writing test for SPARK-2931
>  



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2962) Suboptimal scheduling in spark

2014-08-10 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14092427#comment-14092427
 ] 

Mridul Muralidharan commented on SPARK-2962:


To give more context; 

a) Our jobs start with load data from dfs as starting point : and so this is 
the first stage that gets executed.

b) We are sleeping for 1 minute before starting the jobs (in case cluster is 
busy, etc) - unfortunately, this is not sufficient and iirc there is no 
programmatic way to wait more deterministically for X% of node (was something 
added to alleviate this ? I did see some discussion)

c) This becomes more of a problem because spark does not honour preferred 
location anymore while running in yarn. See SPARK-208 - due to 1.0 interface 
changes.
[ Practically, if we are using large enough number of nodes (with replication 
of 3 or higher), usually we do end up with quite of lot of data local tasks 
eventually - so (c) is not an immediate concern for our current jobs assuming 
(b) is not an issue, though it is suboptimal in general case ]



> Suboptimal scheduling in spark
> --
>
> Key: SPARK-2962
> URL: https://issues.apache.org/jira/browse/SPARK-2962
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.1.0
> Environment: All
>Reporter: Mridul Muralidharan
>
> In findTask, irrespective of 'locality' specified, pendingTasksWithNoPrefs 
> are always scheduled with PROCESS_LOCAL
> pendingTasksWithNoPrefs contains tasks which currently do not have any alive 
> locations - but which could come in 'later' : particularly relevant when 
> spark app is just coming up and containers are still being added.
> This causes a large number of non node local tasks to be scheduled incurring 
> significant network transfers in the cluster when running with non trivial 
> datasets.
> The comment "// Look for no-pref tasks after rack-local tasks since they can 
> run anywhere." is misleading in the method code : locality levels start from 
> process_local down to any, and so no prefs get scheduled much before rack.
> Also note that, currentLocalityIndex is reset to the taskLocality returned by 
> this method - so returning PROCESS_LOCAL as the level will trigger wait times 
> again. (Was relevant before recent change to scheduler, and might be again 
> based on resolution of this issue).
> Found as part of writing test for SPARK-2931
>  



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-2962) Suboptimal scheduling in spark

2014-08-10 Thread Mridul Muralidharan (JIRA)
Mridul Muralidharan created SPARK-2962:
--

 Summary: Suboptimal scheduling in spark
 Key: SPARK-2962
 URL: https://issues.apache.org/jira/browse/SPARK-2962
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.1.0
 Environment: All
Reporter: Mridul Muralidharan



In findTask, irrespective of the 'locality' level specified, pendingTasksWithNoPrefs
are always scheduled as PROCESS_LOCAL.

pendingTasksWithNoPrefs contains tasks which currently do not have any alive
locations - but which could become available later: particularly relevant when the
spark app is just coming up and containers are still being added.

This causes a large number of non-node-local tasks to be scheduled, incurring
significant network transfers in the cluster when running with non-trivial
datasets.

The comment "// Look for no-pref tasks after rack-local tasks since they can
run anywhere." in the method is misleading: locality levels are walked from
process_local down to any, and so no-pref tasks get scheduled well before rack.
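
A simplified sketch of the behaviour described above - illustration only, not the
actual TaskSetManager code; the point is that the no-pref branch is not gated on the
allowed locality level, so those tasks are handed out (tagged PROCESS_LOCAL) even on
the very first PROCESS_LOCAL pass.

{code}
object Locality extends Enumeration {
  val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value
}
import Locality._

// Toy model of the ordering: each list stands in for a pending-task queue.
def findTask(allowed: Locality.Value,
             execLocal: List[Int], hostLocal: List[Int], rackLocal: List[Int],
             noPrefs: List[Int], anyTasks: List[Int]): Option[(Int, Locality.Value)] = {
  execLocal.headOption.map((_, PROCESS_LOCAL))
    .orElse(if (allowed >= NODE_LOCAL) hostLocal.headOption.map((_, NODE_LOCAL)) else None)
    .orElse(if (allowed >= RACK_LOCAL) rackLocal.headOption.map((_, RACK_LOCAL)) else None)
    // no check on `allowed` here: no-pref tasks go out immediately, as PROCESS_LOCAL
    .orElse(noPrefs.headOption.map((_, PROCESS_LOCAL)))
    .orElse(if (allowed >= ANY) anyTasks.headOption.map((_, ANY)) else None)
}
{code}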


Also note that currentLocalityIndex is reset to the taskLocality returned by
this method - so returning PROCESS_LOCAL as the level will trigger the locality
wait times again. (This was relevant before the recent change to the scheduler,
and might be again depending on the resolution of this issue.)


Found as part of writing test for SPARK-2931
 



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2931) getAllowedLocalityLevel() throws ArrayIndexOutOfBoundsException

2014-08-09 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14091881#comment-14091881
 ] 

Mridul Muralidharan commented on SPARK-2931:


[~joshrosen] [~kayousterhout] Added a patch which deterministically showcases 
the bug - should be easy to fix it now I hope :-)

> getAllowedLocalityLevel() throws ArrayIndexOutOfBoundsException
> ---
>
> Key: SPARK-2931
> URL: https://issues.apache.org/jira/browse/SPARK-2931
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.1.0
> Environment: Spark EC2, spark-1.1.0-snapshot1, sort-by-key spark-perf 
> benchmark
>Reporter: Josh Rosen
>Priority: Blocker
> Fix For: 1.1.0
>
> Attachments: scala-sort-by-key.err, test.patch
>
>
> When running Spark Perf's sort-by-key benchmark on EC2 with v1.1.0-snapshot, 
> I get the following errors (one per task):
> {code}
> 14/08/08 18:54:22 INFO scheduler.TaskSetManager: Starting task 39.0 in stage 
> 0.0 (TID 39, ip-172-31-14-30.us-west-2.compute.internal, PROCESS_LOCAL, 1003 
> bytes)
> 14/08/08 18:54:22 INFO cluster.SparkDeploySchedulerBackend: Registered 
> executor: 
> Actor[akka.tcp://sparkexecu...@ip-172-31-9-213.us-west-2.compute.internal:58901/user/Executor#1436065036]
>  with ID 0
> 14/08/08 18:54:22 ERROR actor.OneForOneStrategy: 1
> java.lang.ArrayIndexOutOfBoundsException: 1
>   at 
> org.apache.spark.scheduler.TaskSetManager.getAllowedLocalityLevel(TaskSetManager.scala:475)
>   at 
> org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:409)
>   at 
> org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$7$$anonfun$apply$2.apply$mcVI$sp(TaskSchedulerImpl.scala:261)
>   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
>   at 
> org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$7.apply(TaskSchedulerImpl.scala:257)
>   at 
> org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$7.apply(TaskSchedulerImpl.scala:254)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>   at 
> org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:254)
>   at 
> org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:254)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at 
> org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:254)
>   at 
> org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor.makeOffers(CoarseGrainedSchedulerBackend.scala:153)
>   at 
> org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor$$anonfun$receive$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:103)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}
> This causes the job to hang.
> I can deterministically reproduce this by re-running the test, either in 
> isolation or as part of the full performance testing suite.



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-2931) getAllowedLocalityLevel() throws ArrayIndexOutOfBoundsException

2014-08-09 Thread Mridul Muralidharan (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-2931?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan updated SPARK-2931:
---

Attachment: test.patch

A patch to showcase the exception

> getAllowedLocalityLevel() throws ArrayIndexOutOfBoundsException
> ---
>
> Key: SPARK-2931
> URL: https://issues.apache.org/jira/browse/SPARK-2931
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.1.0
> Environment: Spark EC2, spark-1.1.0-snapshot1, sort-by-key spark-perf 
> benchmark
>Reporter: Josh Rosen
>Priority: Blocker
> Fix For: 1.1.0
>
> Attachments: scala-sort-by-key.err, test.patch
>
>
> When running Spark Perf's sort-by-key benchmark on EC2 with v1.1.0-snapshot, 
> I get the following errors (one per task):
> {code}
> 14/08/08 18:54:22 INFO scheduler.TaskSetManager: Starting task 39.0 in stage 
> 0.0 (TID 39, ip-172-31-14-30.us-west-2.compute.internal, PROCESS_LOCAL, 1003 
> bytes)
> 14/08/08 18:54:22 INFO cluster.SparkDeploySchedulerBackend: Registered 
> executor: 
> Actor[akka.tcp://sparkexecu...@ip-172-31-9-213.us-west-2.compute.internal:58901/user/Executor#1436065036]
>  with ID 0
> 14/08/08 18:54:22 ERROR actor.OneForOneStrategy: 1
> java.lang.ArrayIndexOutOfBoundsException: 1
>   at 
> org.apache.spark.scheduler.TaskSetManager.getAllowedLocalityLevel(TaskSetManager.scala:475)
>   at 
> org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:409)
>   at 
> org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$7$$anonfun$apply$2.apply$mcVI$sp(TaskSchedulerImpl.scala:261)
>   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
>   at 
> org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$7.apply(TaskSchedulerImpl.scala:257)
>   at 
> org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$7.apply(TaskSchedulerImpl.scala:254)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>   at 
> org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:254)
>   at 
> org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:254)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at 
> org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:254)
>   at 
> org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor.makeOffers(CoarseGrainedSchedulerBackend.scala:153)
>   at 
> org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor$$anonfun$receive$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:103)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}
> This causes the job to hang.
> I can deterministically reproduce this by re-running the test, either in 
> isolation or as part of the full performance testing suite.



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2931) getAllowedLocalityLevel() throws ArrayIndexOutOfBoundsException

2014-08-09 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14091849#comment-14091849
 ] 

Mridul Muralidharan commented on SPARK-2931:


Checking more, it might have been for an internal review: not sure, can't find
an external reference.
So the issue is as [~kayousterhout] described: after locality levels are
updated (as part of an executor loss, etc.), the index is not updated
accordingly (or reset).

The earlier code was assuming the population of the data structures won't change
once created - which is no longer the case.
A testcase to simulate this should be possible - rough sketch (a toy code model
follows the steps):
a) Add two executors: one PROCESS_LOCAL executor exec1 and one ANY executor
exec2.
The locality levels should now contain all levels, due to exec1.
b) Schedule a task on the process_local executor, and wait long enough that the
allowed level is now at rack or any.
c) Fail the process_local executor - iirc this currently will not cause
recomputation of levels (and ensure the corresponding cleanup is triggered, to
cause an update).
d) Add an ANY executor exec3 - this will trigger computeValidLocalityLevels in
executorAdded (executor failure does not; we probably should add that).
e) Now, any invocation of getAllowedLocalityLevel will cause the exception Josh
mentioned.
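
As a self-contained toy model of the failure mode (not the actual scheduler
classes), to make the stale-index interaction concrete:

{code}
// Toy model of the bug: an index into the locality-levels array is kept across
// a recomputation that shrinks the array, so the next lookup blows up.
object StaleLocalityIndexDemo extends App {
  var levels: Array[String] = Array("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY")
  var currentLocalityIndex = 0

  // Waiting long enough pushes the index towards the last (loosest) level.
  currentLocalityIndex = levels.length - 1          // now 3, i.e. ANY

  // An executor change triggers recomputation and the new set of levels is
  // smaller; the index is NOT reset - which is the bug being described.
  levels = Array("PROCESS_LOCAL", "ANY")

  // A getAllowedLocalityLevel-style lookup now throws ArrayIndexOutOfBoundsException.
  println(levels(currentLocalityIndex))
}
{code}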


> getAllowedLocalityLevel() throws ArrayIndexOutOfBoundsException
> ---
>
> Key: SPARK-2931
> URL: https://issues.apache.org/jira/browse/SPARK-2931
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.1.0
> Environment: Spark EC2, spark-1.1.0-snapshot1, sort-by-key spark-perf 
> benchmark
>Reporter: Josh Rosen
>Priority: Blocker
> Fix For: 1.1.0
>
> Attachments: scala-sort-by-key.err
>
>
> When running Spark Perf's sort-by-key benchmark on EC2 with v1.1.0-snapshot, 
> I get the following errors (one per task):
> {code}
> 14/08/08 18:54:22 INFO scheduler.TaskSetManager: Starting task 39.0 in stage 
> 0.0 (TID 39, ip-172-31-14-30.us-west-2.compute.internal, PROCESS_LOCAL, 1003 
> bytes)
> 14/08/08 18:54:22 INFO cluster.SparkDeploySchedulerBackend: Registered 
> executor: 
> Actor[akka.tcp://sparkexecu...@ip-172-31-9-213.us-west-2.compute.internal:58901/user/Executor#1436065036]
>  with ID 0
> 14/08/08 18:54:22 ERROR actor.OneForOneStrategy: 1
> java.lang.ArrayIndexOutOfBoundsException: 1
>   at 
> org.apache.spark.scheduler.TaskSetManager.getAllowedLocalityLevel(TaskSetManager.scala:475)
>   at 
> org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:409)
>   at 
> org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$7$$anonfun$apply$2.apply$mcVI$sp(TaskSchedulerImpl.scala:261)
>   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
>   at 
> org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$7.apply(TaskSchedulerImpl.scala:257)
>   at 
> org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$7.apply(TaskSchedulerImpl.scala:254)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>   at 
> org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:254)
>   at 
> org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:254)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at 
> org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:254)
>   at 
> org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor.makeOffers(CoarseGrainedSchedulerBackend.scala:153)
>   at 
> org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor$$anonfun$receive$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:103)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at sc

Re: Unit tests in < 5 minutes

2014-08-09 Thread Mridul Muralidharan
Issue with supporting this imo is the fact that scalatest uses the
same VM for all the tests (the surefire plugin supports forking, but
scalatest ignores it iirc).
So different tests would initialize different spark contexts, and can
potentially step on each other's toes.

Regards,
Mridul


On Fri, Aug 8, 2014 at 9:31 PM, Nicholas Chammas
 wrote:
> Howdy,
>
> Do we think it's both feasible and worthwhile to invest in getting our unit
> tests to finish in under 5 minutes (or something similarly brief) when run
> by Jenkins?
>
> Unit tests currently seem to take anywhere from 30 min to 2 hours. As
> people add more tests, I imagine this time will only grow. I think it would
> be better for both contributors and reviewers if they didn't have to wait
> so long for test results; PR reviews would be shorter, if nothing else.
>
> I don't know how how this is normally done, but maybe it wouldn't be too
> much work to get a test cycle to feel lighter.
>
> Most unit tests are independent and can be run concurrently, right? Would
> it make sense to build a given patch on many servers at once and send
> disjoint sets of unit tests to each?
>
> I'd be interested in working on something like that if possible (and
> sensible).
>
> Nick

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



[jira] [Commented] (SPARK-2931) getAllowedLocalityLevel() throws ArrayIndexOutOfBoundsException

2014-08-09 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14091746#comment-14091746
 ] 

Mridul Muralidharan commented on SPARK-2931:


[~kayousterhout] this is weird - I remember mentioning this exact same issue in
some PR for 1.1 (trying to find which one, though not 1313 iirc); and I think
it was supposed to have been addressed.
We had observed this issue of currentLocalityLevel running away when we had
internally merged the PR.

Strange that it was not addressed; speaks volumes of me not following up on my
reviews !

> getAllowedLocalityLevel() throws ArrayIndexOutOfBoundsException
> ---
>
> Key: SPARK-2931
> URL: https://issues.apache.org/jira/browse/SPARK-2931
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.1.0
> Environment: Spark EC2, spark-1.1.0-snapshot1, sort-by-key spark-perf 
> benchmark
>Reporter: Josh Rosen
>Priority: Blocker
> Fix For: 1.1.0
>
>
> When running Spark Perf's sort-by-key benchmark on EC2 with v1.1.0-snapshot, 
> I get the following errors (one per task):
> {code}
> 14/08/08 18:54:22 INFO scheduler.TaskSetManager: Starting task 39.0 in stage 
> 0.0 (TID 39, ip-172-31-14-30.us-west-2.compute.internal, PROCESS_LOCAL, 1003 
> bytes)
> 14/08/08 18:54:22 INFO cluster.SparkDeploySchedulerBackend: Registered 
> executor: 
> Actor[akka.tcp://sparkexecu...@ip-172-31-9-213.us-west-2.compute.internal:58901/user/Executor#1436065036]
>  with ID 0
> 14/08/08 18:54:22 ERROR actor.OneForOneStrategy: 1
> java.lang.ArrayIndexOutOfBoundsException: 1
>   at 
> org.apache.spark.scheduler.TaskSetManager.getAllowedLocalityLevel(TaskSetManager.scala:475)
>   at 
> org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:409)
>   at 
> org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$7$$anonfun$apply$2.apply$mcVI$sp(TaskSchedulerImpl.scala:261)
>   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
>   at 
> org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$7.apply(TaskSchedulerImpl.scala:257)
>   at 
> org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$7.apply(TaskSchedulerImpl.scala:254)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>   at 
> org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:254)
>   at 
> org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:254)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at 
> org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:254)
>   at 
> org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor.makeOffers(CoarseGrainedSchedulerBackend.scala:153)
>   at 
> org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor$$anonfun$receive$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:103)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}
> This causes the job to hang.
> I can deterministically reproduce this by re-running the test, either in 
> isolation or as part of the full performance testing suite.



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-2881) Snappy is now default codec - could lead to conflicts since uses /tmp

2014-08-06 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14088018#comment-14088018
 ] 

Mridul Muralidharan edited comment on SPARK-2881 at 8/6/14 6:45 PM:


To add, this will affect spark whenever the tmp directory is not overridden via
java.io.tmpdir to something ephemeral.

So local, yarn client and standalone should be affected by default (unless I
missed something in the scripts).
I am not very sure of how mesos runs jobs, so can't comment about that - anyone
care to add ?


A workaround I can think of is to always set 'org.xerial.snappy.tempdir' to a
randomly generated directory under "java.io.tmpdir" as part of spark startup,
(only) once: this will cause snappy to use that directory and avoid the
issue (sketched below).
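
A minimal sketch of that workaround, assuming snappy-java honours the
org.xerial.snappy.tempdir system property named above and that this runs before
any Snappy codec is first used:

{code}
import java.io.File
import java.nio.file.Files

// Point snappy-java at a per-process temp directory so its extracted native
// library does not collide with another user's /tmp/snappy-*.so.
// Must run before the first Snappy class is touched.
def setSnappyTempDir(): Unit = {
  val base = new File(System.getProperty("java.io.tmpdir"))
  val dir = Files.createTempDirectory(base.toPath, "spark-snappy-").toFile
  dir.deleteOnExit()
  System.setProperty("org.xerial.snappy.tempdir", dir.getAbsolutePath)
}
{code}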


Since snappy is the default codec now, I am +1 on marking this as a blocker for 
release


was (Author: mridulm80):
To add, this will affect spark whenever tmp directory is not overridden via 
java.io.tmpdir to something ephemeral.

So local, yarn client, standalone should be affected by default (unless I 
missed something in the scripts).
I am not very of how mesos runs jobs, so cant comment about that, anyone care 
to add ?


A workaround I can think of is to always set 'org.xerial.snappy.tempdir' to a 
randomly generated directory under "java.io.tmpdir" as part of spark startup 
(only) once : which will cause snappy to use that directory and avoid this 
issue. 


Since snappy is the default codec now, I am marking this as a blocker for 
release

> Snappy is now default codec - could lead to conflicts since uses /tmp
> -
>
> Key: SPARK-2881
> URL: https://issues.apache.org/jira/browse/SPARK-2881
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.1.0
>Reporter: Thomas Graves
>Priority: Blocker
>
> I was using spark master branch and I ran into an issue with Snappy since its 
> now the default codec for shuffle. 
> The issue was that someone else had run with snappy and it created 
> /tmp/snappy-*.so but it had restrictive permissions so I was not able to use 
> it or remove it.   This caused my spark job to not start.  
> I was running in yarn client mode at the time.  Yarn cluster mode shouldn't 
> have this issue since we change the java.io.tmpdir. 
> I assume this would also affect standalone mode.
> I'm not sure if this is a true blocker but wanted to file it as one at first 
> and let us decide.



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2881) Snappy is now default codec - could lead to conflicts since uses /tmp

2014-08-06 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14088018#comment-14088018
 ] 

Mridul Muralidharan commented on SPARK-2881:


To add, this will affect spark whenever tmp directory is not overridden via 
java.io.tmpdir to something ephemeral.

So local, yarn client, standalone should be affected by default (unless I 
missed something in the scripts).
I am not very of how mesos runs jobs, so cant comment about that, anyone care 
to add ?


A workaround I can think of is to always set 'org.xerial.snappy.tempdir' to a 
randomly generated directory under "java.io.tmpdir" as part of spark startup 
(only) once : which will cause snappy to use that directory and avoid this 
issue. 


Since snappy is the default codec now, I am marking this as a blocker for 
release

> Snappy is now default codec - could lead to conflicts since uses /tmp
> -
>
> Key: SPARK-2881
> URL: https://issues.apache.org/jira/browse/SPARK-2881
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.1.0
>Reporter: Thomas Graves
>Priority: Blocker
>
> I was using spark master branch and I ran into an issue with Snappy since its 
> now the default codec for shuffle. 
> The issue was that someone else had run with snappy and it created 
> /tmp/snappy-*.so but it had restrictive permissions so I was not able to use 
> it or remove it.   This caused my spark job to not start.  
> I was running in yarn client mode at the time.  Yarn cluster mode shouldn't 
> have this issue since we change the java.io.tmpdir. 
> I assume this would also affect standalone mode.
> I'm not sure if this is a true blocker but wanted to file it as one at first 
> and let us decide.



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



Re: -1s on pull requests?

2014-08-05 Thread Mridul Muralidharan
Just came across this mail, thanks for initiating this discussion Kay.
To add: another issue which recurs is very rapid commits, before most
contributors have had a chance to even look at the changes proposed.
There is not much prior discussion on the jira or pr, and the time
between submitting the PR and committing it is < 12 hours.

Particularly relevant when contributors are not in US timezones and/or
colocated; I have raised this a few times before, when the commit had
other side effects that were not considered.
On the flip side, we have PRs which have been languishing for weeks with
little or no activity from the committers' side - making the contribution
stale; so too long a delay is definitely not the direction to
take either !



Regards,
Mridul



On Tue, Jul 22, 2014 at 2:14 AM, Kay Ousterhout  wrote:
> Hi all,
>
> As the number of committers / contributors on Spark has increased, there
> are cases where pull requests get merged before all the review comments
> have been addressed. This happens say when one committer points out a
> problem with the pull request, and another committer doesn't see the
> earlier comment and merges the PR before the comment has been addressed.
>  This is especially tricky for pull requests with a large number of
> comments, because it can be difficult to notice early comments describing
> blocking issues.
>
> This also happens when something accidentally gets merged after the tests
> have started but before tests have passed.
>
> Do folks have ideas on how we can handle this issue? Are there other
> projects that have good ways of handling this? It looks like for Hadoop,
> people can -1 / +1 on the JIRA.
>
> -Kay

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



[jira] [Commented] (SPARK-2685) Update ExternalAppendOnlyMap to avoid buffer.remove()

2014-07-25 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14074186#comment-14074186
 ] 

Mridul Muralidharan commented on SPARK-2685:


We moved to using java.util.LinkedList for this
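
For reference, a minimal sketch of the swap-remove idea from the ticket
description (as opposed to the LinkedList route mentioned above); illustration
only, on a plain ArrayBuffer:

{code}
import scala.collection.mutable.ArrayBuffer

// buffer.remove(i) shifts everything to the right of i - O(n). If ordering does
// not matter, swapping in the last element and dropping it is effectively O(1).
def swapRemove[T](buffer: ArrayBuffer[T], i: Int): T = {
  val removed = buffer(i)
  buffer(i) = buffer(buffer.length - 1)
  buffer.remove(buffer.length - 1)   // removing the last element does not shift
  removed
}
{code}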

> Update ExternalAppendOnlyMap to avoid buffer.remove()
> -
>
> Key: SPARK-2685
> URL: https://issues.apache.org/jira/browse/SPARK-2685
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Reporter: Matei Zaharia
>
> This shifts the whole right side of the array back, which can be expensive. 
> It would be better to just swap the last element into the position we want to 
> remove at, then decrease the size of the array.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Created] (SPARK-2532) Fix issues with consolidated shuffle

2014-07-16 Thread Mridul Muralidharan (JIRA)
Mridul Muralidharan created SPARK-2532:
--

 Summary: Fix issues with consolidated shuffle
 Key: SPARK-2532
 URL: https://issues.apache.org/jira/browse/SPARK-2532
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.1.0
 Environment: All
Reporter: Mridul Muralidharan
Assignee: Mridul Muralidharan
Priority: Critical
 Fix For: 1.1.0



Will file PR with changes as soon as merge is done (earlier merge became 
outdated in 2 weeks unfortunately :) ).

Consolidated shuffle is broken in multiple ways in spark :

a) Task failure(s) can cause the state to become inconsistent.

b) Multiple reverts, or a combination of close/revert/close, can cause the state
to be inconsistent.
(As part of exception/error handling.)

c) Some of the api in the block writer causes implementation issues - for
example: a revert is always followed by a close, but the implementation tries to
keep them separate, resulting in more surface for errors.

d) Fetching data from consolidated shuffle files can go badly wrong if the file
is being actively written to: the length is computed by subtracting the next
offset from the current offset (or the file length if this is the last offset) -
the latter fails when a fetch is happening in parallel with a write (see the
sketch below).
Note, this happens even if there are no task failures of any kind !
This usually results in stream corruption or decompression errors.
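
A sketch of the offset arithmetic described in (d), and why it breaks while the
file is still being appended to (illustration only, not the actual shuffle file
code):

{code}
// Offsets into a consolidated shuffle file, one entry per map output appended so far.
// Segment i's length is offsets(i + 1) - offsets(i), or fileLength - offsets(i)
// for the last segment.
def segmentLength(offsets: IndexedSeq[Long], fileLength: Long, i: Int): Long =
  if (i < offsets.length - 1) offsets(i + 1) - offsets(i)
  else fileLength - offsets(i)

// If a writer is still appending when a reader computes the last segment's length,
// fileLength already includes partially written bytes, so the reader over-reads and
// sees a corrupt / truncated compressed stream - exactly the failure mode in (d).
{code}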




--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-2468) zero-copy shuffle network communication

2014-07-14 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14061094#comment-14061094
 ] 

Mridul Muralidharan commented on SPARK-2468:



Ah, small files - those are indeed a problem.

Btw, we do dispose of mmap'ed blocks as soon as we are done with them, so we
don't need to wait for gc to free them. Also note that the files are closed as
soon as they are opened and mmap'ed - so they do not count towards the open file
count/ulimit.

Agree on 1, 3 and 4 - some of these apply to sendfile too btw, so they are not
avoidable; but it is the best we have right now.
Since we use mmap'ed buffers and rarely transfer the same file again, the
performance jump might not be the order(s) of magnitude other projects claim -
but then even a 10% (or whatever) improvement in our case would be substantial !

> zero-copy shuffle network communication
> ---
>
> Key: SPARK-2468
> URL: https://issues.apache.org/jira/browse/SPARK-2468
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Reporter: Reynold Xin
>Assignee: Reynold Xin
>Priority: Critical
>
> Right now shuffle send goes through the block manager. This is inefficient 
> because it requires loading a block from disk into a kernel buffer, then into 
> a user space buffer, and then back to a kernel send buffer before it reaches 
> the NIC. It does multiple copies of the data and context switching between 
> kernel/user. It also creates unnecessary buffer in the JVM that increases GC
> Instead, we should use FileChannel.transferTo, which handles this in the 
> kernel space with zero-copy. See 
> http://www.ibm.com/developerworks/library/j-zerocopy/
> One potential solution is to use Netty NIO.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


Re: better compression codecs for shuffle blocks?

2014-07-14 Thread Mridul Muralidharan
We tried with lower block size for lzf, but it barfed all over the place.
Snappy was the way to go for our jobs.
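
For completeness, switching the codec is just a config flip - a minimal sketch
(the full codec class name is used here to stay version-agnostic; short aliases
like "snappy" work in later releases, iirc):

import org.apache.spark.SparkConf

// Sketch: select Snappy for block/shuffle compression via SparkConf.
val conf = new SparkConf()
  .setAppName("shuffle-codec-example")
  .set("spark.io.compression.codec", "org.apache.spark.io.SnappyCompressionCodec")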


Regards,
Mridul


On Mon, Jul 14, 2014 at 12:31 PM, Reynold Xin  wrote:
> Hi Spark devs,
>
> I was looking into the memory usage of shuffle and one annoying thing is
> the default compression codec (LZF) is that the implementation we use
> allocates buffers pretty generously. I did a simple experiment and found
> that creating 1000 LZFOutputStream allocated 198976424 bytes (~190MB). If
> we have a shuffle task that uses 10k reducers and 32 threads running
> currently, the memory used by the lzf stream alone would be ~ 60GB.
>
> In comparison, Snappy only allocates ~ 65MB for every
> 1k SnappyOutputStream. However, Snappy's compression is slightly lower than
> LZF's. In my experience, it leads to 10 - 20% increase in size. Compression
> ratio does matter here because we are sending data across the network.
>
> In future releases we will likely change the shuffle implementation to open
> less streams. Until that happens, I'm looking for compression codec
> implementations that are fast, allocate small buffers, and have decent
> compression ratio.
>
> Does anybody on this list have any suggestions? If not, I will submit a
> patch for 1.1 that replaces LZF with Snappy for the default compression
> codec to lower memory usage.
>
>
> allocation data here: https://gist.github.com/rxin/ad7217ea60e3fb36c567


[jira] [Commented] (SPARK-2468) zero-copy shuffle network communication

2014-07-14 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14060545#comment-14060545
 ] 

Mridul Muralidharan commented on SPARK-2468:


Writing mmap'ed buffers is pretty efficient btw - it is the second fallback in
the transferTo implementation iirc.

> zero-copy shuffle network communication
> ---
>
> Key: SPARK-2468
> URL: https://issues.apache.org/jira/browse/SPARK-2468
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Reporter: Reynold Xin
>Assignee: Reynold Xin
>Priority: Critical
>
> Right now shuffle send goes through the block manager. This is inefficient 
> because it requires loading a block from disk into a kernel buffer, then into 
> a user space buffer, and then back to a kernel send buffer before it reaches 
> the NIC. It does multiple copies of the data and context switching between 
> kernel/user. It also creates unnecessary buffer in the JVM that increases GC
> Instead, we should use FileChannel.transferTo, which handles this in the 
> kernel space with zero-copy. See 
> http://www.ibm.com/developerworks/library/j-zerocopy/
> One potential solution is to use Netty NIO.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-2468) zero-copy shuffle network communication

2014-07-14 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14060543#comment-14060543
 ] 

Mridul Muralidharan commented on SPARK-2468:


We map the file content and directly write that to the socket (except when the 
size is below 8k or so iirc) - are you sure we are copying to user space and 
back ?
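
For reference, the two flavours being discussed look roughly like this in plain
NIO (illustration only, not the actual ConnectionManager code):

{code}
import java.io.{File, RandomAccessFile}
import java.nio.channels.{FileChannel, WritableByteChannel}

// Variant 1: mmap the region and write the mapped buffer to the socket channel.
def sendMapped(file: File, out: WritableByteChannel): Unit = {
  val ch = new RandomAccessFile(file, "r").getChannel
  try {
    val buf = ch.map(FileChannel.MapMode.READ_ONLY, 0, ch.size())
    while (buf.hasRemaining) out.write(buf)
  } finally ch.close()
}

// Variant 2: hand the whole thing to the kernel - FileChannel.transferTo can
// avoid the extra user-space copy entirely when the OS supports sendfile.
def sendZeroCopy(file: File, out: WritableByteChannel): Unit = {
  val ch = new RandomAccessFile(file, "r").getChannel
  try {
    var pos = 0L
    val size = ch.size()
    while (pos < size) pos += ch.transferTo(pos, size - pos, out)
  } finally ch.close()
}
{code}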

> zero-copy shuffle network communication
> ---
>
> Key: SPARK-2468
> URL: https://issues.apache.org/jira/browse/SPARK-2468
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Reporter: Reynold Xin
>Assignee: Reynold Xin
>Priority: Critical
>
> Right now shuffle send goes through the block manager. This is inefficient 
> because it requires loading a block from disk into a kernel buffer, then into 
> a user space buffer, and then back to a kernel send buffer before it reaches 
> the NIC. It does multiple copies of the data and context switching between 
> kernel/user. It also creates unnecessary buffer in the JVM that increases GC
> Instead, we should use FileChannel.transferTo, which handles this in the 
> kernel space with zero-copy. See 
> http://www.ibm.com/developerworks/library/j-zerocopy/
> One potential solution is to use Netty NIO.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-2398) Trouble running Spark 1.0 on Yarn

2014-07-13 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14060113#comment-14060113
 ] 

Mridul Muralidharan commented on SPARK-2398:



As discussed in the PR, I am attempting to list the various factors which
contribute to overhead.
Note, this is not exhaustive (yet) - please add more to this JIRA - so that
when we are reasonably sure, we can model the expected overhead based on these
factors (a rough illustrative model is sketched at the end of this comment).

These factors are typically off-heap - anything within the heap is budgeted
for by Xmx and enforced by the VM - and so should ideally (though not always in
practice, see gc overheads) not exceed the Xmx value.

1) 256 KB per socket accepted via ConnectionManager for inter-worker comm 
(setReceiveBufferSize)
Typically, there will be (numExecutors - 1) sockets open.

2) 128 KB per socket for writing output to dfs. For reads, this does not seem 
to be configured - and should be 8k per socket iirc.
Typically 1 per executor at a given point in time ?

3) 256k for each akka socket for send/receive buffer.
One per worker ? (to talk to master) - so 512kb ? Any other use of akka ?

4) If I am not wrong, netty might allocate multiple "spark.akka.frameSize"-sized
direct buffers. There might be a few of these allocated and pooled/reused.
I did not go into the netty code in detail though. If someone else with more
knowhow can clarify, that would be great !
Default size of 10mb for spark.akka.frameSize.

5) The default size of the assembled spark jar is about 12x mb (and changing) - 
though not all classes get loaded, the overhead would be some function of this.
The actual footprint would be higher than the on-disk size.
IIRC this is outside of the heap - [~sowen], any comments on this ? I have not 
looked into these in like 10 years now !

6) Per thread (Xss) overhead of 1mb (for 64bit vm).
Last I recall, we have about 220 odd threads - not sure if this was at the 
master or on the workers.
Of course, this is dependent on the various threadpools we use (io, computation,
etc), akka and netty config, etc.

7) Disk read overhead.
Thanks to [~pwendell]'s fix, at least for small files the overhead is not too
high - since we do not mmap files but directly read them.
But for anything larger than 8kb (default), we use memory mapped buffers.
The actual overhead depends on the number of files opened for read via 
DiskStore - and the entire file contents get mmap'ed into virt mem.
Note that there is some non-virt-mem overhead also at native level for these 
buffers.

The actual number of files opened should be carefully tracked to understand the
effect of this on spark overhead, since this aspect is changing a lot of late.
Impact is on shuffle,  disk persisted rdd, among others.
The actual value would be application dependent (how large the data is !)


8) The overhead introduced by VM not being able to reclaim memory completely 
(the cost of moving data vs amount of space reclaimed).
Ideally, this should be low - but would be dependent on the heap space, 
collector used, among other things.
I am not very knowledgeable about the recent advances in gc collectors, so I
hesitate to put a number to this.



I am sure this is not an exhaustive list, please do add to this.
In our case specifically, and [~tgraves] could add more, the number of
containers can be high (300+ is easily possible), and memory per container is
modest (8gig usually).
To add details of observed overhead patterns (from the PR discussion) -
a) I have had an in-house GBDT impl run without customizing overhead (so the
default of 384 mb) with 12gb containers and 22 nodes on a reasonably large
dataset.
b) I have had to customize the overhead to 1.7gb for collaborative filtering
with 8gb containers and 300 nodes (on a fairly large dataset).
c) I have had to minimally customize the overhead to do an in-house QR
factorization of a 50k x 50k distributed dense matrix on 45 nodes at 12 gb each
(this was incorrectly specified in the PR discussion).
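
A rough back-of-the-envelope model of the off-heap factors listed above - pure
illustration: the constants are the per-item numbers quoted in this comment,
plus a couple of assumed placeholders (jar footprint, number of pooled netty
buffers), not measured values.

{code}
// Very rough per-executor off-heap estimate, in MB, built from the factors above.
def estimatedOverheadMb(
    numExecutors: Int,
    akkaFrameSizeMb: Int = 10,   // spark.akka.frameSize default quoted above
    numThreads: Int = 220,       // thread count quoted above
    xssMb: Int = 1,              // 64-bit VM default stack size
    jarFootprintMb: Int = 120,   // assumed ballpark for loaded classes from the assembly
    nettyBuffers: Int = 4,       // "a few" pooled frame-sized direct buffers (assumed)
    mmapMb: Int = 0              // workload dependent: mmap'ed shuffle/disk reads
  ): Double = {
  val connMgrSockets = (numExecutors - 1) * 0.25   // 256 KB receive buffer each
  val dfsSockets = 0.125                           // one 128 KB DFS write buffer
  val akkaSockets = 2 * 0.25                       // send + receive buffers
  val nettyDirect = nettyBuffers * akkaFrameSizeMb
  val threadStacks = numThreads * xssMb
  connMgrSockets + dfsSockets + akkaSockets + nettyDirect +
    threadStacks + jarFootprintMb + mmapMb
}
{code}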

> Trouble running Spark 1.0 on Yarn 
> --
>
> Key: SPARK-2398
> URL: https://issues.apache.org/jira/browse/SPARK-2398
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.0.0
>Reporter: Nishkam Ravi
>
> Trouble running workloads in Spark-on-YARN cluster mode for Spark 1.0. 
> For example: SparkPageRank when run in standalone mode goes through without 
> any errors (tested for up to 30GB input dataset on a 6-node cluster).  Also 
> runs fine for a 1GB dataset in yarn cluster mode. Starts to choke (in yarn 
> cluster mode) as the input data size is increased. Confirmed for 16GB input 
> dataset.
> The same workload runs fine with Spark 0.9 in both standalone and yarn 
> cluster mode (for up to 30 GB input d

Re: [GitHub] spark pull request: Modify default YARN memory_overhead-- from an ...

2014-07-13 Thread Mridul Muralidharan
You are lucky :-) for some of our jobs, in an 8gb container, the overhead is
1.8gb !
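
For reference, bumping it explicitly looks something like this - the config name
is the one used by the 1.x-era YARN support and the values are illustrative:

import org.apache.spark.SparkConf

// Sketch: ~6g heap plus ~1.8g overhead, to fit an 8gb container (values in MB).
val conf = new SparkConf()
  .set("spark.executor.memory", "6g")
  .set("spark.yarn.executor.memoryOverhead", "1800")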
On 13-Jul-2014 2:40 pm, "nishkamravi2"  wrote:

> Github user nishkamravi2 commented on the pull request:
>
> https://github.com/apache/spark/pull/1391#issuecomment-48835560
>
> Sean, the memory_overhead is fairly substantial. More than 2GB for a
> 30GB executor. Less than 400MB for a 2GB executor.
>
>
> ---
> If your project is set up for it, you can reply to this email and have your
> reply appear on GitHub as well. If your project does not have this feature
> enabled and wishes so, or if the feature is enabled but not working, please
> contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
> with INFRA.
> ---
>


Re: CPU/Disk/network performance instrumentation

2014-07-09 Thread Mridul Muralidharan
+1 on advanced mode !

Regards.
Mridul

On Thu, Jul 10, 2014 at 12:55 AM, Reynold Xin  wrote:
> Maybe it's time to create an advanced mode in the ui.
>
>
> On Wed, Jul 9, 2014 at 12:23 PM, Kay Ousterhout 
> wrote:
>
>> Hi all,
>>
>> I've been doing a bunch of performance measurement of Spark and, as part of
>> doing this, added metrics that record the average CPU utilization, disk
>> throughput and utilization for each block device, and network throughput
>> while each task is running.  These metrics are collected by reading the
>> /proc filesystem so work only on Linux.  I'm happy to submit a pull request
>> with the appropriate changes but first wanted to see if sufficiently many
>> people think this would be useful.  I know the metrics reported by Spark
>> (and in the UI) are already overwhelming to some folks so don't want to add
>> more instrumentation if it's not widely useful.
>>
>> These metrics are slightly more difficult to interpret for Spark than
>> similar metrics reported by Hadoop because, with Spark, multiple tasks run
>> in the same JVM and therefore as part of the same process.  This means
>> that, for example, the CPU utilization metrics reflect the CPU use across
>> all tasks in the JVM, rather than only the CPU time used by the particular
>> task.  This is a pro and a con -- it makes it harder to determine why
>> utilization is high (it may be from a different task) but it also makes the
>> metrics useful for diagnosing straggler problems.  Just wanted to clarify
>> this before asking folks to weigh in on whether the added metrics would be
>> useful.
>>
>> -Kay
>>
>> (if you're curious, the instrumentation code is on a very messy branch
>> here:
>>
>> https://github.com/kayousterhout/spark-1/tree/proc_logging_perf_minimal_temp/core/src/main/scala/org/apache/spark/performance_logging
>> )
>>


Unresponsive to PR/jira changes

2014-07-09 Thread Mridul Muralidharan
Hi,


  I noticed today that gmail has been marking most of the mails I was receiving
from spark github/jira as spam; and I was assuming it was a lull in activity
due to spark summit for the past few weeks !

In case I have commented on specific PR/JIRA issues and not followed
up, apologies for the same - please do reach out in case something is still
pending from my end.



Regards,
Mridul


Re: on shark, is tachyon less efficient than memory_only cache strategy ?

2014-07-08 Thread Mridul Muralidharan
You are ignoring serde costs :-)

- Mridul

On Tue, Jul 8, 2014 at 8:48 PM, Aaron Davidson  wrote:
> Tachyon should only be marginally less performant than memory_only, because
> we mmap the data from Tachyon's ramdisk. We do not have to, say, transfer
> the data over a pipe from Tachyon; we can directly read from the buffers in
> the same way that Shark reads from its in-memory columnar format.
>
>
>
> On Tue, Jul 8, 2014 at 1:18 AM, qingyang li 
> wrote:
>
>> hi, when i create a table, i can point the cache strategy using
>> shark.cache,
>> i think "shark.cache=memory_only"  means data are managed by spark, and
>> data are in the same jvm with excutor;   while  "shark.cache=tachyon"
>>  means  data are managed by tachyon which is off heap, and data are not in
>> the same jvm with excutor,  so spark will load data from tachyon for each
>> query sql , so,  is  tachyon less efficient than memory_only cache strategy
>>  ?
>> if yes, can we let spark load all data once from tachyon  for all sql query
>>  if i want to use tachyon cache strategy since tachyon is more HA than
>> memory_only ?
>>


[jira] [Commented] (SPARK-2390) Files in staging directory cannot be deleted and wastes the space of HDFS

2014-07-07 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14054162#comment-14054162
 ] 

Mridul Muralidharan commented on SPARK-2390:


Here, and in a bunch of other places, spark currently closes the FileSystem
instance: this is incorrect, and should not be done.
The fix would be to remove the fs.close(), not to force creation of new
instances (see the sketch below).
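
A sketch of what that looks like for the FileLogger.stop shown in the issue
description below - illustrative only, with the surrounding class fields as in
the quoted code:

{code}
// Close the streams we own, but never the cached FileSystem instance:
// FileSystem.get hands the same object to other components (here, the
// ApplicationMaster's staging-dir cleanup).
def stop() {
  hadoopDataStream.foreach(_.close())
  writer.foreach(_.close())
  // fileSystem.close()  <-- removed: the instance is shared process-wide
}
{code}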

> Files in staging directory cannot be deleted and wastes the space of HDFS
> -
>
> Key: SPARK-2390
> URL: https://issues.apache.org/jira/browse/SPARK-2390
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.0.0, 1.0.1
>Reporter: Kousuke Saruta
>
> When running jobs with YARN Cluster mode and using HistoryServer, the files 
> in the Staging Directory cannot be deleted.
> HistoryServer uses directory where event log is written, and the directory is 
> represented as a instance of o.a.h.f.FileSystem created by using 
> FileSystem.get.
> {code:title=FileLogger.scala}
> private val fileSystem = Utils.getHadoopFileSystem(new URI(logDir))
> {code}
> {code:title=utils.getHadoopFileSystem}
> def getHadoopFileSystem(path: URI): FileSystem = {
>   FileSystem.get(path, SparkHadoopUtil.get.newConfiguration())
> }
> {code}
> On the other hand, ApplicationMaster has a instance named fs, which also 
> created by using FileSystem.get.
> {code:title=ApplicationMaster}
> private val fs = FileSystem.get(yarnConf)
> {code}
> FileSystem.get returns cached same instance when URI passed to the method 
> represents same file system and the method is called by same user.
> Because of the behavior, when the directory for event log is on HDFS, fs of 
> ApplicationMaster and fileSystem of FileLogger is same instance.
> When shutting down ApplicationMaster, fileSystem.close is called in 
> FileLogger#stop, which is invoked by SparkContext#stop indirectly.
> {code:title=FileLogger.stop}
> def stop() {
>   hadoopDataStream.foreach(_.close())
>   writer.foreach(_.close())
>   fileSystem.close()
> }
> {code}
> And  ApplicationMaster#cleanupStagingDir also called by JVM shutdown hook. In 
> this method, fs.delete(stagingDirPath) is invoked. 
> Because fs.delete in ApplicationMaster is called after fileSystem.close in 
> FileLogger, fs.delete fails and results not deleting files in the staging 
> directory.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-2017) web ui stage page becomes unresponsive when the number of tasks is large

2014-07-04 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14052679#comment-14052679
 ] 

Mridul Muralidharan commented on SPARK-2017:


Sounds great, ability to get to currently running tasks (to check current 
state), ability to get to task failures (to debug usually), some aggregate 
stats (gc, stats per executor, etc) and having some way to get to the full 
details (which is what is seen currently) in an "advanced" or "full" view.
Anything else would be a bonus :-)

> web ui stage page becomes unresponsive when the number of tasks is large
> 
>
> Key: SPARK-2017
> URL: https://issues.apache.org/jira/browse/SPARK-2017
> Project: Spark
>  Issue Type: Sub-task
>Reporter: Reynold Xin
>  Labels: starter
>
> {code}
> sc.parallelize(1 to 1000000, 1000000).count()
> {code}
> The above code creates one million tasks to be executed. The stage detail web 
> ui page takes forever to load (if it ever completes).
> There are again a few different alternatives:
> 0. Limit the number of tasks we show.
> 1. Pagination
> 2. By default only show the aggregate metrics and failed tasks, and hide the 
> successful ones.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


Re: Eliminate copy while sending data : any Akka experts here ?

2014-07-04 Thread Mridul Muralidharan
In our clusters, the number of containers we can get is high but memory
per container is low - which is why avg_nodes_not_hosting data is
rarely zero for ML tasks :-)

To update - to unblock our current implementation efforts, we went
with broadcast, since it is intuitively easier and a minimal change; and we
compress the array as bytes in TaskResult.
This is then stored in disk-backed maps - to remove memory pressure on
the master and workers (else MapOutputTracker becomes a memory hog).

But I agree, a compressed bitmap to represent 'large' blocks (anything
larger than maxBytesInFlight actually), and probably an existing one to track
non-zero blocks, should be fine (we should not really track zero output per
reducer - just a waste of space).
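
A toy encoding of that idea - track only which map outputs are empty in a bitmap
and let reducers skip those fetches. A real implementation would use a compressed
bitmap; a plain BitSet keeps this sketch runnable and dependency-free.

import scala.collection.mutable.BitSet

// Instead of shipping one size per reducer, record which blocks are empty.
def emptyBlocksBitmap(sizes: Array[Long]): BitSet = {
  val empty = new BitSet(sizes.length)
  sizes.zipWithIndex.foreach { case (size, i) => if (size == 0L) empty += i }
  empty
}

// Reducers can then skip fetch requests for blocks flagged as empty:
// if (emptyBlocks.contains(mapId)) { /* nothing to fetch from this map output */ }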


Regards,
Mridul

On Fri, Jul 4, 2014 at 3:43 AM, Reynold Xin  wrote:
> Note that in my original proposal, I was suggesting we could track whether
> block size = 0 using a compressed bitmap. That way we can still avoid
> requests for zero-sized blocks.
>
>
>
> On Thu, Jul 3, 2014 at 3:12 PM, Reynold Xin  wrote:
>
>> Yes, that number is likely == 0 in any real workload ...
>>
>>
>> On Thu, Jul 3, 2014 at 8:01 AM, Mridul Muralidharan 
>> wrote:
>>
>>> On Thu, Jul 3, 2014 at 11:32 AM, Reynold Xin  wrote:
>>> > On Wed, Jul 2, 2014 at 3:44 AM, Mridul Muralidharan 
>>> > wrote:
>>> >
>>> >>
>>> >> >
>>> >> > The other thing we do need is the location of blocks. This is
>>> actually
>>> >> just
>>> >> > O(n) because we just need to know where the map was run.
>>> >>
>>> >> For well partitioned data, wont this not involve a lot of unwanted
>>> >> requests to nodes which are not hosting data for a reducer (and lack
>>> >> of ability to throttle).
>>> >>
>>> >
>>> > Was that a question? (I'm guessing it is). What do you mean exactly?
>>>
>>>
>>> I was not sure if I understood the proposal correctly - hence the
>>> query : if I understood it right - the number of wasted requests goes
>>> up by num_reducers * avg_nodes_not_hosting data.
>>>
>>> Ofcourse, if avg_nodes_not_hosting data == 0, then we are fine !
>>>
>>> Regards,
>>> Mridul
>>>
>>
>>


[jira] [Commented] (SPARK-2017) web ui stage page becomes unresponsive when the number of tasks is large

2014-07-04 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14052289#comment-14052289
 ] 

Mridul Muralidharan commented on SPARK-2017:


With aggregated metrics, we lose the ability to check for gc time (which is
actually what I use that UI for, other than to dig up exceptions on failed
tasks).

> web ui stage page becomes unresponsive when the number of tasks is large
> 
>
> Key: SPARK-2017
> URL: https://issues.apache.org/jira/browse/SPARK-2017
> Project: Spark
>  Issue Type: Sub-task
>Reporter: Reynold Xin
>  Labels: starter
>
> {code}
> sc.parallelize(1 to 1000000, 1000000).count()
> {code}
> The above code creates one million tasks to be executed. The stage detail web 
> ui page takes forever to load (if it ever completes).
> There are again a few different alternatives:
> 0. Limit the number of tasks we show.
> 1. Pagination
> 2. By default only show the aggregate metrics and failed tasks, and hide the 
> successful ones.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-2277) Make TaskScheduler track whether there's host on a rack

2014-07-04 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14052275#comment-14052275
 ] 

Mridul Muralidharan commented on SPARK-2277:


Hmm, good point - that PR does change the scheduler expectations in a lot of 
ways which were not all anticipated.
Let me go through the current PR; thanks for the bug !

> Make TaskScheduler track whether there's host on a rack
> ---
>
> Key: SPARK-2277
> URL: https://issues.apache.org/jira/browse/SPARK-2277
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 1.0.0
>Reporter: Rui Li
>
> When TaskSetManager adds a pending task, it checks whether the tasks's 
> preferred location is available. Regarding RACK_LOCAL task, we consider the 
> preferred rack available if such a rack is defined for the preferred host. 
> This is incorrect as there may be no alive hosts on that rack at all. 
> Therefore, TaskScheduler should track the hosts on each rack, and provides an 
> API for TaskSetManager to check if there's host alive on a specific rack.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (SPARK-2353) ArrayIndexOutOfBoundsException in scheduler

2014-07-03 Thread Mridul Muralidharan (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-2353?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan updated SPARK-2353:
---

Description: 
I suspect the recent changes from SPARK-1937 to compute valid locality levels
(and ignore ones which are not applicable) have resulted in this issue.
Specifically, some of the code using currentLocalityIndex (and lastLaunchTime
actually) seems to be assuming
a) a constant population of locality levels.
b) probably also immutability/repeatability of locality levels.

These do not hold any longer.
I do not have the exact values for which this failure was observed (since this 
is from the logs of a failed job) - but the code path is suspect.

Also note that the line numbers/classes might not exactly match master since we 
are in the middle of a merge. But the issue should hopefully be evident.

java.lang.ArrayIndexOutOfBoundsException: 2
at 
org.apache.spark.scheduler.TaskSetManager.getAllowedLocalityLevel(TaskSetManager.scala:439)
at 
org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:388)
at 
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$5$$anonfun$apply$2.apply$mcVI$sp(TaskSchedulerImpl.scala:248)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at 
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$5.apply(TaskSchedulerImpl.scala:244)
at 
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$5.apply(TaskSchedulerImpl.scala:241)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at 
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:241)
at 
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:241)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:241)
at 
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor.makeOffers(CoarseGrainedSchedulerBackend.scala:133)
at 
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor$$anonfun$receive$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:86)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


Unfortunately, we do not have the bandwidth to tackle this issue - it would be 
great if someone could take a look at it! Thanks.

  was:

I suspect the recent changes from SPARK-1937 to compute valid locality levels 
(and ignoring ones which are not applicable) has resulted in this issue.
Specifically, some of the code using currentLocalityIndex (and lastLaunchTime 
actually) seems to be assuming 
a) constant population of locality levels.
b) probably also immutablility/repeatibility of locality levels

These do not hold any longer.
I do not have the exact values for which this failure was observed (since this 
is from the logs of a failed job) - but the code path is highly suspect.

Also note that the line numbers/classes might not exactly match master since we 
are in the middle of a merge. But the issue should hopefully be evident.

java.lang.ArrayIndexOutOfBoundsException: 2
at 
org.apache.spark.scheduler.TaskSetManager.getAllowedLocalityLevel(TaskSetManager.scala:439)
at 
org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:388)
at 
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$5$$anonfun$apply$2.apply$mcVI$sp(TaskSchedulerImpl.scala:248)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at 
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$5.apply(TaskSchedulerImpl.scala:244)
at 
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$5.apply(TaskSchedulerImpl.scala:241)
at

[jira] [Commented] (SPARK-2277) Make TaskScheduler track whether there's host on a rack

2014-07-03 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14051575#comment-14051575
 ] 

Mridul Muralidharan commented on SPARK-2277:


I have not rechecked the code, but the way it was originally written by me 
was:

a) Task preference is decoupled from availability of the node.
For example, we need not have an executor on a host for which a block has a host 
preference (for example, dfs blocks on a shared cluster).
Also note that a block might have one or more preferred locations.

b) We look up the rack for the preferred location to get the preferred rack.
As with (a), there need not be an executor on that rack. This is just the rack 
preference.

c) At schedule time, for an executor, we look up the host/rack of that executor's 
location - and decide appropriately based on that.


In this context, I think your requirement is already handled.
Even if we don't have any hosts alive on a rack, those tasks would still be 
recorded with a rack-local preference in the task set manager.
When an executor comes in (existing or new), we check that executor's rack against 
the task preference - and the task would now be marked rack local.
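
A minimal, self-contained sketch of the behaviour described in (a)-(c); the names 
and data structures are illustrative only, not the actual scheduler code:

{code}
// Rack preferences are derived from preferred hosts and recorded regardless of
// whether any executor is currently alive on that rack; they are only consulted
// against the rack of the executor making the offer.
val rackOfHost: Map[String, String] =
  Map("host1" -> "rackA", "host2" -> "rackB")

// (a)/(b) Pending task ids indexed by preferred rack (host preference -> rack).
val pendingTasksForRack: Map[String, Seq[Int]] =
  Map("rackA" -> Seq(0, 1), "rackB" -> Seq(2))

// (c) At schedule time, look up the rack of the offering executor's host and
// serve any pending task whose preferred rack matches it.
def rackLocalTaskFor(executorHost: String): Option[Int] =
  rackOfHost.get(executorHost)
    .flatMap(pendingTasksForRack.get)
    .flatMap(_.headOption)
{code}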

> Make TaskScheduler track whether there's host on a rack
> ---
>
> Key: SPARK-2277
> URL: https://issues.apache.org/jira/browse/SPARK-2277
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 1.0.0
>Reporter: Rui Li
>
> When TaskSetManager adds a pending task, it checks whether the task's 
> preferred location is available. Regarding RACK_LOCAL tasks, we consider the 
> preferred rack available if such a rack is defined for the preferred host. 
> This is incorrect, as there may be no alive hosts on that rack at all. 
> Therefore, TaskScheduler should track the hosts on each rack, and provide an 
> API for TaskSetManager to check if there's a host alive on a specific rack.





Re: Eliminate copy while sending data : any Akka experts here ?

2014-07-03 Thread Mridul Muralidharan
On Thu, Jul 3, 2014 at 11:32 AM, Reynold Xin  wrote:
> On Wed, Jul 2, 2014 at 3:44 AM, Mridul Muralidharan 
> wrote:
>
>>
>> >
>> > The other thing we do need is the location of blocks. This is actually
>> just
>> > O(n) because we just need to know where the map was run.
>>
>> For well partitioned data, wont this not involve a lot of unwanted
>> requests to nodes which are not hosting data for a reducer (and lack
>> of ability to throttle).
>>
>
> Was that a question? (I'm guessing it is). What do you mean exactly?


I was not sure if I understood the proposal correctly - hence the
query: if I understood it right, the number of wasted requests goes
up by num_reducers * avg_nodes_not_hosting_data.

Of course, if avg_nodes_not_hosting_data == 0, then we are fine!
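
As a rough illustration of that scaling, with purely hypothetical numbers:

{code}
// Hypothetical numbers, only to illustrate the scaling concern above.
val numReducers = 10000
val avgNodesNotHostingData = 150  // nodes contacted that hold no data for a given reducer
val wastedRequests = numReducers * avgNodesNotHostingData  // 1,500,000 unnecessary requests
{code}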

Regards,
Mridul


[jira] [Created] (SPARK-2353) ArrayIndexOutOfBoundsException in scheduler

2014-07-03 Thread Mridul Muralidharan (JIRA)
Mridul Muralidharan created SPARK-2353:
--

 Summary: ArrayIndexOutOfBoundsException in scheduler
 Key: SPARK-2353
 URL: https://issues.apache.org/jira/browse/SPARK-2353
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.1.0
Reporter: Mridul Muralidharan
Priority: Blocker



I suspect the recent changes from SPARK-1937 to compute valid locality levels 
(and to ignore ones which are not applicable) have resulted in this issue.
Specifically, some of the code using currentLocalityIndex (and lastLaunchTime 
actually) seems to be assuming 
a) a constant population of locality levels, and
b) probably also immutability/repeatability of locality levels.

These do not hold any longer.
I do not have the exact values for which this failure was observed (since this 
is from the logs of a failed job) - but the code path is highly suspect.

Also note that the line numbers/classes might not exactly match master since we 
are in the middle of a merge. But the issue should hopefully be evident.

java.lang.ArrayIndexOutOfBoundsException: 2
at 
org.apache.spark.scheduler.TaskSetManager.getAllowedLocalityLevel(TaskSetManager.scala:439)
at 
org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:388)
at 
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$5$$anonfun$apply$2.apply$mcVI$sp(TaskSchedulerImpl.scala:248)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at 
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$5.apply(TaskSchedulerImpl.scala:244)
at 
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$5.apply(TaskSchedulerImpl.scala:241)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at 
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:241)
at 
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:241)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:241)
at 
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor.makeOffers(CoarseGrainedSchedulerBackend.scala:133)
at 
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor$$anonfun$receive$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:86)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)





[jira] [Commented] (SPARK-2277) Make TaskScheduler track whether there's host on a rack

2014-07-02 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14050886#comment-14050886
 ] 

Mridul Muralidharan commented on SPARK-2277:


I am not sure I follow this requirement.
For the preferred locations, we populate their corresponding racks (if available) 
as the preferred racks.

For the available executor hosts, we look up the rack they belong to - and then see 
if that rack is preferred or not.

This, of course, assumes a host is only on a single rack.


What exactly is the behavior you are expecting from the scheduler?

> Make TaskScheduler track whether there's host on a rack
> ---
>
> Key: SPARK-2277
> URL: https://issues.apache.org/jira/browse/SPARK-2277
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 1.0.0
>Reporter: Rui Li
>
> When TaskSetManager adds a pending task, it checks whether the task's 
> preferred location is available. Regarding RACK_LOCAL tasks, we consider the 
> preferred rack available if such a rack is defined for the preferred host. 
> This is incorrect, as there may be no alive hosts on that rack at all. 
> Therefore, TaskScheduler should track the hosts on each rack, and provide an 
> API for TaskSetManager to check if there's a host alive on a specific rack.





Re: Eliminate copy while sending data : any Akka experts here ?

2014-07-02 Thread Mridul Muralidharan
Hi Reynold,

  Please see inline.

Regards,
Mridul

On Wed, Jul 2, 2014 at 10:57 AM, Reynold Xin  wrote:
> I was actually talking to tgraves today at the summit about this.
>
> Based on my understanding, the sizes we track and send (which is
> unfortunately O(M*R) regardless of how we change the implementation --
> whether we send via task or send via MapOutputTracker) is only used to
> compute maxBytesInFlight so we can throttle the fetching speed to not
> result in oom. Perhaps for very large shuffles, we don't need to send the
> bytes for each block, and we can send whether they are zero or not (which
> can be tracked via a compressed bitmap that can be tiny).

You are right: currently, for large blocks, we just need to know where
the block exists.
I was not sure if there was any possible future extension on that -
for this reason, in order to preserve functionality, we moved from
Byte to Short for MapOutputTracker.compressedSize (to ensure large
sizes can be represented with at most 0.7% error).

Within a MapStatus, we moved to holding compressed data to save
space within the master/workers (particularly for a large number of
reducers).

If we do not anticipate any other use for "size", we can move back
to using Byte instead of Short to compress the size (which will reduce
the required space by a factor of slightly less than 2), since the error in the
computed size for blocks larger than maxBytesInFlight does not really matter:
we will split them into different FetchRequests anyway.
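
An illustrative sketch of the log-based size encoding being discussed; the base 
and the exact bounds are assumptions for illustration, not the actual 
MapOutputTracker code:

{code}
// Store ceil(log_b(size)) in a Short. With b = 1.007 the relative error of the
// decompressed size is bounded by roughly 0.7%, and 16 bits comfortably cover
// any realistic block size (log_1.007 of 2^63 is only about 6300).
object SizeCodec {
  private val base = 1.007

  def compress(size: Long): Short =
    if (size <= 1L) size.toShort
    else math.ceil(math.log(size.toDouble) / math.log(base)).toInt.toShort

  def decompress(compressed: Short): Long =
    if (compressed <= 1) compressed.toLong
    else math.pow(base, compressed.toDouble).toLong
}
{code}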


>
> The other thing we do need is the location of blocks. This is actually just
> O(n) because we just need to know where the map was run.

For well-partitioned data, won't this involve a lot of unwanted
requests to nodes which are not hosting data for a reducer (and a lack
of ability to throttle)?


Regards,
Mridul

>
>
> On Tue, Jul 1, 2014 at 2:51 AM, Mridul Muralidharan 
> wrote:
>
>> We had considered both approaches (if I understood the suggestions right) :
>> a) Pulling only map output states for tasks which run on the reducer
>> by modifying the Actor. (Probably along lines of what Aaron described
>> ?)
>> The performance implication of this was bad :
>> 1) We cant cache serialized result anymore, (caching it makes no sense
>> rather).
>> 2) The number requests to master will go from num_executors to
>> num_reducers - the latter can be orders of magnitude higher than
>> former.
>>
>> b) Instead of pulling this information, push it to executors as part
>> of task submission. (What Patrick mentioned ?)
>> (1) a.1 from above is still an issue for this.
>> (2) Serialized task size is also a concern : we have already seen
>> users hitting akka limits for task size - this will be an additional
>> vector which might exacerbate it.
>> Our jobs are not hitting this yet though !
>>
>> I was hoping there might be something in akka itself to alleviate this
>> - but if not, we can solve it within context of spark.
>>
>> Currently, we have worked around it by using broadcast variable when
>> serialized size is above some threshold - so that our immediate
>> concerns are unblocked :-)
>> But a better solution should be greatly welcomed !
>> Maybe we can unify it with large serialized task as well ...
>>
>>
>> Btw, I am not sure what the higher cost of BlockManager referred to is
>> Aaron - do you mean the cost of persisting the serialized map outputs
>> to disk ?
>>
>>
>>
>>
>> Regards,
>> Mridul
>>
>>
>> On Tue, Jul 1, 2014 at 1:36 PM, Patrick Wendell 
>> wrote:
>> > Yeah I created a JIRA a while back to piggy-back the map status info
>> > on top of the task (I honestly think it will be a small change). There
>> > isn't a good reason to broadcast the entire array and it can be an
>> > issue during large shuffles.
>> >
>> > - Patrick
>> >
>> > On Mon, Jun 30, 2014 at 7:58 PM, Aaron Davidson 
>> wrote:
>> >> I don't know of any way to avoid Akka doing a copy, but I would like to
>> >> mention that it's on the priority list to piggy-back only the map
>> statuses
>> >> relevant to a particular map task on the task itself, thus reducing the
>> >> total amount of data sent over the wire by a factor of N for N physical
>> >> machines in your cluster. Ideally we would also avoid Akka entirely when
>> >> sending the tasks, as these can get somewhat large and Akka doesn't work
>> >> well with large messages.
>> >>
>> >> Do note that your solution of using broadcast to send the map tasks is
>> very
>> >> similar to how the

Re: Eliminate copy while sending data : any Akka experts here ?

2014-07-02 Thread Mridul Muralidharan
Hi Patrick,

  Please see inline.

Regards,
Mridul


On Wed, Jul 2, 2014 at 10:52 AM, Patrick Wendell  wrote:
>> b) Instead of pulling this information, push it to executors as part
>> of task submission. (What Patrick mentioned ?)
>> (1) a.1 from above is still an issue for this.
>
> I don't understand problem a.1 is. In this case, we don't need to do
> caching, right?


To rephrase in this context, attempting to cache won't help since it is
reducer-specific and the benefits are minimal (other than for re-execution
after failures and for speculative tasks).


>
>> (2) Serialized task size is also a concern : we have already seen
>> users hitting akka limits for task size - this will be an additional
>> vector which might exacerbate it.
>
> This would add only a small, constant amount of data to the task. It's
> strictly better than before. Before if the map output status array was
> size M x R, we send a single akka message to every node of size M x
> R... this basically scales quadratically with the size of the RDD. The
> new approach is constant... it's much better. And the total amount of
> data send over the wire is likely much less.


It would be a function of the number of mappers - and an overhead for each task.


Regards,
Mridul

>
> - Patrick


Re: Eliminate copy while sending data : any Akka experts here ?

2014-07-01 Thread Mridul Muralidharan
We had considered both approaches (if I understood the suggestions right):
a) Pulling only the map output states for tasks which run on the reducer,
by modifying the Actor (probably along the lines of what Aaron described?).
The performance implications of this were bad:
1) We can't cache the serialized result anymore (caching it makes little sense, rather).
2) The number of requests to the master will go from num_executors to
num_reducers - the latter can be orders of magnitude higher than the
former.

b) Instead of pulling this information, push it to executors as part
of task submission (what Patrick mentioned?).
(1) a.1 from above is still an issue for this.
(2) Serialized task size is also a concern: we have already seen
users hitting akka limits for task size - this will be an additional
vector which might exacerbate it.
Our jobs are not hitting this yet though!

I was hoping there might be something in akka itself to alleviate this
- but if not, we can solve it within the context of spark.

Currently, we have worked around it by using a broadcast variable when
the serialized size is above some threshold - so that our immediate
concerns are unblocked :-)
But a better solution would be greatly welcomed!
Maybe we can unify it with large serialized tasks as well ...


Btw, I am not sure what the higher cost of the BlockManager referred to is,
Aaron - do you mean the cost of persisting the serialized map outputs
to disk?




Regards,
Mridul


On Tue, Jul 1, 2014 at 1:36 PM, Patrick Wendell  wrote:
> Yeah I created a JIRA a while back to piggy-back the map status info
> on top of the task (I honestly think it will be a small change). There
> isn't a good reason to broadcast the entire array and it can be an
> issue during large shuffles.
>
> - Patrick
>
> On Mon, Jun 30, 2014 at 7:58 PM, Aaron Davidson  wrote:
>> I don't know of any way to avoid Akka doing a copy, but I would like to
>> mention that it's on the priority list to piggy-back only the map statuses
>> relevant to a particular map task on the task itself, thus reducing the
>> total amount of data sent over the wire by a factor of N for N physical
>> machines in your cluster. Ideally we would also avoid Akka entirely when
>> sending the tasks, as these can get somewhat large and Akka doesn't work
>> well with large messages.
>>
>> Do note that your solution of using broadcast to send the map tasks is very
>> similar to how the executor returns the result of a task when it's too big
>> for akka. We were thinking of refactoring this too, as using the block
>> manager has much higher latency than a direct TCP send.
>>
>>
>> On Mon, Jun 30, 2014 at 12:13 PM, Mridul Muralidharan 
>> wrote:
>>
>>> Our current hack is to use Broadcast variables when serialized
>>> statuses are above some (configurable) size : and have the workers
>>> directly pull them from master.
>>> This is a workaround : so would be great if there was a
>>> better/principled solution.
>>>
>>> Please note that the responses are going to different workers
>>> requesting for the output statuses for shuffle (after map) - so not
>>> sure if back pressure buffers, etc would help.
>>>
>>>
>>> Regards,
>>> Mridul
>>>
>>>
>>> On Mon, Jun 30, 2014 at 11:07 PM, Mridul Muralidharan 
>>> wrote:
>>> > Hi,
>>> >
>>> >   While sending map output tracker result, the same serialized byte
>>> > array is sent multiple times - but the akka implementation copies it
>>> > to a private byte array within ByteString for each send.
>>> > Caching a ByteString instead of Array[Byte] did not help, since akka
>>> > does not support special casing ByteString : serializes the
>>> > ByteString, and copies the result out to an array before creating
>>> > ByteString out of it (in Array[Byte] serializing is thankfully simply
>>> > returning same array - so one copy only).
>>> >
>>> >
>>> > Given the need to send immutable data large number of times, is there
>>> > any way to do it in akka without copying internally in akka ?
>>> >
>>> >
>>> > To see how expensive it is, for 200 nodes withi large number of
>>> > mappers and reducers, the status becomes something like 30 mb for us -
>>> > and pulling this about 200 to 300 times results in OOM due to the
>>> > large number of copies sent out.
>>> >
>>> >
>>> > Thanks,
>>> > Mridul
>>>


Re: Eliminate copy while sending data : any Akka experts here ?

2014-06-30 Thread Mridul Muralidharan
Our current hack is to use Broadcast variables when the serialized
statuses are above some (configurable) size, and have the workers
directly pull them from the master.
This is a workaround, so it would be great if there was a
better/principled solution.

Please note that the responses go to the different workers
requesting the output statuses for the shuffle (after the map) - so I am not
sure if back pressure buffers, etc. would help.
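
A minimal sketch of the workaround described above; the method name and the
threshold are illustrative only, not the actual internals:

{code}
import org.apache.spark.SparkContext

// If the serialized map-output statuses exceed a threshold, register them as a
// broadcast variable and return its id so workers pull the blocks directly from
// the block manager; otherwise return the bytes to be shipped inline via Akka.
def prepareStatusesForSend(
    sc: SparkContext,
    serialized: Array[Byte],
    thresholdBytes: Int): Either[Long, Array[Byte]] = {
  if (serialized.length > thresholdBytes) {
    Left(sc.broadcast(serialized).id)
  } else {
    Right(serialized)
  }
}
{code}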


Regards,
Mridul


On Mon, Jun 30, 2014 at 11:07 PM, Mridul Muralidharan  wrote:
> Hi,
>
>   While sending map output tracker result, the same serialized byte
> array is sent multiple times - but the akka implementation copies it
> to a private byte array within ByteString for each send.
> Caching a ByteString instead of Array[Byte] did not help, since akka
> does not support special casing ByteString : serializes the
> ByteString, and copies the result out to an array before creating
> ByteString out of it (in Array[Byte] serializing is thankfully simply
> returning same array - so one copy only).
>
>
> Given the need to send immutable data large number of times, is there
> any way to do it in akka without copying internally in akka ?
>
>
> To see how expensive it is, for 200 nodes withi large number of
> mappers and reducers, the status becomes something like 30 mb for us -
> and pulling this about 200 to 300 times results in OOM due to the
> large number of copies sent out.
>
>
> Thanks,
> Mridul


Eliminate copy while sending data : any Akka experts here ?

2014-06-30 Thread Mridul Muralidharan
Hi,

  While sending the map output tracker result, the same serialized byte
array is sent multiple times - but the akka implementation copies it
to a private byte array within ByteString for each send.
Caching a ByteString instead of an Array[Byte] did not help, since akka
does not special-case ByteString: it serializes the
ByteString and copies the result out to an array before creating a
ByteString out of it (serializing an Array[Byte] thankfully simply
returns the same array - so one copy only).


Given the need to send immutable data a large number of times, is there
any way to do it in akka without copying internally in akka?


To see how expensive it is: for 200 nodes with a large number of
mappers and reducers, the status becomes something like 30 MB for us -
and pulling this about 200 to 300 times results in OOM due to the
large number of copies sent out.


Thanks,
Mridul


[jira] [Commented] (SPARK-2294) TaskSchedulerImpl and TaskSetManager do not properly prioritize which tasks get assigned to an executor

2014-06-26 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14045433#comment-14045433
 ] 

Mridul Muralidharan commented on SPARK-2294:


I agree; we should bump no-locality-preference and speculative tasks to the 
NODE_LOCAL level after NODE_LOCAL tasks have been scheduled (if available), and 
not check for them at PROCESS_LOCAL max locality. That way they get scheduled 
before RACK_LOCAL but after NODE_LOCAL.
This is an artifact of the design from when there was no PROCESS_LOCAL and 
NODE_LOCAL was the best schedule possible (without explicitly having these 
levels: we just had node and any).
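
A minimal, self-contained sketch of the proposed ordering; the names are 
illustrative only, not the actual TaskSetManager code:

{code}
object Locality extends Enumeration {
  val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value
}

// No-preference and speculative tasks are only considered once the allowed
// locality has reached NODE_LOCAL, so they land after NODE_LOCAL tasks but
// before RACK_LOCAL ones.
def pickTask(
    allowed: Locality.Value,
    processLocal: Seq[Int],
    nodeLocal: Seq[Int],
    noPrefOrSpeculative: Seq[Int],
    rackLocal: Seq[Int],
    anyLevel: Seq[Int]): Option[Int] = {
  processLocal.headOption
    .orElse(if (allowed >= Locality.NODE_LOCAL) nodeLocal.headOption else None)
    .orElse(if (allowed >= Locality.NODE_LOCAL) noPrefOrSpeculative.headOption else None)
    .orElse(if (allowed >= Locality.RACK_LOCAL) rackLocal.headOption else None)
    .orElse(if (allowed >= Locality.ANY) anyLevel.headOption else None)
}
{code}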

> TaskSchedulerImpl and TaskSetManager do not properly prioritize which tasks 
> get assigned to an executor
> ---
>
> Key: SPARK-2294
> URL: https://issues.apache.org/jira/browse/SPARK-2294
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.0.0, 1.0.1
>Reporter: Kay Ousterhout
>
> If an executor E is free, a task may be speculatively assigned to E when 
> there are other tasks in the job that have not been launched (at all) yet.  
> Similarly, a task without any locality preferences may be assigned to E when 
> there was another NODE_LOCAL task that could have been scheduled. 
> This happens because TaskSchedulerImpl calls TaskSetManager.resourceOffer 
> (which in turn calls TaskSetManager.findTask) with increasing locality 
> levels, beginning with PROCESS_LOCAL, followed by NODE_LOCAL, and so on until 
> the highest currently allowed level.  Now, suppose NODE_LOCAL is the highest 
> currently allowed locality level.  The first time findTask is called, it will 
> be called with max level PROCESS_LOCAL; if it cannot find any PROCESS_LOCAL 
> tasks, it will try to schedule tasks with no locality preferences or 
> speculative tasks.  As a result, speculative tasks or tasks with no 
> preferences may be scheduled instead of NODE_LOCAL tasks.
> cc [~matei]





[jira] [Commented] (SPARK-2268) Utils.createTempDir() creates race with HDFS at shutdown

2014-06-24 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14043088#comment-14043088
 ] 

Mridul Muralidharan commented on SPARK-2268:


That is not because of this hook.
There are a bunch of places in spark where filesystem objects are (incorrectly, 
I should add) getting closed: some within shutdown hooks (check the stop methods 
of various services in spark) and others elsewhere (like the checkpointing code).

I have fixed a bunch of these as part of some other work ... it should come in a 
PR soon.
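
To illustrate why closing those filesystem objects is a problem (a generic 
sketch, not actual Spark code): Hadoop's FileSystem.get() returns a cached, 
shared instance, so closing it in one component's stop()/shutdown path breaks 
every other user of that instance.

{code}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val conf = new Configuration()
val fs1 = FileSystem.get(conf)   // cached instance
val fs2 = FileSystem.get(conf)   // same cached instance as fs1

fs1.close()                      // e.g. done in some service's stop()/shutdown hook

// Against HDFS, any later use of the shared DistributedFileSystem instance now
// fails with "java.io.IOException: Filesystem closed".
fs2.exists(new Path("/tmp"))
{code}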

> Utils.createTempDir() creates race with HDFS at shutdown
> 
>
> Key: SPARK-2268
> URL: https://issues.apache.org/jira/browse/SPARK-2268
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.0.0
>Reporter: Marcelo Vanzin
>
> Utils.createTempDir() has this code:
> {code}
> // Add a shutdown hook to delete the temp dir when the JVM exits
> Runtime.getRuntime.addShutdownHook(new Thread("delete Spark temp dir " + 
> dir) {
>   override def run() {
> // Attempt to delete if some patch which is parent of this is not 
> already registered.
> if (! hasRootAsShutdownDeleteDir(dir)) Utils.deleteRecursively(dir)
>   }
> })
> {code}
> This creates a race with the shutdown hooks registered by HDFS, since the 
> order of execution is undefined; if the HDFS hooks run first, you'll get 
> exceptions about the file system being closed.
> Instead, this should use Hadoop's ShutdownHookManager with a proper priority, 
> so that it runs before the HDFS hooks.





[jira] [Commented] (SPARK-2268) Utils.createTempDir() creates race with HDFS at shutdown

2014-06-24 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14043071#comment-14043071
 ] 

Mridul Muralidharan commented on SPARK-2268:


Setting priority for shutdown hooks does not have too much impact given the 
state of the VM.
Note that this hook is trying to delete local directories - not dfs directories.

> Utils.createTempDir() creates race with HDFS at shutdown
> 
>
> Key: SPARK-2268
> URL: https://issues.apache.org/jira/browse/SPARK-2268
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.0.0
>Reporter: Marcelo Vanzin
>
> Utils.createTempDir() has this code:
> {code}
> // Add a shutdown hook to delete the temp dir when the JVM exits
> Runtime.getRuntime.addShutdownHook(new Thread("delete Spark temp dir " + 
> dir) {
>   override def run() {
> // Attempt to delete if some patch which is parent of this is not 
> already registered.
> if (! hasRootAsShutdownDeleteDir(dir)) Utils.deleteRecursively(dir)
>   }
> })
> {code}
> This creates a race with the shutdown hooks registered by HDFS, since the 
> order of execution is undefined; if the HDFS hooks run first, you'll get 
> exceptions about the file system being closed.
> Instead, this should use Hadoop's ShutdownHookManager with a proper priority, 
> so that it runs before the HDFS hooks.





[jira] [Updated] (SPARK-1476) 2GB limit in spark for blocks

2014-06-24 Thread Mridul Muralidharan (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan updated SPARK-1476:
---

Attachment: 2g_fix_proposal.pdf

Proposal detailing the work we have done on this effort.
Looking forward to feedback before a PR is submitted based on this.

> 2GB limit in spark for blocks
> -
>
> Key: SPARK-1476
> URL: https://issues.apache.org/jira/browse/SPARK-1476
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
> Environment: all
>    Reporter: Mridul Muralidharan
>Assignee: Mridul Muralidharan
>Priority: Critical
> Fix For: 1.1.0
>
> Attachments: 2g_fix_proposal.pdf
>
>
> The underlying abstraction for blocks in spark is a ByteBuffer, which limits 
> the size of a block to 2GB.
> This has implications not just for managed blocks in use, but also for shuffle 
> blocks (memory-mapped blocks are limited to 2GB, even though the API allows 
> for a long), ser-deser via byte-array-backed output streams (SPARK-1391), etc.
> This is a severe limitation for the use of spark on non-trivial 
> datasets.
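
A brief illustration of where the 2GB ceiling comes from (standard JDK 
behaviour, not Spark-specific code):

{code}
// ByteBuffer capacity, position and limit are Ints, so one buffer tops out at
// Integer.MAX_VALUE bytes (~2GB).
val maxSingleBuffer: Int = Integer.MAX_VALUE

// FileChannel.map(mode, position, size) likewise rejects size > Int.MaxValue
// with an IllegalArgumentException, even though `size` is declared as a Long -
// so a block larger than ~2GB cannot be memory-mapped as a single buffer.
val tooBig: Long = 3L * 1024 * 1024 * 1024  // 3GB, not representable as one ByteBuffer
assert(tooBig > maxSingleBuffer)
{code}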





Re: [jira] [Created] (SPARK-1867) Spark Documentation Error causes java.lang.IllegalStateException: unread block data

2014-06-23 Thread Mridul Muralidharan
There are a few interacting issues here - and unfortunately I don't
recall all of them (since this was fixed a few months back).
From memory though:

a) With shuffle consolidation, data sent to a remote node incorrectly
includes data from partially constructed blocks - not just the requested
blocks.
Actually, with shuffle consolidation (with and without failures) quite
a few things broke.

b) There might have been a few other bugs in DiskBlockObjectWriter too.

c) We also suspected buffers overlapping when using the cached kryo
serializer (though we never proved this; we just disabled caching across
the board for now and always create a new instance).

The way we debugged it is by introducing an Input/Output stream which
added checksums to the data stream and validated them on each
side for compression, serialization, etc.
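
A small, generic sketch of that debugging technique (illustrative only, not the 
code we used): wrap the output stream so a running CRC is appended to the data, 
and verify it with a matching wrapper on the read side to localise where 
corruption creeps in.

{code}
import java.io.OutputStream
import java.util.zip.CRC32

// Wraps an OutputStream and maintains a CRC32 of every byte written; the
// checksum can be appended at the end of the stream and re-computed/compared
// by a corresponding wrapper on the reading side.
class ChecksummingOutputStream(out: OutputStream) extends OutputStream {
  private val crc = new CRC32

  override def write(b: Int): Unit = {
    crc.update(b)
    out.write(b)
  }

  // Append the 8-byte big-endian CRC value; call once when the payload is done.
  def writeChecksum(): Unit = {
    val v = crc.getValue
    (7 to 0 by -1).foreach(i => out.write(((v >>> (8 * i)) & 0xFF).toInt))
  }
}
{code}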

Apologies for being non-specific ... I really don't have the details
right now, and our internal branch is in flux due to the merge effort to
port our local changes to master.
Hopefully we will be able to submit PRs as soon as this is done and
test cases are added to validate.


Regards,
Mridul




On Tue, Jun 24, 2014 at 10:21 AM, Reynold Xin  wrote:
> Mridul,
>
> Can you comment a little bit more on this issue? We are running into the
> same stack trace but not sure whether it is just different Spark versions
> on each cluster (doesn't seem likely) or a bug in Spark.
>
> Thanks.
>
>
>
> On Sat, May 17, 2014 at 4:41 AM, Mridul Muralidharan 
> wrote:
>
>> I suspect this is an issue we have fixed internally here as part of a
>> larger change - the issue we fixed was not a config issue but bugs in
>> spark.
>>
>> Unfortunately we plan to contribute this as part of 1.1
>>
>> Regards,
>> Mridul
>> On 17-May-2014 4:09 pm, "sam (JIRA)"  wrote:
>>
>> > sam created SPARK-1867:
>> > --
>> >
>> >  Summary: Spark Documentation Error causes
>> > java.lang.IllegalStateException: unread block data
>> >  Key: SPARK-1867
>> >  URL: https://issues.apache.org/jira/browse/SPARK-1867
>> >  Project: Spark
>> >   Issue Type: Bug
>> > Reporter: sam
>> >
>> >
>> > I've employed two System Administrators on a contract basis (for quite a
>> > bit of money), and both contractors have independently hit the following
>> > exception.  What we are doing is:
>> >
>> > 1. Installing Spark 0.9.1 according to the documentation on the website,
>> > along with CDH4 (and another cluster with CDH5) distros of hadoop/hdfs.
>> > 2. Building a fat jar with a Spark app with sbt then trying to run it on
>> > the cluster
>> >
>> > I've also included code snippets, and sbt deps at the bottom.
>> >
>> > When I've Googled this, there seems to be two somewhat vague responses:
>> > a) Mismatching spark versions on nodes/user code
>> > b) Need to add more jars to the SparkConf
>> >
>> > Now I know that (b) is not the problem having successfully run the same
>> > code on other clusters while only including one jar (it's a fat jar).
>> >
>> > But I have no idea how to check for (a) - it appears Spark doesn't have
>> > any version checks or anything - it would be nice if it checked versions
>> > and threw a "mismatching version exception: you have user code using
>> > version X and node Y has version Z".
>> >
>> > I would be very grateful for advice on this.
>> >
>> > The exception:
>> >
>> > Exception in thread "main" org.apache.spark.SparkException: Job aborted:
>> > Task 0.0:1 failed 32 times (most recent failure: Exception failure:
>> > java.lang.IllegalStateException: unread block data)
>> > at
>> >
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
>> > at
>> >
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
>> > at
>> >
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> > at
>> > scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> > at org.apache.spark.scheduler.DAGScheduler.org
>> > $apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
>> > at
>> >
>> org.apache.spark.scheduler.DAGScheduler$

[jira] [Commented] (SPARK-2223) Building and running tests with maven is extremely slow

2014-06-22 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14040094#comment-14040094
 ] 

Mridul Muralidharan commented on SPARK-2223:


I usually do:
$ mvn -Pyarn-alpha clean; mvn -Pyarn-alpha -DskipTests=true install; mvn 
-Pyarn-alpha install

This ensures no older artifacts can even remotely affect the new build/test :-) 
(the -P is there since I then usually ship the assembled jar to our cluster).
It takes about 1.5 hours or so on a slightly older Lenovo (primarily due to the 
Hive tests, IIRC).

> Building and running tests with maven is extremely slow
> ---
>
> Key: SPARK-2223
> URL: https://issues.apache.org/jira/browse/SPARK-2223
> Project: Spark
>  Issue Type: Bug
>  Components: Build
>Affects Versions: 1.0.0
>Reporter: Thomas Graves
>
> For some reason using maven with Spark is extremely slow.  Building and 
> running tests takes way longer than other projects I have used that use 
> maven.  We should investigate to see why.  




