[jira] [Commented] (SPARK-3889) JVM dies with SIGBUS, resulting in ConnectionManager failed ACK

2014-10-10 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14167303#comment-14167303
 ] 

Mridul Muralidharan commented on SPARK-3889:


The status says fixed - what was done to resolve this? I did not see a PR ...

 JVM dies with SIGBUS, resulting in ConnectionManager failed ACK
 ---

 Key: SPARK-3889
 URL: https://issues.apache.org/jira/browse/SPARK-3889
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.1.0
Reporter: Aaron Davidson
Assignee: Aaron Davidson
Priority: Critical
 Fix For: 1.2.0


 Here's the first part of the core dump, possibly caused by a job which 
 shuffles a lot of very small partitions.
 {code}
 #
 # A fatal error has been detected by the Java Runtime Environment:
 #
 #  SIGBUS (0x7) at pc=0x7fa5885fcdb0, pid=488, tid=140343502632704
 #
 # JRE version: 7.0_25-b30
 # Java VM: OpenJDK 64-Bit Server VM (23.7-b01 mixed mode linux-amd64 
 compressed oops)
 # Problematic frame:
 # v  ~StubRoutines::jbyte_disjoint_arraycopy
 #
 # Failed to write core dump. Core dumps have been disabled. To enable core 
 dumping, try ulimit -c unlimited before starting Java again
 #
 # If you would like to submit a bug report, please include
 # instructions on how to reproduce the bug and visit:
 #   https://bugs.launchpad.net/ubuntu/+source/openjdk-7/
 #
 ---  T H R E A D  ---
 Current thread (0x7fa4b0631000):  JavaThread "Executor task launch 
 worker-170" daemon [_thread_in_Java, id=6783, 
 stack(0x7fa4448ef000,0x7fa4449f)]
 siginfo:si_signo=SIGBUS: si_errno=0, si_code=2 (BUS_ADRERR), 
 si_addr=0x7fa428f79000
 {code}
 Here is the only useful content I can find related to JVM and SIGBUS from 
 Google: https://bugzilla.redhat.com/show_bug.cgi?format=multipleid=976664
 It appears it may be related to disposing byte buffers, which we do in the 
 ConnectionManager -- we mmap shuffle files via ManagedBuffer and dispose of 
 them in BufferMessage.
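 For illustration only, a minimal sketch (not Spark code; the path and sizes are 
 made up) of how an invalidated memory mapping surfaces as SIGBUS inside an 
 arraycopy stub rather than as a Java exception:
 {code}
 // Hypothetical sketch: if the pages backing a MappedByteBuffer become invalid
 // while a copy is in flight (e.g. the backing file is truncated, or the mapping
 // is disposed early), the copy faults with SIGBUS.
 import java.io.RandomAccessFile
 import java.nio.channels.FileChannel

 val raf = new RandomAccessFile("/tmp/some-shuffle-file", "rw")
 raf.setLength(1 << 20)
 val buf = raf.getChannel.map(FileChannel.MapMode.READ_ONLY, 0, 1 << 20)

 raf.setLength(0)                   // invalidate the pages backing the mapping
 val dst = new Array[Byte](1 << 20)
 buf.get(dst)                       // copies via an arraycopy stub - SIGBUS
 {code}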



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3785) Support off-loading computations to a GPU

2014-10-08 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14163326#comment-14163326
 ] 

Mridul Muralidharan commented on SPARK-3785:


[~sowen] We had prototyped a solution for doing just this - the way we did it 
was to add a new StorageLevel, higher than ProcessLevel, and to maintain 
information about which card hosted the data in an executor, plus the other 
handles/metadata required to offload computation to the accelerator card (so 
not just GPUs).
We also set appropriate level delays so that if an RDD had GPU-resident data, no 
other computation level was allowed (unless the executor was lost).
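Purely as a rough illustration of the "level delay" idea (not the prototype's 
actual code; the setting below is hypothetical), the existing locality-wait knobs 
can be pushed to an extreme so tasks do not fall back from the executor hosting 
the GPU-resident data:
{code}
// Illustrative sketch only: an effectively infinite process-local wait keeps
// tasks over GPU-resident partitions on the hosting executor; they fall back
// to other locality levels only if that executor is lost.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.locality.wait.process", Int.MaxValue.toString)  // milliseconds; made-up value
{code}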

It was a prototype, so we did not solve all the issues which arise - including 
how to expose eviction of data from the GPU back to main memory/disk under memory 
pressure, more efficient failure modes, moving data between GPUs to balance the 
memory and computational load, RDD replication between GPUs in an executor, 
multi-tenancy, etc. Our initial target use cases just required running various 
(expensive) closures on the data, and the result to be pulled off the card was 
fairly small - so not all of these needed to be solved, at least for the 
prototype :-)
I can't get into the gory details or the actual benchmark numbers though, 
apologies.

In general, is it worth it? For very specific cases I would say it is 
phenomenal - allowing a 2-3 order of magnitude vertical performance boost!
But for cases where it is not a good fit, it is terrible - even for cases where 
it was intuitively supposed to work, the inefficiencies we incur make it terrible 
at times.

 Support off-loading computations to a GPU
 -

 Key: SPARK-3785
 URL: https://issues.apache.org/jira/browse/SPARK-3785
 Project: Spark
  Issue Type: Brainstorming
  Components: MLlib
Reporter: Thomas Darimont
Priority: Minor

 Are there any plans to adding support for off-loading computations to the 
 GPU, e.g. via an open-cl binding? 
 http://www.jocl.org/
 https://code.google.com/p/javacl/
 http://lwjgl.org/wiki/index.php?title=OpenCL_in_LWJGL



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3561) Allow for pluggable execution contexts in Spark

2014-10-08 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14163351#comment-14163351
 ] 

Mridul Muralidharan commented on SPARK-3561:


[~pwendell] If I understood the proposal and the initial PR submitted, the 
intent of this JIRA, as initially proposed by [~ozhurakousky], is fairly 
different from the other efforts referenced, if I am not wrong.
The focus of this change seems to be to completely bypass the Spark execution 
engine and substitute an alternative: only the current API (and so DAG 
creation from the Spark program) and user interfaces in Spark remain - the 
block management, execution engine, execution state management, etc. would all 
be replaced under the covers by what Tez (or something else in the future) provides.

If I am not wrong, the changes would be:
a) Applies only to YARN mode - where the specified execution environment can be run.
b) The current Spark AM would no longer request any executors.
c) The Spark block manager would no longer be required (other than possibly for 
hosting broadcast via HTTP, I guess?).
d) The actual DAG execution would be taken up by the overridden execution 
engine - Spark's task manager and DAG scheduler become no-ops.

I might be missing things which Oleg can elaborate on.


This functionality, IMO, is fundamentally different from what is being explored 
in the other JIRAs - and so has value in being pursued independent of the other 
efforts.
Obviously this does not work in all use cases where Spark is run - but it 
handles a subset of use cases where other execution engines might do much better 
than Spark currently does, simply because of better code maturity and the 
specialized use cases they target.


 Allow for pluggable execution contexts in Spark
 ---

 Key: SPARK-3561
 URL: https://issues.apache.org/jira/browse/SPARK-3561
 Project: Spark
  Issue Type: New Feature
  Components: Spark Core
Affects Versions: 1.1.0
Reporter: Oleg Zhurakousky
  Labels: features
 Fix For: 1.2.0

 Attachments: SPARK-3561.pdf


 Currently Spark provides integration with external resource-managers such as 
 Apache Hadoop YARN, Mesos etc. Specifically in the context of YARN, the 
 current architecture of Spark-on-YARN can be enhanced to provide 
 significantly better utilization of cluster resources for large scale, batch 
 and/or ETL applications when run alongside other applications (Spark and 
 others) and services in YARN. 
 Proposal: 
 The proposed approach would introduce a pluggable JobExecutionContext (trait) 
 - a gateway and a delegate to Hadoop execution environment - as a non-public 
 api (@DeveloperAPI) not exposed to end users of Spark. 
 The trait will define only 4 operations: 
 * hadoopFile 
 * newAPIHadoopFile 
 * broadcast 
 * runJob 
 Each method directly maps to the corresponding methods in current version of 
 SparkContext. JobExecutionContext implementation will be accessed by 
 SparkContext via master URL as 
 execution-context:foo.bar.MyJobExecutionContext with default implementation 
 containing the existing code from SparkContext, thus allowing current 
 (corresponding) methods of SparkContext to delegate to such implementation. 
 An integrator will now have an option to provide a custom implementation of 
 DefaultExecutionContext by either implementing it from scratch or extending 
 from DefaultExecutionContext. 
 Please see the attached design doc for more details. 
 Pull Request will be posted shortly as well
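 For illustration only, a minimal sketch of what such a trait might look like 
 (signatures simplified and hypothetical; the authoritative definition is in the 
 attached design doc and the upcoming PR):
 {code}
 // Hypothetical sketch - simplified signatures, not the proposed API.
 import scala.reflect.ClassTag
 import org.apache.spark.{SparkContext, TaskContext}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD

 trait JobExecutionContext {
   def hadoopFile[K, V](sc: SparkContext, path: String): RDD[(K, V)]       // plus input/key/value classes
   def newAPIHadoopFile[K, V](sc: SparkContext, path: String): RDD[(K, V)] // new Hadoop API variant
   def broadcast[T: ClassTag](sc: SparkContext, value: T): Broadcast[T]
   def runJob[T, U: ClassTag](sc: SparkContext, rdd: RDD[T],
                              func: (TaskContext, Iterator[T]) => U,
                              partitions: Seq[Int]): Array[U]
 }
 {code}
 A custom implementation would then be selected through a master URL of the form 
 execution-context:foo.bar.MyJobExecutionContext, with the default implementation 
 carrying the existing SparkContext behaviour.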



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3561) Allow for pluggable execution contexts in Spark

2014-10-08 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14163376#comment-14163376
 ] 

Mridul Muralidharan commented on SPARK-3561:


[~ozhurakousky] I think the disconnect here is that the proposed interfaces do 
not show much value in terms of the functionality that is supposed to be exposed 
to users. A follow-up PR showing how these interfaces are used in the context of 
Tez would show why this change is relevant in the context of Spark.

The disconnect, if I am not wrong, is that we do not want to expose SPIs which 
we would then need to maintain in Spark core - while unknown implementations 
extend them in non-standard ways, causing issues for our end users.


For example, even though TaskScheduler is an SPI and can in theory be extended 
in arbitrary ways, all the SPI implementations currently 'live' within Spark 
and are in harmony with the rest of the code - and with changes which occur within 
Spark core (when functionality is added or extended).
This allows us to decouple the actual TaskScheduler implementation from Spark 
code, while still keeping them in sync and maintainable, and to add 
functionality independent of other pieces: case in point, YARN support has 
significantly evolved from when I initially added it - to the point where it 
probably does not share even a single line of code I initially wrote :) - and 
yet this has been done pretty much independently of changes to core while at the 
same time ensuring that it is compatible with changes in Spark core and vice 
versa.


The next step, IMO, would be a PR which shows how these interfaces are used for a 
non-trivial use case: Tez in this case.
The default implementation provided in the PR can be removed (since it should 
not be used/exposed to users).

Once that is done, we can evaluate the proposed interface in the context of the 
functionality exposed, and see how it fits with the rest of Spark.

 Allow for pluggable execution contexts in Spark
 ---

 Key: SPARK-3561
 URL: https://issues.apache.org/jira/browse/SPARK-3561
 Project: Spark
  Issue Type: New Feature
  Components: Spark Core
Affects Versions: 1.1.0
Reporter: Oleg Zhurakousky
  Labels: features
 Fix For: 1.2.0

 Attachments: SPARK-3561.pdf


 Currently Spark provides integration with external resource-managers such as 
 Apache Hadoop YARN, Mesos etc. Specifically in the context of YARN, the 
 current architecture of Spark-on-YARN can be enhanced to provide 
 significantly better utilization of cluster resources for large scale, batch 
 and/or ETL applications when run alongside other applications (Spark and 
 others) and services in YARN. 
 Proposal: 
 The proposed approach would introduce a pluggable JobExecutionContext (trait) 
 - a gateway and a delegate to Hadoop execution environment - as a non-public 
 api (@DeveloperAPI) not exposed to end users of Spark. 
 The trait will define only 4 operations: 
 * hadoopFile 
 * newAPIHadoopFile 
 * broadcast 
 * runJob 
 Each method directly maps to the corresponding methods in current version of 
 SparkContext. JobExecutionContext implementation will be accessed by 
 SparkContext via master URL as 
 execution-context:foo.bar.MyJobExecutionContext with default implementation 
 containing the existing code from SparkContext, thus allowing current 
 (corresponding) methods of SparkContext to delegate to such implementation. 
 An integrator will now have an option to provide a custom implementation of 
 DefaultExecutionContext by either implementing it from scratch or extending 
 from DefaultExecutionContext. 
 Please see the attached design doc for more details. 
 Pull Request will be posted shortly as well



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3847) Enum.hashCode is only consistent within the same JVM

2014-10-08 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14163407#comment-14163407
 ] 

Mridul Muralidharan commented on SPARK-3847:


Wow, nice bug! This is unexpected - thanks for reporting this.

 Enum.hashCode is only consistent within the same JVM
 

 Key: SPARK-3847
 URL: https://issues.apache.org/jira/browse/SPARK-3847
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.1.0
 Environment: Oracle JDK 7u51 64bit on Ubuntu 12.04
Reporter: Nathan Bijnens
  Labels: enum

 When using Java enums as keys in some operations the results will be very 
 unexpected. The issue is that Java's Enum.hashCode returns the 
 memory position, which is different on each JVM. 
 {code}
 messages.filter(_.getHeader.getKind == Kind.EVENT).count
 > 503650
 val tmp = messages.filter(_.getHeader.getKind == Kind.EVENT)
 tmp.map(_.getHeader.getKind).countByValue
 > Map(EVENT -> 1389)
 {code}
 Because it's actually a JVM issue, we should either reject enums as keys with an 
 error or implement a workaround.
 A good writeup of the issue can be found here (and a workaround):
 http://dev.bizo.com/2014/02/beware-enums-in-spark.html
 Somewhat more on the hash codes and Enum's:
 https://stackoverflow.com/questions/4885095/what-is-the-reason-behind-enum-hashcode
 And some issues (most of them rejected) at the Oracle Bug Java database:
 - http://bugs.java.com/bugdatabase/view_bug.do?bug_id=8050217
 - http://bugs.java.com/bugdatabase/view_bug.do?bug_id=7190798
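 As an illustration of the workaround mentioned above (sketch only, reusing the 
 names from the snippet): keying on a value whose hashCode is stable across JVMs, 
 such as the enum's name, avoids the problem:
 {code}
 // Sketch: key on the enum's name (stable across JVMs) instead of the enum itself.
 val tmp = messages.filter(_.getHeader.getKind == Kind.EVENT)
 tmp.map(_.getHeader.getKind.name).countByValue   // names hash consistently on every executor
 {code}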



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3561) Allow for pluggable execution contexts in Spark

2014-10-08 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14164095#comment-14164095
 ] 

Mridul Muralidharan commented on SPARK-3561:



I agree with [~pwendell] that it does not help Spark to introduce a dependency 
on Tez in core.
[~ozhurakousky] is Tez available on all YARN clusters? Or is it an additional 
runtime dependency?

If it is available by default, we can make it a runtime switch to use Tez for 
jobs running in yarn-standalone and yarn-client mode.
But before that ...


While better multi-tenancy would be a likely benefit, my specific interest in 
this patch has more to do with the much better shuffle performance that Tez 
offers :-) Specifically for ETL jobs, I can see other benefits which might be 
relevant - one of our collaborative filtering implementations, though not ETL, 
comes fairly close to it in job characteristics and suffers due to some of our 
shuffle issues ...


As I alluded to, I do not think we should have an open-ended extension point 
where any class name can be provided which extends functionality in an arbitrary 
manner - for example, like the SPI we have for compression codecs.
As Patrick mentioned, this gives the impression that the approach is blessed by 
Spark developers - even if tagged as Experimental.
Particularly with core internals, I would be very wary of exposing them via an 
SPI - simply because we need the freedom to evolve them for performance or 
functionality reasons.


On the other hand, I am in favour of exploring this option to see what sort of 
benefits we get out of it, assuming it has been prototyped already - which I 
thought was the case here, though I have yet to see a PR for that (not sure if I 
missed it!).
Given that Tez is supposed to be reasonably mature, if there is a Spark + Tez 
version I want to see what benefits (if any) are observed as a result of this 
effort.
I had discussed Spark + Tez integration about a year or so back with Matei - 
but at that time Tez was probably not that mature - maybe this is a better 
time!

[~ozhurakousky] Do you have a Spark-on-Tez prototype done already? Or is this 
an experiment you have yet to complete? If complete, what sort of performance 
difference do you see? What metrics are you using?


If there are significant benefits, I would want to take a closer look at the 
final proposed patch ... I would be interested in it making it into Spark in some 
form.

As [~nchammas] mentioned, if it is possible to address this in Spark directly, 
nothing like it - particularly since it will benefit all modes of execution and 
not just the YARN + Tez combination.
If the gap can't be narrowed, and the benefits are significant (for some, as of 
now undefined, definition of "benefits" and "significant"), then we can 
consider a Tez dependency in the YARN module.

Of course, all these questions are moot until we have a better quantitative 
judgement of what the expected gains are and what the experimental results are.

 Allow for pluggable execution contexts in Spark
 ---

 Key: SPARK-3561
 URL: https://issues.apache.org/jira/browse/SPARK-3561
 Project: Spark
  Issue Type: New Feature
  Components: Spark Core
Affects Versions: 1.1.0
Reporter: Oleg Zhurakousky
  Labels: features
 Fix For: 1.2.0

 Attachments: SPARK-3561.pdf


 Currently Spark provides integration with external resource-managers such as 
 Apache Hadoop YARN, Mesos etc. Specifically in the context of YARN, the 
 current architecture of Spark-on-YARN can be enhanced to provide 
 significantly better utilization of cluster resources for large scale, batch 
 and/or ETL applications when run alongside other applications (Spark and 
 others) and services in YARN. 
 Proposal: 
 The proposed approach would introduce a pluggable JobExecutionContext (trait) 
 - a gateway and a delegate to Hadoop execution environment - as a non-public 
 api (@DeveloperAPI) not exposed to end users of Spark. 
 The trait will define only 4 operations: 
 * hadoopFile 
 * newAPIHadoopFile 
 * broadcast 
 * runJob 
 Each method directly maps to the corresponding methods in current version of 
 SparkContext. JobExecutionContext implementation will be accessed by 
 SparkContext via master URL as 
 execution-context:foo.bar.MyJobExecutionContext with default implementation 
 containing the existing code from SparkContext, thus allowing current 
 (corresponding) methods of SparkContext to delegate to such implementation. 
 An integrator will now have an option to provide a custom implementation of 
 DefaultExecutionContext by either implementing it from scratch or extending 
 from DefaultExecutionContext. 
 Please see the attached design doc for more details. 
 Pull Request will be posted shortly as well



--
This message was 

[jira] [Commented] (SPARK-3847) Enum.hashCode is only consistent within the same JVM

2014-10-08 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14164775#comment-14164775
 ] 

Mridul Muralidharan commented on SPARK-3847:


[~joshrosen] Array hashCode might be understandable - but enums messing up 
hashCode is unexpected, to say the least :-)
You are right, we need to add something similar.

Maybe have a blacklist of key types which are unstable when used in a 
distributed setting - I have a feeling we might end up with a longer list than 
just array and enum.
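A rough sketch of what such a (purely hypothetical) guard might look like - the 
method name and messages are made up:
{code}
// Hypothetical sketch: reject key types whose hashCode is not stable across
// JVMs before they participate in a hash-partitioned shuffle.
import org.apache.spark.SparkException

def checkShuffleKey(key: Any): Unit = key match {
  case _: Enum[_] =>
    throw new SparkException("Enum keys hash differently on each JVM; key on .name() or .ordinal() instead")
  case _: Array[_] =>
    throw new SparkException("Array keys use identity hashCode; wrap them in a Seq or a case class instead")
  case _ => // assumed stable
}
{code}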

 Enum.hashCode is only consistent within the same JVM
 

 Key: SPARK-3847
 URL: https://issues.apache.org/jira/browse/SPARK-3847
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.1.0
 Environment: Oracle JDK 7u51 64bit on Ubuntu 12.04
Reporter: Nathan Bijnens
  Labels: enum

 When using Java enums as keys in some operations the results will be very 
 unexpected. The issue is that Java's Enum.hashCode returns the 
 memory position, which is different on each JVM. 
 {code}
 messages.filter(_.getHeader.getKind == Kind.EVENT).count
 > 503650
 val tmp = messages.filter(_.getHeader.getKind == Kind.EVENT)
 tmp.map(_.getHeader.getKind).countByValue
 > Map(EVENT -> 1389)
 {code}
 Because it's actually a JVM issue, we should either reject enums as keys with an 
 error or implement a workaround.
 A good writeup of the issue can be found here (and a workaround):
 http://dev.bizo.com/2014/02/beware-enums-in-spark.html
 Somewhat more on the hash codes and Enum's:
 https://stackoverflow.com/questions/4885095/what-is-the-reason-behind-enum-hashcode
 And some issues (most of them rejected) at the Oracle Bug Java database:
 - http://bugs.java.com/bugdatabase/view_bug.do?bug_id=8050217
 - http://bugs.java.com/bugdatabase/view_bug.do?bug_id=7190798



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3714) Spark workflow scheduler

2014-09-29 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14151486#comment-14151486
 ] 

Mridul Muralidharan commented on SPARK-3714:



Most of the drawbacks mentioned are not severe, IMO - at best, they reflect 
unfamiliarity with the Oozie platform (points 2, 3, 4, 5).
Point 1 is interesting (sharing a Spark context) - though from a fault tolerance 
point of view it makes supporting it challenging; of course Oozie was probably 
not designed with something like Spark in mind, so there might be 
changes to Oozie which would benefit Spark; we could engage with the Oozie devs for 
that.

But discarding it to reinvent something when Oozie already does everything 
mentioned in the requirements section seems counterintuitive.


I have seen multiple attempts to 'simplify' workflow management, and at 
production scale almost everything ends up being similar ...
Note that most production jobs have to depend on a variety of jobs - not just 
Spark or MR - so you will end up converging on a variant of Oozie anyway :-)

Having said that, if you want to take a crack at solving this with Spark-specific 
idioms in mind, it would be interesting to see the result - I don't 
want to dissuade you from doing so!
We might end up with something quite interesting.

 Spark workflow scheduler
 

 Key: SPARK-3714
 URL: https://issues.apache.org/jira/browse/SPARK-3714
 Project: Spark
  Issue Type: New Feature
  Components: Project Infra
Reporter: Egor Pakhomov
Priority: Minor

 [Design doc | 
 https://docs.google.com/document/d/1q2Q8Ux-6uAkH7wtLJpc3jz-GfrDEjlbWlXtf20hvguk/edit?usp=sharing]
 The Spark stack is currently hard to use in production processes due to the lack 
 of the following features:
 * Scheduling Spark jobs
 * Retrying a failed Spark job in a big pipeline
 * Sharing a context among jobs in a pipeline
 * Queueing jobs
 A typical use case for such a platform would be: wait for new data, process the new 
 data, learn ML models on the new data, compare the model with the previous one, and in case 
 of success replace the current production model in the HDFS directory 
 with the new one.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3714) Spark workflow scheduler

2014-09-28 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14151211#comment-14151211
 ] 

Mridul Muralidharan commented on SPARK-3714:


Have you tried using Oozie for this?
IIRC Tom already got this working here quite a while back.
/CC [~tgraves] 

 Spark workflow scheduler
 

 Key: SPARK-3714
 URL: https://issues.apache.org/jira/browse/SPARK-3714
 Project: Spark
  Issue Type: New Feature
  Components: Project Infra
Reporter: Egor Pakhomov
Priority: Minor

 [Design doc | 
 https://docs.google.com/document/d/1q2Q8Ux-6uAkH7wtLJpc3jz-GfrDEjlbWlXtf20hvguk/edit?usp=sharing]
 The Spark stack is currently hard to use in production processes due to the lack 
 of the following features:
 * Scheduling Spark jobs
 * Retrying a failed Spark job in a big pipeline
 * Sharing a context among jobs in a pipeline
 * Queueing jobs
 A typical use case for such a platform would be: wait for new data, process the new 
 data, learn ML models on the new data, compare the model with the previous one, and in case 
 of success replace the current production model in the HDFS directory 
 with the new one.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-1956) Enable shuffle consolidation by default

2014-09-07 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14125027#comment-14125027
 ] 

Mridul Muralidharan commented on SPARK-1956:


The recent changes to BlockObjectWriter have introduced bugs again ... I don't 
know how badly they affect the codebase, but it would not be prudent to enable 
this by default until they are fixed and the changes properly analyzed.

 Enable shuffle consolidation by default
 ---

 Key: SPARK-1956
 URL: https://issues.apache.org/jira/browse/SPARK-1956
 Project: Spark
  Issue Type: Improvement
  Components: Shuffle, Spark Core
Affects Versions: 1.0.0
Reporter: Sandy Ryza

 The only drawbacks are on ext3, and most everyone has ext4 at this point.  I 
 think it's better to aim the default at the common case.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3019) Pluggable block transfer (data plane communication) interface

2014-09-02 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14119362#comment-14119362
 ] 

Mridul Muralidharan commented on SPARK-3019:


Just went over the proposal in some detail.
[~rxin] did you take a look at the proposal in SPARK-1476?
The ManagedBuffer detailed in this document does not satisfy most of the 
interface or functional requirements in 1476 - which would require us to 
redesign this interface when we need to support blocks larger than 2 GB in 
Spark, unless I missed something.

 Pluggable block transfer (data plane communication) interface
 -

 Key: SPARK-3019
 URL: https://issues.apache.org/jira/browse/SPARK-3019
 Project: Spark
  Issue Type: Improvement
  Components: Shuffle, Spark Core
Reporter: Reynold Xin
Assignee: Reynold Xin
 Attachments: PluggableBlockTransferServiceProposalforSpark - draft 
 1.pdf


 The attached design doc proposes a standard interface for block transferring, 
 which will make future engineering of this functionality easier, allowing the 
 Spark community to provide alternative implementations.
 Block transferring is a critical function in Spark. All of the following 
 depend on it:
 * shuffle
 * torrent broadcast
 * block replication in BlockManager
 * remote block reads for tasks scheduled without locality



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3019) Pluggable block transfer (data plane communication) interface

2014-09-02 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14119365#comment-14119365
 ] 

Mridul Muralidharan commented on SPARK-3019:


I will try to push the version we last worked on for the 2G fix (a pre-1.1 
fork) to git later today/this week - and we can take a look at it.
It might require some effort to rebase it to 1.1 since it is slightly dated, 
but that can be done if required: the main reason for the push would be to 
illustrate why the interfaces in SPARK-1476 exist and how they are 
used, so that there is a better understanding of the required functional 
change.

 Pluggable block transfer (data plane communication) interface
 -

 Key: SPARK-3019
 URL: https://issues.apache.org/jira/browse/SPARK-3019
 Project: Spark
  Issue Type: Improvement
  Components: Shuffle, Spark Core
Reporter: Reynold Xin
Assignee: Reynold Xin
 Attachments: PluggableBlockTransferServiceProposalforSpark - draft 
 1.pdf


 The attached design doc proposes a standard interface for block transferring, 
 which will make future engineering of this functionality easier, allowing the 
 Spark community to provide alternative implementations.
 Block transferring is a critical function in Spark. All of the following 
 depend on it:
 * shuffle
 * torrent broadcast
 * block replication in BlockManager
 * remote block reads for tasks scheduled without locality



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-1476) 2GB limit in spark for blocks

2014-09-02 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14119385#comment-14119385
 ] 

Mridul Muralidharan commented on SPARK-1476:


WIP version pushed to https://github.com/mridulm/spark/tree/2g_fix - about 2 
weeks before the feature freeze in 1.1, IIRC. 

Note that the 2g fixes are functionally complete, but this branch also includes 
a large number of other fixes.
Some of these have been pushed to master, while others have not yet been - 
primarily those for alleviating memory pressure and fixing resource leaks.

This branch has been shared for reference purposes - it is not meant to be 
actively worked on for merging into master.
We will need to cherry-pick the changes and do that manually.

 2GB limit in spark for blocks
 -

 Key: SPARK-1476
 URL: https://issues.apache.org/jira/browse/SPARK-1476
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
 Environment: all
Reporter: Mridul Muralidharan
Assignee: Mridul Muralidharan
Priority: Critical
 Attachments: 2g_fix_proposal.pdf


 The underlying abstraction for blocks in Spark is a ByteBuffer, which limits 
 the size of a block to 2GB.
 This has implications not just for managed blocks in use, but also for shuffle 
 blocks (memory-mapped blocks are limited to 2GB, even though the API allows 
 for long sizes), ser-deser via byte-array-backed output streams (SPARK-1391), etc.
 This is a severe limitation for the use of Spark on non-trivial 
 datasets.
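 To illustrate the ceiling (sketch only; the path is made up): ByteBuffer 
 positions and limits are Int-indexed, and FileChannel.map rejects regions larger 
 than Integer.MAX_VALUE, so a single mapped block cannot exceed ~2GB:
 {code}
 // Illustrative sketch of the 2GB limit; the file path is hypothetical.
 import java.io.RandomAccessFile
 import java.nio.channels.FileChannel

 val ch = new RandomAccessFile("/tmp/large-shuffle-file", "r").getChannel
 // map() takes a Long size but throws IllegalArgumentException when
 // size > Integer.MAX_VALUE, so blocks beyond ~2GB cannot be mapped in one go.
 val buf = ch.map(FileChannel.MapMode.READ_ONLY, 0, math.min(ch.size(), Int.MaxValue.toLong))
 {code}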



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3277) LZ4 compression cause the the ExternalSort exception

2014-08-28 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14113741#comment-14113741
 ] 

Mridul Muralidharan commented on SPARK-3277:


This looks like unrelated changes pushed to BlockObjectWriter as part of the 
introduction of ShuffleWriteMetrics.
I had introduced checks and also documented that we must not infer size based 
on the position of the stream after flush - since close can write data to the 
streams (and one flush can result in more data getting generated which need not 
be flushed to the streams).

Apparently this logic was modified subsequently, causing this bug.
The solution would be to revert the change that updates shuffleBytesWritten 
before the close of the stream.
It must be done after close, and based on file.length.
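A minimal, self-contained sketch of that ordering (illustrative only; the path 
and codec are arbitrary, not the actual BlockObjectWriter code):
{code}
// Sketch: the reliable measure of bytes written is file.length() after close(),
// not the stream position after flush(), since close() can still emit a final
// (partial) compressed block.
import java.io.{File, FileOutputStream}
import net.jpountz.lz4.LZ4BlockOutputStream

val file = new File("/tmp/spill-metrics-sketch")
val out  = new LZ4BlockOutputStream(new FileOutputStream(file))
out.write(new Array[Byte](100))
out.flush()
val sizeAfterFlush      = file.length()   // may under-count: partial block not yet written
out.close()
val shuffleBytesWritten = file.length()   // measure only after close
{code}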

 LZ4 compression cause the the ExternalSort exception
 

 Key: SPARK-3277
 URL: https://issues.apache.org/jira/browse/SPARK-3277
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.0.2
Reporter: hzw
 Fix For: 1.1.0


 I tested the LZ4 compression, and it came up with this problem (with wordcount).
 I also tested Snappy and LZF, and they were OK.
 Finally I set spark.shuffle.spill to false to avoid the exception, 
 but whenever this switch is turned back on, the error comes back.
 Exception info as follows:
 java.lang.AssertionError: assertion failed
 at scala.Predef$.assert(Predef.scala:165)
 at 
 org.apache.spark.util.collection.ExternalAppendOnlyMap$DiskMapIterator.<init>(ExternalAppendOnlyMap.scala:416)
 at 
 org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:235)
 at 
 org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:150)
 at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
 at 
 org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:55)
 at 
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
 at 
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:54)
 at 
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
 at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
 at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
 at java.lang.Thread.run(Thread.java:722)



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-3277) LZ4 compression cause the the ExternalSort exception

2014-08-28 Thread Mridul Muralidharan (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-3277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan updated SPARK-3277:
---

Priority: Blocker  (was: Major)

 LZ4 compression cause the the ExternalSort exception
 

 Key: SPARK-3277
 URL: https://issues.apache.org/jira/browse/SPARK-3277
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.0.2, 1.1.0, 1.2.0
Reporter: hzw
Priority: Blocker
 Fix For: 1.1.0


 I tested the LZ4 compression, and it came up with this problem (with wordcount).
 I also tested Snappy and LZF, and they were OK.
 Finally I set spark.shuffle.spill to false to avoid the exception, 
 but whenever this switch is turned back on, the error comes back.
 It seems that if the number of distinct words is small, wordcount goes through, 
 but with a more complex text the problem shows up.
 Exception info as follows:
 java.lang.AssertionError: assertion failed
 at scala.Predef$.assert(Predef.scala:165)
 at 
 org.apache.spark.util.collection.ExternalAppendOnlyMap$DiskMapIterator.<init>(ExternalAppendOnlyMap.scala:416)
 at 
 org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:235)
 at 
 org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:150)
 at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
 at 
 org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:55)
 at 
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
 at 
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:54)
 at 
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
 at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
 at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
 at java.lang.Thread.run(Thread.java:722)



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-3277) LZ4 compression cause the the ExternalSort exception

2014-08-28 Thread Mridul Muralidharan (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-3277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan updated SPARK-3277:
---

Affects Version/s: 1.2.0
   1.1.0

 LZ4 compression cause the the ExternalSort exception
 

 Key: SPARK-3277
 URL: https://issues.apache.org/jira/browse/SPARK-3277
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.0.2, 1.1.0, 1.2.0
Reporter: hzw
Priority: Blocker
 Fix For: 1.1.0


 I tested the LZ4 compression, and it came up with this problem (with wordcount).
 I also tested Snappy and LZF, and they were OK.
 Finally I set spark.shuffle.spill to false to avoid the exception, 
 but whenever this switch is turned back on, the error comes back.
 It seems that if the number of distinct words is small, wordcount goes through, 
 but with a more complex text the problem shows up.
 Exception info as follows:
 java.lang.AssertionError: assertion failed
 at scala.Predef$.assert(Predef.scala:165)
 at 
 org.apache.spark.util.collection.ExternalAppendOnlyMap$DiskMapIterator.<init>(ExternalAppendOnlyMap.scala:416)
 at 
 org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:235)
 at 
 org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:150)
 at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
 at 
 org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:55)
 at 
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
 at 
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:54)
 at 
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
 at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
 at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
 at java.lang.Thread.run(Thread.java:722)



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3277) LZ4 compression cause the the ExternalSort exception

2014-08-28 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14114014#comment-14114014
 ] 

Mridul Muralidharan commented on SPARK-3277:


[~matei] Attaching a patch which reproduces the bug consistently.
I suspect the issue is more serious than what I detailed above - spill to disk 
seems completely broken, if I understood the assertion message correctly.
Unfortunately, this is based on a few minutes of free time I could grab - so a 
more principled debugging session is definitely warranted!



 LZ4 compression cause the the ExternalSort exception
 

 Key: SPARK-3277
 URL: https://issues.apache.org/jira/browse/SPARK-3277
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.0.2, 1.1.0, 1.2.0
Reporter: hzw
Priority: Blocker
 Fix For: 1.1.0


 I tested the LZ4 compression, and it came up with this problem (with wordcount).
 I also tested Snappy and LZF, and they were OK.
 Finally I set spark.shuffle.spill to false to avoid the exception, 
 but whenever this switch is turned back on, the error comes back.
 It seems that if the number of distinct words is small, wordcount goes through, 
 but with a more complex text the problem shows up.
 Exception info as follows:
 java.lang.AssertionError: assertion failed
 at scala.Predef$.assert(Predef.scala:165)
 at 
 org.apache.spark.util.collection.ExternalAppendOnlyMap$DiskMapIterator.<init>(ExternalAppendOnlyMap.scala:416)
 at 
 org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:235)
 at 
 org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:150)
 at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
 at 
 org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:55)
 at 
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
 at 
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:54)
 at 
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
 at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
 at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
 at java.lang.Thread.run(Thread.java:722)



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-3277) LZ4 compression cause the the ExternalSort exception

2014-08-28 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14114022#comment-14114022
 ] 

Mridul Muralidharan edited comment on SPARK-3277 at 8/28/14 5:37 PM:
-

The attached patch is against master, though I noticed similar changes in 1.1 as 
well - but not yet verified.


was (Author: mridulm80):
Against master, though I noticed similar changes in 1.1 also : but not yet 
verified.

 LZ4 compression cause the the ExternalSort exception
 

 Key: SPARK-3277
 URL: https://issues.apache.org/jira/browse/SPARK-3277
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.0.2, 1.1.0, 1.2.0
Reporter: hzw
Priority: Blocker
 Fix For: 1.1.0

 Attachments: test_lz4_bug.patch


 I tested the LZ4 compression, and it came up with this problem (with wordcount).
 I also tested Snappy and LZF, and they were OK.
 Finally I set spark.shuffle.spill to false to avoid the exception, 
 but whenever this switch is turned back on, the error comes back.
 It seems that if the number of distinct words is small, wordcount goes through, 
 but with a more complex text the problem shows up.
 Exception info as follows:
 java.lang.AssertionError: assertion failed
 at scala.Predef$.assert(Predef.scala:165)
 at 
 org.apache.spark.util.collection.ExternalAppendOnlyMap$DiskMapIterator.<init>(ExternalAppendOnlyMap.scala:416)
 at 
 org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:235)
 at 
 org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:150)
 at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
 at 
 org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:55)
 at 
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
 at 
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:54)
 at 
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
 at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
 at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
 at java.lang.Thread.run(Thread.java:722)



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-3277) LZ4 compression cause the the ExternalSort exception

2014-08-28 Thread Mridul Muralidharan (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-3277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan updated SPARK-3277:
---

Attachment: test_lz4_bug.patch

Against master, though I noticed similar changes in 1.1 as well - but not yet 
verified.

 LZ4 compression cause the the ExternalSort exception
 

 Key: SPARK-3277
 URL: https://issues.apache.org/jira/browse/SPARK-3277
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.0.2, 1.1.0, 1.2.0
Reporter: hzw
Priority: Blocker
 Fix For: 1.1.0

 Attachments: test_lz4_bug.patch


 I tested the LZ4 compression, and it came up with this problem (with wordcount).
 I also tested Snappy and LZF, and they were OK.
 Finally I set spark.shuffle.spill to false to avoid the exception, 
 but whenever this switch is turned back on, the error comes back.
 It seems that if the number of distinct words is small, wordcount goes through, 
 but with a more complex text the problem shows up.
 Exception info as follows:
 java.lang.AssertionError: assertion failed
 at scala.Predef$.assert(Predef.scala:165)
 at 
 org.apache.spark.util.collection.ExternalAppendOnlyMap$DiskMapIterator.<init>(ExternalAppendOnlyMap.scala:416)
 at 
 org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:235)
 at 
 org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:150)
 at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
 at 
 org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:55)
 at 
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
 at 
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:54)
 at 
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
 at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
 at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
 at java.lang.Thread.run(Thread.java:722)



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3277) LZ4 compression cause the the ExternalSort exception

2014-08-28 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14114026#comment-14114026
 ] 

Mridul Muralidharan commented on SPARK-3277:


[~hzw] did you notice this against 1.0.2?
I did not think the changes for consolidated shuffle were backported to that 
branch; [~mateiz] can comment more, though.

 LZ4 compression cause the the ExternalSort exception
 

 Key: SPARK-3277
 URL: https://issues.apache.org/jira/browse/SPARK-3277
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.0.2, 1.1.0, 1.2.0
Reporter: hzw
Priority: Blocker
 Fix For: 1.1.0

 Attachments: test_lz4_bug.patch


 I tested the LZ4 compression, and it came up with this problem (with wordcount).
 I also tested Snappy and LZF, and they were OK.
 Finally I set spark.shuffle.spill to false to avoid the exception, 
 but whenever this switch is turned back on, the error comes back.
 It seems that if the number of distinct words is small, wordcount goes through, 
 but with a more complex text the problem shows up.
 Exception info as follows:
 java.lang.AssertionError: assertion failed
 at scala.Predef$.assert(Predef.scala:165)
 at 
 org.apache.spark.util.collection.ExternalAppendOnlyMap$DiskMapIterator.<init>(ExternalAppendOnlyMap.scala:416)
 at 
 org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:235)
 at 
 org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:150)
 at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
 at 
 org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:55)
 at 
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
 at 
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:54)
 at 
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
 at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
 at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
 at java.lang.Thread.run(Thread.java:722)



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3277) LZ4 compression cause the the ExternalSort exception

2014-08-28 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14114484#comment-14114484
 ] 

Mridul Muralidharan commented on SPARK-3277:


Sounds great, thx!
I suspect it is because for LZF we configure it to write a block on flush (a 
partial block if there is insufficient data to fill one); but for LZ4, either 
such a config does not exist or we don't use it.
This results in flush becoming a no-op when the data in the current block is 
insufficient to cause a compressed block to be created - while close will force 
the partial block to be written out.

Which is why the assertion lists all sizes as 0.
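A small, self-contained illustration of that difference (behaviour can vary with 
the codec library version, so treat this as a sketch):
{code}
// Sketch: with a block codec, flush() may leave a partially-filled block
// unwritten (size stays 0), while close() always forces it out.
import java.io.ByteArrayOutputStream
import net.jpountz.lz4.LZ4BlockOutputStream

val baos = new ByteArrayOutputStream()
val lz4  = new LZ4BlockOutputStream(baos)
lz4.write("a few bytes, well under one block".getBytes("UTF-8"))
lz4.flush()
println(baos.size())   // may still be 0: the partial block has not been emitted
lz4.close()
println(baos.size())   // > 0: close() writes out the partial block
{code}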


 LZ4 compression cause the the ExternalSort exception
 

 Key: SPARK-3277
 URL: https://issues.apache.org/jira/browse/SPARK-3277
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.0.2, 1.1.0, 1.2.0
Reporter: hzw
Assignee: Andrew Or
Priority: Blocker
 Attachments: test_lz4_bug.patch


 I tested the LZ4 compression, and it came up with this problem (with wordcount).
 I also tested Snappy and LZF, and they were OK.
 Finally I set spark.shuffle.spill to false to avoid the exception, 
 but whenever this switch is turned back on, the error comes back.
 It seems that if the number of distinct words is small, wordcount goes through, 
 but with a more complex text the problem shows up.
 Exception info as follows:
 {code}
 java.lang.AssertionError: assertion failed
 at scala.Predef$.assert(Predef.scala:165)
 at 
 org.apache.spark.util.collection.ExternalAppendOnlyMap$DiskMapIterator.<init>(ExternalAppendOnlyMap.scala:416)
 at 
 org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:235)
 at 
 org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:150)
 at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
 at 
 org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:55)
 at 
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
 at 
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:54)
 at 
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
 at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
 at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
 at java.lang.Thread.run(Thread.java:722)
 {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3175) Branch-1.1 SBT build failed for Yarn-Alpha

2014-08-23 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14107923#comment-14107923
 ] 

Mridul Muralidharan commented on SPARK-3175:


Please add more information on why they need to be out of sync.
As of now, the only way to build for yarn-alpha is to manually update pom.xml

 Branch-1.1 SBT build failed for Yarn-Alpha
 --

 Key: SPARK-3175
 URL: https://issues.apache.org/jira/browse/SPARK-3175
 Project: Spark
  Issue Type: Bug
  Components: Build
Affects Versions: 1.1.1
Reporter: Chester
  Labels: build
 Fix For: 1.1.1

   Original Estimate: 1h
  Remaining Estimate: 1h

 When trying to build yarn-alpha on branch-1.1
 |branch-1.1|$ sbt/sbt -Pyarn-alpha -Dhadoop.version=2.0.5-alpha projects
 [info] Loading project definition from /Users/chester/projects/spark/project
 org.apache.maven.model.building.ModelBuildingException: 1 problem was 
 encountered while building the effective model for 
 org.apache.spark:spark-yarn-alpha_2.10:1.1.0
 [FATAL] Non-resolvable parent POM: Could not find artifact 
 org.apache.spark:yarn-parent_2.10:pom:1.1.0 in central ( 
 http://repo.maven.apache.org/maven2) and 'parent.relativePath' points at 
 wrong local POM @ line 20, column 11



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3115) Improve task broadcast latency for small tasks

2014-08-19 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14102142#comment-14102142
 ] 

Mridul Muralidharan commented on SPARK-3115:


I had a tab open with pretty much the exact same bug comments ready to be filed :-)

 Improve task broadcast latency for small tasks
 --

 Key: SPARK-3115
 URL: https://issues.apache.org/jira/browse/SPARK-3115
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 1.1.0
Reporter: Shivaram Venkataraman
Assignee: Reynold Xin

 Broadcasting the task information helps reduce the amount of data transferred 
 for large tasks. However we've seen that this adds more latency for small 
 tasks. It'll be great to profile and fix this.



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3019) Pluggable block transfer (data plane communication) interface

2014-08-17 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14099909#comment-14099909
 ] 

Mridul Muralidharan commented on SPARK-3019:



I have yet to go through the proposal in detail so will defer comments on that 
for later; but to get some clarity on the discussion around Sandy's point:

- Until we read from all mappers, the shuffle can't actually start.
Even if a single mapper's output is small enough to fit into memory (which it 
need not be), num_mappers * avg_size_of_map_output_per_reducer could be larger 
than available memory by orders of magnitude (this is fairly common for us, for 
example; a back-of-the-envelope sketch follows after these points).
This was the reason we actually worked on the 2G fix, btw - individual blocks in a 
mapper, and also the data per reducer from a mapper, were larger than 2G :-)

- While reading data off the network, we cannot assess whether the read data 
will fit into memory or not (since there are other parallel read requests 
pending for this and other cores in the same executor).
So spooling intermediate data to disk would become necessary both on the mapper 
side (which we already do) and on the reducer side (which we don't do currently - 
we assume that a block can fit into reducer memory as part of doing a remote 
fetch). This becomes more relevant when we want to target bigger blocks of data 
and tackle data skew (for shuffle).
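A back-of-the-envelope illustration of the first point (all numbers are made up):
{code}
// Hypothetical numbers, purely illustrative of the scale involved.
val numMappers             = 20000
val avgMapOutputPerReducer = 10L * 1024 * 1024            // 10 MB from each mapper to one reducer
val perReducerShuffleInput = numMappers * avgMapOutputPerReducer
// ~200 GB flowing into a single reducer - orders of magnitude beyond executor
// memory, so intermediate data has to spool to disk on the reducer side as well.
{code}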

 Pluggable block transfer (data plane communication) interface
 -

 Key: SPARK-3019
 URL: https://issues.apache.org/jira/browse/SPARK-3019
 Project: Spark
  Issue Type: Improvement
  Components: Shuffle, Spark Core
Reporter: Reynold Xin
Assignee: Reynold Xin
 Attachments: PluggableBlockTransferServiceProposalforSpark - draft 
 1.pdf


 The attached design doc proposes a standard interface for block transferring, 
 which will make future engineering of this functionality easier, allowing the 
 Spark community to provide alternative implementations.
 Block transferring is a critical function in Spark. All of the following 
 depend on it:
 * shuffle
 * torrent broadcast
 * block replication in BlockManager
 * remote block reads for tasks scheduled without locality



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3019) Pluggable block transfer (data plane communication) interface

2014-08-17 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14099910#comment-14099910
 ] 

Mridul Muralidharan commented on SPARK-3019:


Btw, can we do something about block replication when the replication factor is > 1?
Currently we silently lose replicas, and the block placement strategy for 
replication is fairly non-existent.

 Pluggable block transfer (data plane communication) interface
 -

 Key: SPARK-3019
 URL: https://issues.apache.org/jira/browse/SPARK-3019
 Project: Spark
  Issue Type: Improvement
  Components: Shuffle, Spark Core
Reporter: Reynold Xin
Assignee: Reynold Xin
 Attachments: PluggableBlockTransferServiceProposalforSpark - draft 
 1.pdf


 The attached design doc proposes a standard interface for block transferring, 
 which will make future engineering of this functionality easier, allowing the 
 Spark community to provide alternative implementations.
 Block transferring is a critical function in Spark. All of the following 
 depend on it:
 * shuffle
 * torrent broadcast
 * block replication in BlockManager
 * remote block reads for tasks scheduled without locality



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-1476) 2GB limit in spark for blocks

2014-08-15 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14099072#comment-14099072
 ] 

Mridul Muralidharan commented on SPARK-1476:


Based on discussions we had with others, apparently 1.1 was not a good vehicle 
for this proposal.
Further, since there was no interest in this jira / comments on the proposal, we 
put the effort on the backburner.

We plan to push at least some of the bugs fixed as part of this effort - 
consolidated shuffle did get resolved in 1.1, and a few more might be 
contributed back in 1.2, time permitting (disk-backed map output tracking, for 
example, looks like a good candidate).
But the bulk of the change is pervasive and at times a bit invasive, and at odds 
with some of the other changes (for example, zero-copy); shepherding it might 
be a bit time-consuming for me given other deliverables.

If there is renewed interest in getting this integrated into a Spark release, 
I can try to push for it to be resurrected and submitted.

 2GB limit in spark for blocks
 -

 Key: SPARK-1476
 URL: https://issues.apache.org/jira/browse/SPARK-1476
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
 Environment: all
Reporter: Mridul Muralidharan
Assignee: Mridul Muralidharan
Priority: Critical
 Attachments: 2g_fix_proposal.pdf


 The underlying abstraction for blocks in Spark is a ByteBuffer, which limits 
 the size of a block to 2GB.
 This has implications not just for managed blocks in use, but also for shuffle 
 blocks (memory-mapped blocks are limited to 2GB even though the API takes a 
 long), ser/deser via byte-array-backed output streams (SPARK-1391), etc.
 This is a severe limitation for the use of Spark on non-trivial datasets.
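As a hedged illustration of the limit described above (not Spark code): 
java.nio.ByteBuffer tracks capacity, position and limit as Ints, so a block 
backed by a single buffer tops out at Int.MaxValue bytes, and FileChannel.map 
rejects larger mappings even though its size parameter is a long.

{code}
import java.nio.ByteBuffer

// Capacity is an Int, so ~2GB (Int.MaxValue bytes) is the hard ceiling for one buffer.
val ok = ByteBuffer.allocate(16)
// ByteBuffer.allocate expects an Int, so a > 2GB allocation cannot even be expressed.
// Similarly, FileChannel.map(mode, position, size) takes a long size but throws
// IllegalArgumentException ("Size exceeds Integer.MAX_VALUE") for size > Int.MaxValue,
// which is the memory-mapped shuffle block limitation mentioned above.
{code}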



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2089) With YARN, preferredNodeLocalityData isn't honored

2014-08-15 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14099115#comment-14099115
 ] 

Mridul Muralidharan commented on SPARK-2089:



In the general case, won't InputFormats have customizations for creation and/or 
initialization before they can be used to get splits? (Other than file names, 
I mean.)


 With YARN, preferredNodeLocalityData isn't honored 
 ---

 Key: SPARK-2089
 URL: https://issues.apache.org/jira/browse/SPARK-2089
 Project: Spark
  Issue Type: Bug
  Components: YARN
Affects Versions: 1.0.0
Reporter: Sandy Ryza
Assignee: Sandy Ryza
Priority: Critical

 When running in YARN cluster mode, apps can pass preferred locality data when 
 constructing a Spark context that will dictate where to request executor 
 containers.
 This is currently broken because of a race condition.  The Spark-YARN code 
 runs the user class and waits for it to start up a SparkContext.  During its 
 initialization, the SparkContext will create a YarnClusterScheduler, which 
 notifies a monitor in the Spark-YARN code that it is ready.  The Spark-YARN code then 
 immediately fetches the preferredNodeLocationData from the SparkContext and 
 uses it to start requesting containers.
 But in the SparkContext constructor that takes the preferredNodeLocationData, 
 setting preferredNodeLocationData comes after the rest of the initialization, 
 so, if the Spark-YARN code comes around quickly enough after being notified, 
 the data that's fetched is the empty unset version.  This occurred during all 
 of my runs.



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2089) With YARN, preferredNodeLocalityData isn't honored

2014-08-13 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14095247#comment-14095247
 ] 

Mridul Muralidharan commented on SPARK-2089:


Since I am not maintaining the code anymore, I don't have a strong preference 
either way.
I am not sure what the format means btw - I see multiple nodes and racks 
mentioned in the same group ...

In general though, I am not convinced it is a good direction to take.
1) It is a workaround for a design issue and has non-trivial performance 
implications (serializing into this form only to immediately deserialize it is 
expensive for large inputs; not to mention, it gets shipped to executors for 
no reason).
2) It locks us into a format which provides inadequate information - the number of 
blocks per node, size per block, etc. is lost (or maybe I just did not 
understand what the format is !).
3) We are currently investigating evolving in the opposite direction - adding more 
information so that we can be more specific about where to allocate executors.
For example: I can see the fairly near-term need to associate executors with 
accelerator cards (and break the implicit OFF_HEAP = Tachyon assumption).
A string representation makes this fragile to evolve.

As I mentioned before, the current YARN allocation model in Spark is a very 
naive implementation - which I did not expect to survive this long: it came 
directly from our prototype.
We really should be modifying it to consider the cost of data transfer and 
prioritize allocation that way (number of blocks on a node/rack, size of 
blocks, number of replicas available, etc.).
For small datasets on small enough clusters this is not relevant, but it has 
implications as we grow along both axes.
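For illustration only, a sketch of the kind of structured (non-string) preference 
information alluded to in point 3 and the last paragraph - the names are 
hypothetical, not an existing or proposed Spark API - which would preserve 
per-node block counts/sizes and be easier to evolve than a flat string format:

{code}
// Hypothetical sketch - not an actual Spark interface.
case class HostBlockInfo(host: String, rack: Option[String], numBlocks: Int, totalBytes: Long)

case class ExecutorPreference(
    hostInfo: Seq[HostBlockInfo],      // per-host block counts and sizes
    replication: Int,                  // how many replicas exist for the input
    accelerators: Seq[String] = Nil)   // e.g. required accelerator cards

// Richer scoring then becomes possible, e.g. rank hosts by how many bytes are local to them:
def rankHosts(pref: ExecutorPreference): Seq[String] =
  pref.hostInfo.sortBy(h => -h.totalBytes).map(_.host)
{code}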

 With YARN, preferredNodeLocalityData isn't honored 
 ---

 Key: SPARK-2089
 URL: https://issues.apache.org/jira/browse/SPARK-2089
 Project: Spark
  Issue Type: Bug
  Components: YARN
Affects Versions: 1.0.0
Reporter: Sandy Ryza
Assignee: Sandy Ryza
Priority: Critical

 When running in YARN cluster mode, apps can pass preferred locality data when 
 constructing a Spark context that will dictate where to request executor 
 containers.
 This is currently broken because of a race condition.  The Spark-YARN code 
 runs the user class and waits for it to start up a SparkContext.  During its 
 initialization, the SparkContext will create a YarnClusterScheduler, which 
 notifies a monitor in the Spark-YARN code that .  The Spark-Yarn code then 
 immediately fetches the preferredNodeLocationData from the SparkContext and 
 uses it to start requesting containers.
 But in the SparkContext constructor that takes the preferredNodeLocationData, 
 setting preferredNodeLocationData comes after the rest of the initialization, 
 so, if the Spark-YARN code comes around quickly enough after being notified, 
 the data that's fetched is the empty unset version.  The occurred during all 
 of my runs.



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2962) Suboptimal scheduling in spark

2014-08-11 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14092807#comment-14092807
 ] 

Mridul Muralidharan commented on SPARK-2962:


On further investigation:

a) The primary issue is a combination of SPARK-2089 and the current scheduler 
behavior for pendingTasksWithNoPrefs.
SPARK-2089 leads to very bad allocation of nodes - it particularly has an impact 
on bigger clusters.
It leads to a lot of blocks having no data-local or rack-local executors - causing 
them to end up in pendingTasksWithNoPrefs.

While loading data off DFS, when an executor is being scheduled, even though 
there might be rack-local schedules available for it (or, on waiting a while, 
data-local too - see (b) below), because of the current scheduler behavior, tasks 
from pendingTasksWithNoPrefs get scheduled, causing a large number of ANY 
tasks to be scheduled at the very onset.

The combination of these, with the lack of marginal alleviation via (b), is what 
caused the performance impact.

b) spark.scheduler.minRegisteredExecutorsRatio had not yet been used in the 
workload - so that might alleviate some of the non-deterministic waiting and 
ensure adequate executors are allocated! Thanks [~lirui]
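For reference, a hedged example of using that setting (the property name is as 
referenced above for the 1.1 line; the exact name and semantics may differ 
across versions), so the first stage waits for a fraction of executors to 
register instead of relying on a fixed sleep:

{code}
// Hedged example: ask the scheduler to wait until ~80% of the requested executors
// have registered before it starts offering resources to tasks.
val conf = new org.apache.spark.SparkConf()
  .setAppName("example")
  .set("spark.scheduler.minRegisteredExecutorsRatio", "0.8")
val sc = new org.apache.spark.SparkContext(conf)
{code}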



 Suboptimal scheduling in spark
 --

 Key: SPARK-2962
 URL: https://issues.apache.org/jira/browse/SPARK-2962
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.1.0
 Environment: All
Reporter: Mridul Muralidharan

 In findTask, irrespective of 'locality' specified, pendingTasksWithNoPrefs 
 are always scheduled with PROCESS_LOCAL
 pendingTasksWithNoPrefs contains tasks which currently do not have any alive 
 locations - but which could come in 'later' : particularly relevant when 
 spark app is just coming up and containers are still being added.
 This causes a large number of non node local tasks to be scheduled incurring 
 significant network transfers in the cluster when running with non trivial 
 datasets.
 The comment // Look for no-pref tasks after rack-local tasks since they can 
 run anywhere. is misleading in the method code : locality levels start from 
 process_local down to any, and so no prefs get scheduled much before rack.
 Also note that, currentLocalityIndex is reset to the taskLocality returned by 
 this method - so returning PROCESS_LOCAL as the level will trigger wait times 
 again. (Was relevant before recent change to scheduler, and might be again 
 based on resolution of this issue).
 Found as part of writing test for SPARK-2931
  



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-2962) Suboptimal scheduling in spark

2014-08-10 Thread Mridul Muralidharan (JIRA)
Mridul Muralidharan created SPARK-2962:
--

 Summary: Suboptimal scheduling in spark
 Key: SPARK-2962
 URL: https://issues.apache.org/jira/browse/SPARK-2962
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.1.0
 Environment: All
Reporter: Mridul Muralidharan



In findTask, irrespective of the 'locality' specified, pendingTasksWithNoPrefs are 
always scheduled with PROCESS_LOCAL.

pendingTasksWithNoPrefs contains tasks which currently do not have any alive 
locations - but which could come in 'later': particularly relevant when the spark 
app is just coming up and containers are still being added.

This causes a large number of non-node-local tasks to be scheduled, incurring 
significant network transfers in the cluster when running with non-trivial 
datasets.

The comment "// Look for no-pref tasks after rack-local tasks since they can 
run anywhere." is misleading in the method code: locality levels start from 
process_local down to any, and so no-pref tasks get scheduled much before rack.


Also note that currentLocalityIndex is reset to the taskLocality returned by 
this method - so returning PROCESS_LOCAL as the level will trigger wait times 
again. (Was relevant before a recent change to the scheduler, and might be again 
depending on the resolution of this issue).


Found as part of writing a test for SPARK-2931
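A much-simplified sketch of the ordering described above - this is not the actual 
TaskSetManager code, just an illustration of why no-pref tasks surface before 
rack-local ones (and are labelled PROCESS_LOCAL) when locality levels are walked 
from most to least local:

{code}
import scala.collection.mutable

object Locality extends Enumeration { val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value }

val processLocal = mutable.Queue[Int]()   // tasks with an alive process-local executor
val nodeLocal    = mutable.Queue[Int]()   // tasks with an alive node-local executor
val noPrefs      = mutable.Queue(1, 2)    // tasks whose preferred hosts are not (yet) alive
val rackLocal    = mutable.Queue[Int]()   // tasks with an alive rack-local executor
val anyTasks     = mutable.Queue[Int]()

def findTask(): Option[(Int, Locality.Value)] = {
  if (processLocal.nonEmpty) Some((processLocal.dequeue(), Locality.PROCESS_LOCAL))
  else if (nodeLocal.nonEmpty) Some((nodeLocal.dequeue(), Locality.NODE_LOCAL))
  // No-pref tasks are considered here - before rack-local ones - and reported as
  // PROCESS_LOCAL, even though their preferred hosts may simply not be alive yet.
  else if (noPrefs.nonEmpty) Some((noPrefs.dequeue(), Locality.PROCESS_LOCAL))
  else if (rackLocal.nonEmpty) Some((rackLocal.dequeue(), Locality.RACK_LOCAL))
  else if (anyTasks.nonEmpty) Some((anyTasks.dequeue(), Locality.ANY))
  else None
}
{code}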
 



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2962) Suboptimal scheduling in spark

2014-08-10 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14092427#comment-14092427
 ] 

Mridul Muralidharan commented on SPARK-2962:


To give more context:

a) Our jobs start with loading data from DFS as the starting point - so this is 
the first stage that gets executed.

b) We are sleeping for 1 minute before starting the jobs (in case the cluster is 
busy, etc) - unfortunately, this is not sufficient, and iirc there is no 
programmatic way to wait more deterministically for X% of nodes (was something 
added to alleviate this? I did see some discussion).

c) This becomes more of a problem because Spark does not honour preferred 
locations anymore while running on YARN. See SPARK-2089 - due to 1.0 interface 
changes.
[ Practically, if we are using a large enough number of nodes (with replication 
of 3 or higher), usually we do end up with quite a lot of data-local tasks 
eventually - so (c) is not an immediate concern for our current jobs assuming 
(b) is not an issue, though it is suboptimal in the general case ]



 Suboptimal scheduling in spark
 --

 Key: SPARK-2962
 URL: https://issues.apache.org/jira/browse/SPARK-2962
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.1.0
 Environment: All
Reporter: Mridul Muralidharan

 In findTask, irrespective of 'locality' specified, pendingTasksWithNoPrefs 
 are always scheduled with PROCESS_LOCAL
 pendingTasksWithNoPrefs contains tasks which currently do not have any alive 
 locations - but which could come in 'later' : particularly relevant when 
 spark app is just coming up and containers are still being added.
 This causes a large number of non node local tasks to be scheduled incurring 
 significant network transfers in the cluster when running with non trivial 
 datasets.
 The comment // Look for no-pref tasks after rack-local tasks since they can 
 run anywhere. is misleading in the method code : locality levels start from 
 process_local down to any, and so no prefs get scheduled much before rack.
 Also note that, currentLocalityIndex is reset to the taskLocality returned by 
 this method - so returning PROCESS_LOCAL as the level will trigger wait times 
 again. (Was relevant before recent change to scheduler, and might be again 
 based on resolution of this issue).
 Found as part of writing test for SPARK-2931
  



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2962) Suboptimal scheduling in spark

2014-08-10 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14092430#comment-14092430
 ] 

Mridul Muralidharan commented on SPARK-2962:


Hi [~matei],

  I am referencing the latest code (as of yesterday night).

pendingTasksWithNoPrefs currently contains both tasks which truly have no 
preference and tasks whose preferred locations are unavailable - and the 
latter is what is triggering this, since that can change during the execution 
of the stage.
Hope I am not missing something?

Thanks,
Mridul

 Suboptimal scheduling in spark
 --

 Key: SPARK-2962
 URL: https://issues.apache.org/jira/browse/SPARK-2962
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.1.0
 Environment: All
Reporter: Mridul Muralidharan

 In findTask, irrespective of 'locality' specified, pendingTasksWithNoPrefs 
 are always scheduled with PROCESS_LOCAL
 pendingTasksWithNoPrefs contains tasks which currently do not have any alive 
 locations - but which could come in 'later' : particularly relevant when 
 spark app is just coming up and containers are still being added.
 This causes a large number of non node local tasks to be scheduled incurring 
 significant network transfers in the cluster when running with non trivial 
 datasets.
 The comment // Look for no-pref tasks after rack-local tasks since they can 
 run anywhere. is misleading in the method code : locality levels start from 
 process_local down to any, and so no prefs get scheduled much before rack.
 Also note that, currentLocalityIndex is reset to the taskLocality returned by 
 this method - so returning PROCESS_LOCAL as the level will trigger wait times 
 again. (Was relevant before recent change to scheduler, and might be again 
 based on resolution of this issue).
 Found as part of writing test for SPARK-2931
  



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2962) Suboptimal scheduling in spark

2014-08-10 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14092431#comment-14092431
 ] 

Mridul Muralidharan commented on SPARK-2962:


Note, I don't think this is a regression in 1.1; it probably existed much 
earlier too.
Other issues are making us notice this (like SPARK-2089) - we moved to 1.1 from 
0.9 recently.

 Suboptimal scheduling in spark
 --

 Key: SPARK-2962
 URL: https://issues.apache.org/jira/browse/SPARK-2962
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.1.0
 Environment: All
Reporter: Mridul Muralidharan

 In findTask, irrespective of 'locality' specified, pendingTasksWithNoPrefs 
 are always scheduled with PROCESS_LOCAL
 pendingTasksWithNoPrefs contains tasks which currently do not have any alive 
 locations - but which could come in 'later' : particularly relevant when 
 spark app is just coming up and containers are still being added.
 This causes a large number of non node local tasks to be scheduled incurring 
 significant network transfers in the cluster when running with non trivial 
 datasets.
 The comment // Look for no-pref tasks after rack-local tasks since they can 
 run anywhere. is misleading in the method code : locality levels start from 
 process_local down to any, and so no prefs get scheduled much before rack.
 Also note that, currentLocalityIndex is reset to the taskLocality returned by 
 this method - so returning PROCESS_LOCAL as the level will trigger wait times 
 again. (Was relevant before recent change to scheduler, and might be again 
 based on resolution of this issue).
 Found as part of writing test for SPARK-2931
  



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-2962) Suboptimal scheduling in spark

2014-08-10 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14092427#comment-14092427
 ] 

Mridul Muralidharan edited comment on SPARK-2962 at 8/11/14 4:35 AM:
-

To give more context:

a) Our jobs start with loading data from DFS as the starting point - so this is 
the first stage that gets executed.

b) We are sleeping for 1 minute before starting the jobs (in case the cluster is 
busy, etc) - unfortunately, this is not sufficient, and iirc there is no 
programmatic way to wait more deterministically for X% of nodes (was something 
added to alleviate this? I did see some discussion).

c) This becomes more of a problem because Spark does not honour preferred 
locations anymore while running on YARN. See SPARK-2089 - due to 1.0 interface 
changes.
[ Practically, if we are using a large enough number of nodes (with replication 
of 3 or higher), usually we do end up with quite a lot of data-local tasks 
eventually - so (c) is not an immediate concern for our current jobs assuming 
(b) is not an issue, though it is suboptimal in the general case ]




was (Author: mridulm80):
To give more context; 

a) Our jobs start with load data from dfs as starting point : and so this is 
the first stage that gets executed.

b) We are sleeping for 1 minute before starting the jobs (in case cluster is 
busy, etc) - unfortunately, this is not sufficient and iirc there is no 
programmatic way to wait more deterministically for X% of node (was something 
added to alleviate this ? I did see some discussion)

c) This becomes more of a problem because spark does not honour preferred 
location anymore while running in yarn. See SPARK-208 - due to 1.0 interface 
changes.
[ Practically, if we are using large enough number of nodes (with replication 
of 3 or higher), usually we do end up with quite of lot of data local tasks 
eventually - so (c) is not an immediate concern for our current jobs assuming 
(b) is not an issue, though it is suboptimal in general case ]



 Suboptimal scheduling in spark
 --

 Key: SPARK-2962
 URL: https://issues.apache.org/jira/browse/SPARK-2962
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.1.0
 Environment: All
Reporter: Mridul Muralidharan

 In findTask, irrespective of 'locality' specified, pendingTasksWithNoPrefs 
 are always scheduled with PROCESS_LOCAL
 pendingTasksWithNoPrefs contains tasks which currently do not have any alive 
 locations - but which could come in 'later' : particularly relevant when 
 spark app is just coming up and containers are still being added.
 This causes a large number of non node local tasks to be scheduled incurring 
 significant network transfers in the cluster when running with non trivial 
 datasets.
 The comment // Look for no-pref tasks after rack-local tasks since they can 
 run anywhere. is misleading in the method code : locality levels start from 
 process_local down to any, and so no prefs get scheduled much before rack.
 Also note that, currentLocalityIndex is reset to the taskLocality returned by 
 this method - so returning PROCESS_LOCAL as the level will trigger wait times 
 again. (Was relevant before recent change to scheduler, and might be again 
 based on resolution of this issue).
 Found as part of writing test for SPARK-2931
  



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2931) getAllowedLocalityLevel() throws ArrayIndexOutOfBoundsException

2014-08-09 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14091746#comment-14091746
 ] 

Mridul Muralidharan commented on SPARK-2931:


[~kayousterhout] This is weird - I remember mentioning this exact same issue in 
some PR for 1.1 (trying to find which one, though not 1313 iirc); and I think 
it was supposed to have been addressed.
We had observed this issue of currentLocalityLevel running away when we had 
internally merged the PR.

Strange that it was not addressed - speaks volumes about me not following up on my 
reviews !

 getAllowedLocalityLevel() throws ArrayIndexOutOfBoundsException
 ---

 Key: SPARK-2931
 URL: https://issues.apache.org/jira/browse/SPARK-2931
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.1.0
 Environment: Spark EC2, spark-1.1.0-snapshot1, sort-by-key spark-perf 
 benchmark
Reporter: Josh Rosen
Priority: Blocker
 Fix For: 1.1.0


 When running Spark Perf's sort-by-key benchmark on EC2 with v1.1.0-snapshot, 
 I get the following errors (one per task):
 {code}
 14/08/08 18:54:22 INFO scheduler.TaskSetManager: Starting task 39.0 in stage 
 0.0 (TID 39, ip-172-31-14-30.us-west-2.compute.internal, PROCESS_LOCAL, 1003 
 bytes)
 14/08/08 18:54:22 INFO cluster.SparkDeploySchedulerBackend: Registered 
 executor: 
 Actor[akka.tcp://sparkexecu...@ip-172-31-9-213.us-west-2.compute.internal:58901/user/Executor#1436065036]
  with ID 0
 14/08/08 18:54:22 ERROR actor.OneForOneStrategy: 1
 java.lang.ArrayIndexOutOfBoundsException: 1
   at 
 org.apache.spark.scheduler.TaskSetManager.getAllowedLocalityLevel(TaskSetManager.scala:475)
   at 
 org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:409)
   at 
 org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$7$$anonfun$apply$2.apply$mcVI$sp(TaskSchedulerImpl.scala:261)
   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
   at 
 org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$7.apply(TaskSchedulerImpl.scala:257)
   at 
 org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$7.apply(TaskSchedulerImpl.scala:254)
   at 
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
   at 
 org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:254)
   at 
 org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:254)
   at 
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
   at 
 org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:254)
   at 
 org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor.makeOffers(CoarseGrainedSchedulerBackend.scala:153)
   at 
 org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor$$anonfun$receive$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:103)
   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
   at akka.actor.ActorCell.invoke(ActorCell.scala:456)
   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
   at akka.dispatch.Mailbox.run(Mailbox.scala:219)
   at 
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
   at 
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
   at 
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 {code}
 This causes the job to hang.
 I can deterministically reproduce this by re-running the test, either in 
 isolation or as part of the full performance testing suite.



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-2931) getAllowedLocalityLevel() throws ArrayIndexOutOfBoundsException

2014-08-09 Thread Mridul Muralidharan (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-2931?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan updated SPARK-2931:
---

Attachment: test.patch

A patch to showcase the exception

 getAllowedLocalityLevel() throws ArrayIndexOutOfBoundsException
 ---

 Key: SPARK-2931
 URL: https://issues.apache.org/jira/browse/SPARK-2931
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.1.0
 Environment: Spark EC2, spark-1.1.0-snapshot1, sort-by-key spark-perf 
 benchmark
Reporter: Josh Rosen
Priority: Blocker
 Fix For: 1.1.0

 Attachments: scala-sort-by-key.err, test.patch


 When running Spark Perf's sort-by-key benchmark on EC2 with v1.1.0-snapshot, 
 I get the following errors (one per task):
 {code}
 14/08/08 18:54:22 INFO scheduler.TaskSetManager: Starting task 39.0 in stage 
 0.0 (TID 39, ip-172-31-14-30.us-west-2.compute.internal, PROCESS_LOCAL, 1003 
 bytes)
 14/08/08 18:54:22 INFO cluster.SparkDeploySchedulerBackend: Registered 
 executor: 
 Actor[akka.tcp://sparkexecu...@ip-172-31-9-213.us-west-2.compute.internal:58901/user/Executor#1436065036]
  with ID 0
 14/08/08 18:54:22 ERROR actor.OneForOneStrategy: 1
 java.lang.ArrayIndexOutOfBoundsException: 1
   at 
 org.apache.spark.scheduler.TaskSetManager.getAllowedLocalityLevel(TaskSetManager.scala:475)
   at 
 org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:409)
   at 
 org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$7$$anonfun$apply$2.apply$mcVI$sp(TaskSchedulerImpl.scala:261)
   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
   at 
 org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$7.apply(TaskSchedulerImpl.scala:257)
   at 
 org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$7.apply(TaskSchedulerImpl.scala:254)
   at 
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
   at 
 org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:254)
   at 
 org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:254)
   at 
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
   at 
 org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:254)
   at 
 org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor.makeOffers(CoarseGrainedSchedulerBackend.scala:153)
   at 
 org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor$$anonfun$receive$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:103)
   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
   at akka.actor.ActorCell.invoke(ActorCell.scala:456)
   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
   at akka.dispatch.Mailbox.run(Mailbox.scala:219)
   at 
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
   at 
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
   at 
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 {code}
 This causes the job to hang.
 I can deterministically reproduce this by re-running the test, either in 
 isolation or as part of the full performance testing suite.



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2931) getAllowedLocalityLevel() throws ArrayIndexOutOfBoundsException

2014-08-09 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14091881#comment-14091881
 ] 

Mridul Muralidharan commented on SPARK-2931:


[~joshrosen] [~kayousterhout] Added a patch which deterministically showcases 
the bug - should be easy to fix it now I hope :-)

 getAllowedLocalityLevel() throws ArrayIndexOutOfBoundsException
 ---

 Key: SPARK-2931
 URL: https://issues.apache.org/jira/browse/SPARK-2931
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.1.0
 Environment: Spark EC2, spark-1.1.0-snapshot1, sort-by-key spark-perf 
 benchmark
Reporter: Josh Rosen
Priority: Blocker
 Fix For: 1.1.0

 Attachments: scala-sort-by-key.err, test.patch


 When running Spark Perf's sort-by-key benchmark on EC2 with v1.1.0-snapshot, 
 I get the following errors (one per task):
 {code}
 14/08/08 18:54:22 INFO scheduler.TaskSetManager: Starting task 39.0 in stage 
 0.0 (TID 39, ip-172-31-14-30.us-west-2.compute.internal, PROCESS_LOCAL, 1003 
 bytes)
 14/08/08 18:54:22 INFO cluster.SparkDeploySchedulerBackend: Registered 
 executor: 
 Actor[akka.tcp://sparkexecu...@ip-172-31-9-213.us-west-2.compute.internal:58901/user/Executor#1436065036]
  with ID 0
 14/08/08 18:54:22 ERROR actor.OneForOneStrategy: 1
 java.lang.ArrayIndexOutOfBoundsException: 1
   at 
 org.apache.spark.scheduler.TaskSetManager.getAllowedLocalityLevel(TaskSetManager.scala:475)
   at 
 org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:409)
   at 
 org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$7$$anonfun$apply$2.apply$mcVI$sp(TaskSchedulerImpl.scala:261)
   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
   at 
 org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$7.apply(TaskSchedulerImpl.scala:257)
   at 
 org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$7.apply(TaskSchedulerImpl.scala:254)
   at 
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
   at 
 org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:254)
   at 
 org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:254)
   at 
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
   at 
 org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:254)
   at 
 org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor.makeOffers(CoarseGrainedSchedulerBackend.scala:153)
   at 
 org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor$$anonfun$receive$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:103)
   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
   at akka.actor.ActorCell.invoke(ActorCell.scala:456)
   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
   at akka.dispatch.Mailbox.run(Mailbox.scala:219)
   at 
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
   at 
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
   at 
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 {code}
 This causes the job to hang.
 I can deterministically reproduce this by re-running the test, either in 
 isolation or as part of the full performance testing suite.



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2881) Snappy is now default codec - could lead to conflicts since uses /tmp

2014-08-06 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14088018#comment-14088018
 ] 

Mridul Muralidharan commented on SPARK-2881:


To add, this will affect Spark whenever the tmp directory is not overridden via 
java.io.tmpdir to something ephemeral.

So local, YARN client, and standalone should be affected by default (unless I 
missed something in the scripts).
I am not very sure of how Mesos runs jobs, so I can't comment about that - anyone 
care to add ?


A workaround I can think of is to always set 'org.xerial.snappy.tempdir' to a 
randomly generated directory under java.io.tmpdir as part of spark startup 
(only once), which will cause snappy to use that directory and avoid this 
issue. 
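A minimal sketch of that workaround (illustrative only; org.xerial.snappy.tempdir 
is the snappy-java property, and where exactly to hook this into startup is left 
open):

{code}
import java.nio.file.Files

// Point snappy-java at a per-process temp directory so concurrent users on the same
// machine never collide on a shared /tmp/snappy-*.so with restrictive permissions.
if (System.getProperty("org.xerial.snappy.tempdir") == null) {
  val dir = Files.createTempDirectory("snappy-" + System.getProperty("user.name") + "-")
  System.setProperty("org.xerial.snappy.tempdir", dir.toAbsolutePath.toString)
}
{code}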


Since snappy is the default codec now, I am marking this as a blocker for 
release

 Snappy is now default codec - could lead to conflicts since uses /tmp
 -

 Key: SPARK-2881
 URL: https://issues.apache.org/jira/browse/SPARK-2881
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.1.0
Reporter: Thomas Graves
Priority: Blocker

 I was using spark master branch and I ran into an issue with Snappy since its 
 now the default codec for shuffle. 
 The issue was that someone else had run with snappy and it created 
 /tmp/snappy-*.so but it had restrictive permissions so I was not able to use 
 it or remove it.   This caused my spark job to not start.  
 I was running in yarn client mode at the time.  Yarn cluster mode shouldn't 
 have this issue since we change the java.io.tmpdir. 
 I assume this would also affect standalone mode.
 I'm not sure if this is a true blocker but wanted to file it as one at first 
 and let us decide.



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-2881) Snappy is now default codec - could lead to conflicts since uses /tmp

2014-08-06 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14088018#comment-14088018
 ] 

Mridul Muralidharan edited comment on SPARK-2881 at 8/6/14 6:45 PM:


To add, this will affect Spark whenever the tmp directory is not overridden via 
java.io.tmpdir to something ephemeral.

So local, YARN client, and standalone should be affected by default (unless I 
missed something in the scripts).
I am not very sure of how Mesos runs jobs, so I can't comment about that - anyone 
care to add ?


A workaround I can think of is to always set 'org.xerial.snappy.tempdir' to a 
randomly generated directory under java.io.tmpdir as part of spark startup 
(only once), which will cause snappy to use that directory and avoid this 
issue. 


Since snappy is the default codec now, I am +1 on marking this as a blocker for 
release


was (Author: mridulm80):
To add, this will affect spark whenever tmp directory is not overridden via 
java.io.tmpdir to something ephemeral.

So local, yarn client, standalone should be affected by default (unless I 
missed something in the scripts).
I am not very of how mesos runs jobs, so cant comment about that, anyone care 
to add ?


A workaround I can think of is to always set 'org.xerial.snappy.tempdir' to a 
randomly generated directory under java.io.tmpdir as part of spark startup 
(only) once : which will cause snappy to use that directory and avoid this 
issue. 


Since snappy is the default codec now, I am marking this as a blocker for 
release

 Snappy is now default codec - could lead to conflicts since uses /tmp
 -

 Key: SPARK-2881
 URL: https://issues.apache.org/jira/browse/SPARK-2881
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.1.0
Reporter: Thomas Graves
Priority: Blocker

 I was using spark master branch and I ran into an issue with Snappy since its 
 now the default codec for shuffle. 
 The issue was that someone else had run with snappy and it created 
 /tmp/snappy-*.so but it had restrictive permissions so I was not able to use 
 it or remove it.   This caused my spark job to not start.  
 I was running in yarn client mode at the time.  Yarn cluster mode shouldn't 
 have this issue since we change the java.io.tmpdir. 
 I assume this would also affect standalone mode.
 I'm not sure if this is a true blocker but wanted to file it as one at first 
 and let us decide.



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2685) Update ExternalAppendOnlyMap to avoid buffer.remove()

2014-07-25 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14074186#comment-14074186
 ] 

Mridul Muralidharan commented on SPARK-2685:


We moved to using java.util.LinkedList for this

 Update ExternalAppendOnlyMap to avoid buffer.remove()
 -

 Key: SPARK-2685
 URL: https://issues.apache.org/jira/browse/SPARK-2685
 Project: Spark
  Issue Type: Sub-task
  Components: Spark Core
Reporter: Matei Zaharia

 This shifts the whole right side of the array back, which can be expensive. 
 It would be better to just swap the last element into the position we want to 
 remove at, then decrease the size of the array.
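A minimal sketch of the swap-remove idea described above (illustrative, not the 
actual ExternalAppendOnlyMap code); note that it does not preserve element order:

{code}
import scala.collection.mutable.ArrayBuffer

// Remove the element at `index` in O(1) by overwriting it with the last element,
// instead of ArrayBuffer.remove(index), which shifts the whole right side left.
def swapRemove[T](buf: ArrayBuffer[T], index: Int): T = {
  val removed = buf(index)
  buf(index) = buf(buf.length - 1)   // move the last element into the hole
  buf.trimEnd(1)                     // drop the now-duplicate last slot
  removed
}
{code}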



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Created] (SPARK-2532) Fix issues with consolidated shuffle

2014-07-16 Thread Mridul Muralidharan (JIRA)
Mridul Muralidharan created SPARK-2532:
--

 Summary: Fix issues with consolidated shuffle
 Key: SPARK-2532
 URL: https://issues.apache.org/jira/browse/SPARK-2532
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.1.0
 Environment: All
Reporter: Mridul Muralidharan
Assignee: Mridul Muralidharan
Priority: Critical
 Fix For: 1.1.0



Will file a PR with the changes as soon as the merge is done (the earlier merge 
became outdated in 2 weeks, unfortunately :) ).

Consolidated shuffle is broken in multiple ways in Spark:

a) Task failure(s) can cause the state to become inconsistent.

b) Multiple reverts, or a combination of close/revert/close, can cause the state 
to be inconsistent.
(As part of exception/error handling).

c) Some of the APIs in the block writer cause implementation issues - for example: 
a revert is always followed by a close, but the implementation tries to keep them 
separate, resulting in more surface area for errors.

d) Fetching data from consolidated shuffle files can go badly wrong if the file 
is being actively written to: it computes the length by subtracting the next 
offset from the current offset (or the file length if this is the last offset) - 
the latter fails when a fetch happens in parallel with a write.
Note, this happens even if there are no task failures of any kind !
This usually results in stream corruption or decompression errors.
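To illustrate (d), a simplified sketch of the length computation described above 
and why it breaks while the file is still being appended to (hypothetical shape, 
not the actual shuffle index code):

{code}
// offsets(i) is the start offset of block i within the consolidated file; the end of
// the last block is taken to be the current file length.
// If a writer is still appending when a fetch happens, offsets(i + 1) / fileLength do
// not yet reflect the finished block, so the computed length is wrong - leading to
// truncated reads, stream corruption or decompression errors even without any task failure.
def blockLength(offsets: Array[Long], fileLength: Long, i: Int): Long = {
  val end = if (i == offsets.length - 1) fileLength else offsets(i + 1)
  end - offsets(i)
}
{code}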




--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-2468) zero-copy shuffle network communication

2014-07-14 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14060543#comment-14060543
 ] 

Mridul Muralidharan commented on SPARK-2468:


We map the file content and directly write that to the socket (except when the 
size is below 8k or so iirc) - are you sure we are copying to user space and 
back ?

 zero-copy shuffle network communication
 ---

 Key: SPARK-2468
 URL: https://issues.apache.org/jira/browse/SPARK-2468
 Project: Spark
  Issue Type: Improvement
  Components: Shuffle, Spark Core
Reporter: Reynold Xin
Assignee: Reynold Xin
Priority: Critical

 Right now shuffle send goes through the block manager. This is inefficient 
 because it requires loading a block from disk into a kernel buffer, then into 
 a user space buffer, and then back to a kernel send buffer before it reaches 
 the NIC. It does multiple copies of the data and context switching between 
 kernel/user. It also creates unnecessary buffer in the JVM that increases GC
 Instead, we should use FileChannel.transferTo, which handles this in the 
 kernel space with zero-copy. See 
 http://www.ibm.com/developerworks/library/j-zerocopy/
 One potential solution is to use Netty NIO.
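For reference, a minimal sketch of the transferTo path referred to above 
(illustrative; connection setup and error handling are elided):

{code}
import java.io.RandomAccessFile
import java.nio.channels.WritableByteChannel

// Let the kernel move bytes from the file to the destination channel without copying
// them through a user-space buffer (sendfile under the hood, where supported).
def sendFileZeroCopy(path: String, dest: WritableByteChannel): Unit = {
  val channel = new RandomAccessFile(path, "r").getChannel
  try {
    var position = 0L
    val size = channel.size()
    while (position < size) {
      position += channel.transferTo(position, size - position, dest)
    }
  } finally {
    channel.close()
  }
}
{code}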



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-2468) zero-copy shuffle network communication

2014-07-14 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14060545#comment-14060545
 ] 

Mridul Muralidharan commented on SPARK-2468:


Writing mmap'ed buffers is pretty efficient btw - it is the second fallback in the 
transferTo implementation iirc.

 zero-copy shuffle network communication
 ---

 Key: SPARK-2468
 URL: https://issues.apache.org/jira/browse/SPARK-2468
 Project: Spark
  Issue Type: Improvement
  Components: Shuffle, Spark Core
Reporter: Reynold Xin
Assignee: Reynold Xin
Priority: Critical

 Right now shuffle send goes through the block manager. This is inefficient 
 because it requires loading a block from disk into a kernel buffer, then into 
 a user space buffer, and then back to a kernel send buffer before it reaches 
 the NIC. It does multiple copies of the data and context switching between 
 kernel/user. It also creates unnecessary buffer in the JVM that increases GC
 Instead, we should use FileChannel.transferTo, which handles this in the 
 kernel space with zero-copy. See 
 http://www.ibm.com/developerworks/library/j-zerocopy/
 One potential solution is to use Netty NIO.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-2468) zero-copy shuffle network communication

2014-07-14 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14061094#comment-14061094
 ] 

Mridul Muralidharan commented on SPARK-2468:



Ah, small files - those are indeed a problem.

Btw, we do dispose of mmap'ed blocks as soon as we are done with them, so we don't 
need to wait for GC to free them. Also note that the files are closed as soon as 
they are opened and mmap'ed - so they do not count towards the open file count/ulimit.

Agree on 1, 3 and 4 - some of these apply to sendfile too btw, so they are not 
avoidable; but it is the best we have right now.
Since we use mmap'ed buffers and rarely transfer the same file again, the 
performance jump might not be the order(s) of magnitude other projects claim - 
but then even a 10% (or whatever) improvement in our case would be substantial !
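For context, a hedged sketch of explicitly disposing a mapped buffer (this relies 
on non-public sun.* APIs of the JDK 7 era, so treat it as illustrative rather 
than a recommended pattern):

{code}
import java.nio.MappedByteBuffer
import sun.nio.ch.DirectBuffer

// Illustrative only: explicitly unmap a mapped buffer once we are done with it,
// rather than waiting for GC to collect the buffer object and release the mapping.
def dispose(buffer: MappedByteBuffer): Unit = buffer match {
  case db: DirectBuffer if db.cleaner() != null => db.cleaner().clean()
  case _ => // nothing to do; the mapping is released when the buffer is GC'ed
}
{code}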

 zero-copy shuffle network communication
 ---

 Key: SPARK-2468
 URL: https://issues.apache.org/jira/browse/SPARK-2468
 Project: Spark
  Issue Type: Improvement
  Components: Shuffle, Spark Core
Reporter: Reynold Xin
Assignee: Reynold Xin
Priority: Critical

 Right now shuffle send goes through the block manager. This is inefficient 
 because it requires loading a block from disk into a kernel buffer, then into 
 a user space buffer, and then back to a kernel send buffer before it reaches 
 the NIC. It does multiple copies of the data and context switching between 
 kernel/user. It also creates unnecessary buffer in the JVM that increases GC
 Instead, we should use FileChannel.transferTo, which handles this in the 
 kernel space with zero-copy. See 
 http://www.ibm.com/developerworks/library/j-zerocopy/
 One potential solution is to use Netty NIO.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-2398) Trouble running Spark 1.0 on Yarn

2014-07-13 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14060113#comment-14060113
 ] 

Mridul Muralidharan commented on SPARK-2398:



As discussed in the PR, I am attempting to list the various factors which 
contribute to overhead.
Note, this is not exhaustive (yet) - please add more to this JIRA - so that 
when we are reasonably sure, we can model the expected overhead based on these 
factors.

These factors are typically off-heap - since anything within the heap is budgeted 
for by Xmx and enforced by the VM - and so should ideally (not always in practice, 
see GC overheads) not exceed the Xmx value.

1) 256 KB per socket accepted via ConnectionManager for inter-worker communication 
(setReceiveBufferSize).
Typically, there will be (numExecutors - 1) sockets open.

2) 128 KB per socket for writing output to DFS. For reads, this does not seem 
to be configured - and should be 8 KB per socket iirc.
Typically 1 per executor at a given point in time ?

3) 256 KB for each akka socket for the send/receive buffer.
One per worker ? (to talk to the master) - so 512 KB ? Any other use of akka ?

4) If I am not wrong, netty might allocate multiple spark.akka.frameSize-sized 
direct buffers. There might be a few of these allocated and pooled/reused.
I did not go into the netty code in detail though. If someone else with more 
know-how can clarify, that would be great !
The default size of spark.akka.frameSize is 10 MB.

5) The default size of the assembled spark jar is about 12x mb (and changing) - 
though not all classes get loaded, the overhead would be some function of this.
The actual footprint would be higher than the on-disk size.
IIRC this is outside of the heap - [~sowen], any comments on this ? I have not 
looked into these in like 10 years now !

6) Per-thread (Xss) overhead of 1 MB (for a 64-bit VM).
Last I recall, we have about 220-odd threads - not sure if this was at the 
master or on the workers.
Of course, this is dependent on the various threadpools we use (io, computation, 
etc), akka and netty config, etc.

7) Disk read overhead.
Thanks to [~pwendell]'s fix, at least for small files the overhead is not too 
high - since we do not mmap files but read them directly.
But for anything larger than 8 KB (the default), we use memory-mapped buffers.
The actual overhead depends on the number of files opened for read via 
DiskStore - and the entire file contents get mmap'ed into virtual memory.
Note that there is also some non-virtual-memory overhead at the native level for 
these buffers.

The actual number of files opened should be carefully tracked to understand the 
effect of this on spark overhead, since this aspect is changing a lot of late.
The impact is on shuffle, disk-persisted RDDs, among others.
The actual value would be application dependent (how large the data is !)


8) The overhead introduced by the VM not being able to reclaim memory completely 
(the cost of moving data vs the amount of space reclaimed).
Ideally, this should be low - but it would be dependent on the heap space, the 
collector used, among other things.
I am not very knowledgeable about the recent advances in GC collectors, so I 
hesitate to put a number to this.



I am sure this is not an exhaustive list, please do add to this.
In our case specifically, and [~tgraves] could add more, the number of 
containers can be high (300+ is easily possible), and memory per container is 
modest (8 GB usually).
To add details of observed overhead patterns (from the PR discussion) - 
a) I have had an in-house GBDT implementation run without customizing the 
overhead (so the default of 384 MB) with 12 GB containers and 22 nodes on a 
reasonably large dataset.
b) I have had to customize the overhead to 1.7 GB for collaborative filtering with 
8 GB containers and 300 nodes (on a fairly large dataset).
c) I have had to minimally customize the overhead to do an in-house QR 
factorization of a 50k x 50k distributed dense matrix on 45 nodes at 12 GB each 
(this was incorrectly specified in the PR discussion).
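As a rough, hedged aid for modelling this (illustrative numbers taken from the 
factors above; not a Spark API), one can ballpark the non-heap overhead per 
executor and compare it against what is reserved via 
spark.yarn.executor.memoryOverhead:

{code}
// Very rough back-of-the-envelope model of per-executor off-heap overhead, using the
// per-factor numbers listed above; all values are illustrative and workload dependent.
val numExecutors        = 300
val threadsPerExecutor  = 220
val akkaFrameSizeBytes  = 10L * 1024 * 1024
val mmappedShuffleBytes = 512L * 1024 * 1024          // highly application dependent

val socketBuffers  = (numExecutors - 1) * 256L * 1024 // ConnectionManager receive buffers
val dfsBuffers     = 128L * 1024                      // one DFS write stream
val akkaBuffers    = 2 * 256L * 1024 + 4 * akkaFrameSizeBytes
val threadStacks   = threadsPerExecutor * 1L * 1024 * 1024  // default -Xss on a 64-bit VM
val estimatedBytes = socketBuffers + dfsBuffers + akkaBuffers + threadStacks + mmappedShuffleBytes
println(s"estimated non-heap overhead ~ ${estimatedBytes / (1024 * 1024)} MB")
// Compare against the reservation, e.g. --conf spark.yarn.executor.memoryOverhead=1700 (MB).
{code}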

 Trouble running Spark 1.0 on Yarn 
 --

 Key: SPARK-2398
 URL: https://issues.apache.org/jira/browse/SPARK-2398
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.0.0
Reporter: Nishkam Ravi

 Trouble running workloads in Spark-on-YARN cluster mode for Spark 1.0. 
 For example: SparkPageRank when run in standalone mode goes through without 
 any errors (tested for up to 30GB input dataset on a 6-node cluster).  Also 
 runs fine for a 1GB dataset in yarn cluster mode. Starts to choke (in yarn 
 cluster mode) as the input data size is increased. Confirmed for 16GB input 
 dataset.
 The same workload runs fine with Spark 0.9 in both standalone and yarn 
 cluster mode (for up to 30 GB input dataset on a 6-node cluster).
 Commandline used:
 (/opt/cloudera/parcels/CDH/lib/spark/bin/spark-submit 

[jira] [Commented] (SPARK-2390) Files in staging directory cannot be deleted and wastes the space of HDFS

2014-07-07 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14054162#comment-14054162
 ] 

Mridul Muralidharan commented on SPARK-2390:


Here, and in a bunch of other places, Spark currently closes the FileSystem 
instance: this is incorrect and should not be done.
The fix would be to remove the fs.close(), not to force creation of new instances.
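A small illustration of why closing that instance is harmful - FileSystem.get 
caches instances per (scheme, authority, user), so a close in one place closes it 
for everyone (illustrative; assumes a configured HDFS):

{code}
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

val conf = new Configuration()
val fs1 = FileSystem.get(new URI("hdfs:///"), conf)
val fs2 = FileSystem.get(new URI("hdfs:///"), conf)
// fs1 and fs2 are the same cached instance, so closing fs1 closes fs2 as well.
// Any later call such as the staging-dir cleanup in the shutdown hook then fails
// with "java.io.IOException: Filesystem closed".
fs1.close()
{code}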

 Files in staging directory cannot be deleted and wastes the space of HDFS
 -

 Key: SPARK-2390
 URL: https://issues.apache.org/jira/browse/SPARK-2390
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.0.0, 1.0.1
Reporter: Kousuke Saruta

 When running jobs with YARN Cluster mode and using HistoryServer, the files 
 in the Staging Directory cannot be deleted.
 HistoryServer uses the directory where the event log is written, and the directory is 
 represented as an instance of o.a.h.f.FileSystem created by using 
 FileSystem.get.
 {code:title=FileLogger.scala}
 private val fileSystem = Utils.getHadoopFileSystem(new URI(logDir))
 {code}
 {code:title=utils.getHadoopFileSystem}
 def getHadoopFileSystem(path: URI): FileSystem = {
   FileSystem.get(path, SparkHadoopUtil.get.newConfiguration())
 }
 {code}
 On the other hand, ApplicationMaster has a instance named fs, which also 
 created by using FileSystem.get.
 {code:title=ApplicationMaster}
 private val fs = FileSystem.get(yarnConf)
 {code}
 FileSystem.get returns cached same instance when URI passed to the method 
 represents same file system and the method is called by same user.
 Because of the behavior, when the directory for event log is on HDFS, fs of 
 ApplicationMaster and fileSystem of FileLogger is same instance.
 When shutting down ApplicationMaster, fileSystem.close is called in 
 FileLogger#stop, which is invoked by SparkContext#stop indirectly.
 {code:title=FileLogger.stop}
 def stop() {
   hadoopDataStream.foreach(_.close())
   writer.foreach(_.close())
   fileSystem.close()
 }
 {code}
 And ApplicationMaster#cleanupStagingDir is also called by a JVM shutdown hook. In 
 this method, fs.delete(stagingDirPath) is invoked. 
 Because fs.delete in ApplicationMaster is called after fileSystem.close in 
 FileLogger, fs.delete fails, and the files in the staging directory are not 
 deleted.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-2277) Make TaskScheduler track whether there's host on a rack

2014-07-04 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14052275#comment-14052275
 ] 

Mridul Muralidharan commented on SPARK-2277:


Hmm, good point - that PR does change the scheduler expectations in a lot of 
ways which were not all anticipated.
Let me go through the current PR; thanks for the bug !

 Make TaskScheduler track whether there's host on a rack
 ---

 Key: SPARK-2277
 URL: https://issues.apache.org/jira/browse/SPARK-2277
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 1.0.0
Reporter: Rui Li

 When TaskSetManager adds a pending task, it checks whether the tasks's 
 preferred location is available. Regarding RACK_LOCAL task, we consider the 
 preferred rack available if such a rack is defined for the preferred host. 
 This is incorrect as there may be no alive hosts on that rack at all. 
 Therefore, TaskScheduler should track the hosts on each rack, and provide an 
 API for TaskSetManager to check if there's host alive on a specific rack.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-2017) web ui stage page becomes unresponsive when the number of tasks is large

2014-07-04 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14052289#comment-14052289
 ] 

Mridul Muralidharan commented on SPARK-2017:


With aggregated metrics, we lose the ability to check for GC time (which is 
actually what I use that UI for, other than digging up exceptions on failed 
tasks).

 web ui stage page becomes unresponsive when the number of tasks is large
 

 Key: SPARK-2017
 URL: https://issues.apache.org/jira/browse/SPARK-2017
 Project: Spark
  Issue Type: Sub-task
Reporter: Reynold Xin
  Labels: starter

 {code}
 sc.parallelize(1 to 1000000, 1000000).count()
 {code}
 The above code creates one million tasks to be executed. The stage detail web 
 ui page takes forever to load (if it ever completes).
 There are again a few different alternatives:
 0. Limit the number of tasks we show.
 1. Pagination
 2. By default only show the aggregate metrics and failed tasks, and hide the 
 successful ones.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-2017) web ui stage page becomes unresponsive when the number of tasks is large

2014-07-04 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14052679#comment-14052679
 ] 

Mridul Muralidharan commented on SPARK-2017:


Sounds great: the ability to get to currently running tasks (to check current 
state), the ability to get to task failures (usually to debug), some aggregate 
stats (GC, stats per executor, etc), and some way to get to the full 
details (which is what is seen currently) in an advanced or full view.
Anything else would be a bonus :-)

 web ui stage page becomes unresponsive when the number of tasks is large
 

 Key: SPARK-2017
 URL: https://issues.apache.org/jira/browse/SPARK-2017
 Project: Spark
  Issue Type: Sub-task
Reporter: Reynold Xin
  Labels: starter

 {code}
 sc.parallelize(1 to 1000000, 1000000).count()
 {code}
 The above code creates one million tasks to be executed. The stage detail web 
 ui page takes forever to load (if it ever completes).
 There are again a few different alternatives:
 0. Limit the number of tasks we show.
 1. Pagination
 2. By default only show the aggregate metrics and failed tasks, and hide the 
 successful ones.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Created] (SPARK-2353) ArrayIndexOutOfBoundsException in scheduler

2014-07-03 Thread Mridul Muralidharan (JIRA)
Mridul Muralidharan created SPARK-2353:
--

 Summary: ArrayIndexOutOfBoundsException in scheduler
 Key: SPARK-2353
 URL: https://issues.apache.org/jira/browse/SPARK-2353
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.1.0
Reporter: Mridul Muralidharan
Priority: Blocker



I suspect the recent changes from SPARK-1937 to compute valid locality levels 
(and ignore ones which are not applicable) have resulted in this issue.
Specifically, some of the code using currentLocalityIndex (and lastLaunchTime 
actually) seems to be assuming 
a) a constant population of locality levels.
b) probably also immutability/repeatability of locality levels

These no longer hold.
I do not have the exact values for which this failure was observed (since this 
is from the logs of a failed job) - but the code path is highly suspect.

Also note that the line numbers/classes might not exactly match master since we 
are in the middle of a merge. But the issue should hopefully be evident.

java.lang.ArrayIndexOutOfBoundsException: 2
at 
org.apache.spark.scheduler.TaskSetManager.getAllowedLocalityLevel(TaskSetManager.scala:439)
at 
org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:388)
at 
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$5$$anonfun$apply$2.apply$mcVI$sp(TaskSchedulerImpl.scala:248)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at 
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$5.apply(TaskSchedulerImpl.scala:244)
at 
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$5.apply(TaskSchedulerImpl.scala:241)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at 
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:241)
at 
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:241)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:241)
at 
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor.makeOffers(CoarseGrainedSchedulerBackend.scala:133)
at 
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor$$anonfun$receive$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:86)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
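To make the suspected failure mode concrete, here is a hedged, self-contained sketch (the field names follow the description above; the recompute helper is hypothetical and is not the actual fix): if the array of valid locality levels is recomputed and shrinks, a previously saved index must be re-derived rather than reused.

{code}
class LocalityState(var myLocalityLevels: Array[String], var currentLocalityIndex: Int) {
  // If the set of valid levels changes (e.g. an executor is lost), a saved index can
  // point past the end of the new array -> ArrayIndexOutOfBoundsException.
  def recomputeLocality(newLevels: Array[String]): Unit = {
    val previousLevel = myLocalityLevels(currentLocalityIndex)
    myLocalityLevels = newLevels
    // Re-derive the index instead of reusing the stale one; fall back to the first
    // (most local) level if the previous level no longer exists.
    currentLocalityIndex = math.max(myLocalityLevels.indexOf(previousLevel), 0)
  }
}
{code}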



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-2277) Make TaskScheduler track whether there's host on a rack

2014-07-03 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14051575#comment-14051575
 ] 

Mridul Muralidharan commented on SPARK-2277:


I have not rechecked the code, but the way it was originally written by me 
was:

a) Task preference is decoupled from availability of the node.
For example, we need not have an executor on a host for which a block has a host 
preference (for example, DFS blocks on a shared cluster).
Also note that a block might have one or more preferred locations.

b) We look up the rack of the preferred location to get the preferred rack.
As with (a), there need not be an executor on that rack. This is just the rack 
preference.


c) At schedule time, for an executor, we look up the host/rack of the executor's 
location - and decide appropriately based on that.



In this context, I think your requirement is already handled.
Even if we don't have any hosts alive on a rack, those tasks would still be 
mentioned with a rack-local preference in the task set manager.
When an executor comes in (existing or new), we check that executor's rack against 
the task preference - and it would now be marked rack local.
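A hedged, self-contained sketch of the flow described in (a)-(c); the rack map and the helper names are hypothetical stand-ins for the scheduler's real structures.

{code}
object RackScheduleSketch {
  // Rack topology known to the scheduler; it may cover hosts with no live executor.
  val rackForHost: Map[String, String] =
    Map("hostA" -> "rack1", "hostB" -> "rack1", "hostC" -> "rack2")

  def getRackForHost(host: String): Option[String] = rackForHost.get(host)

  // (a)/(b): preferred racks derive from the task's preferred hosts, regardless of liveness.
  def preferredRacks(preferredHosts: Seq[String]): Set[String] =
    preferredHosts.flatMap(getRackForHost).toSet

  // (c): at offer time, compare the executor's rack against the task's preferred racks.
  def isRackLocal(preferredHosts: Seq[String], executorHost: String): Boolean =
    getRackForHost(executorHost).exists(preferredRacks(preferredHosts).contains)
}
{code}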

 Make TaskScheduler track whether there's host on a rack
 ---

 Key: SPARK-2277
 URL: https://issues.apache.org/jira/browse/SPARK-2277
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 1.0.0
Reporter: Rui Li

 When TaskSetManager adds a pending task, it checks whether the task's 
 preferred location is available. Regarding RACK_LOCAL task, we consider the 
 preferred rack available if such a rack is defined for the preferred host. 
 This is incorrect as there may be no alive hosts on that rack at all. 
 Therefore, TaskScheduler should track the hosts on each rack, and provide an 
 API for TaskSetManager to check if there's host alive on a specific rack.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (SPARK-2353) ArrayIndexOutOfBoundsException in scheduler

2014-07-03 Thread Mridul Muralidharan (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-2353?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan updated SPARK-2353:
---

Description: 
I suspect the recent changes from SPARK-1937 to compute valid locality levels 
(and ignore ones which are not applicable) have resulted in this issue.
Specifically, some of the code using currentLocalityIndex (and lastLaunchTime 
actually) seems to be assuming 
a) a constant population of locality levels.
b) probably also immutability/repeatability of locality levels

These no longer hold.
I do not have the exact values for which this failure was observed (since this 
is from the logs of a failed job) - but the code path is suspect.

Also note that the line numbers/classes might not exactly match master since we 
are in the middle of a merge. But the issue should hopefully be evident.

java.lang.ArrayIndexOutOfBoundsException: 2
at 
org.apache.spark.scheduler.TaskSetManager.getAllowedLocalityLevel(TaskSetManager.scala:439)
at 
org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:388)
at 
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$5$$anonfun$apply$2.apply$mcVI$sp(TaskSchedulerImpl.scala:248)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at 
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$5.apply(TaskSchedulerImpl.scala:244)
at 
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$5.apply(TaskSchedulerImpl.scala:241)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at 
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:241)
at 
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:241)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:241)
at 
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor.makeOffers(CoarseGrainedSchedulerBackend.scala:133)
at 
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor$$anonfun$receive$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:86)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


Unfortunately, we do not have the bandwidth to tackle this issue - would be 
great if someone could take a look at it ! Thanks.

  was:

I suspect the recent changes from SPARK-1937 to compute valid locality levels 
(and ignoring ones which are not applicable) has resulted in this issue.
Specifically, some of the code using currentLocalityIndex (and lastLaunchTime 
actually) seems to be assuming 
a) constant population of locality levels.
b) probably also immutablility/repeatibility of locality levels

These do not hold any longer.
I do not have the exact values for which this failure was observed (since this 
is from the logs of a failed job) - but the code path is highly suspect.

Also note that the line numbers/classes might not exactly match master since we 
are in the middle of a merge. But the issue should hopefully be evident.

java.lang.ArrayIndexOutOfBoundsException: 2
at 
org.apache.spark.scheduler.TaskSetManager.getAllowedLocalityLevel(TaskSetManager.scala:439)
at 
org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:388)
at 
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$5$$anonfun$apply$2.apply$mcVI$sp(TaskSchedulerImpl.scala:248)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at 
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$5.apply(TaskSchedulerImpl.scala:244)
at 
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$5.apply(TaskSchedulerImpl.scala:241)
at 

[jira] [Commented] (SPARK-2277) Make TaskScheduler track whether there's host on a rack

2014-07-02 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14050886#comment-14050886
 ] 

Mridul Muralidharan commented on SPARK-2277:


I am not sure I follow this requirement.
For preferred locations, we populate their corresponding racks (if available) 
as preferred racks.

For available executor hosts, we look up the rack they belong to - and then see 
if that rack is preferred or not.

This, of course, assumes a host is only on a single rack.


What exactly is the behavior you are expecting from the scheduler ?

 Make TaskScheduler track whether there's host on a rack
 ---

 Key: SPARK-2277
 URL: https://issues.apache.org/jira/browse/SPARK-2277
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 1.0.0
Reporter: Rui Li

 When TaskSetManager adds a pending task, it checks whether the task's 
 preferred location is available. Regarding RACK_LOCAL task, we consider the 
 preferred rack available if such a rack is defined for the preferred host. 
 This is incorrect as there may be no alive hosts on that rack at all. 
 Therefore, TaskScheduler should track the hosts on each rack, and provide an 
 API for TaskSetManager to check if there's host alive on a specific rack.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-2294) TaskSchedulerImpl and TaskSetManager do not properly prioritize which tasks get assigned to an executor

2014-06-26 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14045433#comment-14045433
 ] 

Mridul Muralidharan commented on SPARK-2294:


I agree; we should bump no-locality-preference and speculative tasks to the NODE_LOCAL 
level after NODE_LOCAL tasks have been scheduled (if available), and not check 
for them at PROCESS_LOCAL max locality. That way they get scheduled before RACK_LOCAL 
but after NODE_LOCAL.
This is an artifact of the design from when there was no PROCESS_LOCAL and 
NODE_LOCAL was the best schedule possible (without explicitly having these 
levels : we had node and any).
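A hedged, self-contained toy illustrating that ordering (the queues and names are hypothetical; the real TaskSetManager.findTask is considerably more involved):

{code}
object LocalityOrderingSketch {
  object Locality extends Enumeration { val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value }
  import Locality._

  def findTask(processLocal: List[Int], nodeLocal: List[Int], noPrefOrSpeculative: List[Int],
               rackLocal: List[Int], any: List[Int],
               maxLocality: Locality.Value): Option[(Int, Locality.Value)] = {
    processLocal.headOption.map(i => (i, PROCESS_LOCAL))
      .orElse(if (maxLocality >= NODE_LOCAL) nodeLocal.headOption.map(i => (i, NODE_LOCAL)) else None)
      // no-pref and speculative tasks are considered only once NODE_LOCAL tasks are exhausted,
      // so they schedule after NODE_LOCAL but before RACK_LOCAL
      .orElse(if (maxLocality >= NODE_LOCAL) noPrefOrSpeculative.headOption.map(i => (i, NODE_LOCAL)) else None)
      .orElse(if (maxLocality >= RACK_LOCAL) rackLocal.headOption.map(i => (i, RACK_LOCAL)) else None)
      .orElse(if (maxLocality >= ANY) any.headOption.map(i => (i, ANY)) else None)
  }
}
{code}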

 TaskSchedulerImpl and TaskSetManager do not properly prioritize which tasks 
 get assigned to an executor
 ---

 Key: SPARK-2294
 URL: https://issues.apache.org/jira/browse/SPARK-2294
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.0.0, 1.0.1
Reporter: Kay Ousterhout

 If an executor E is free, a task may be speculatively assigned to E when 
 there are other tasks in the job that have not been launched (at all) yet.  
 Similarly, a task without any locality preferences may be assigned to E when 
 there was another NODE_LOCAL task that could have been scheduled. 
 This happens because TaskSchedulerImpl calls TaskSetManager.resourceOffer 
 (which in turn calls TaskSetManager.findTask) with increasing locality 
 levels, beginning with PROCESS_LOCAL, followed by NODE_LOCAL, and so on until 
 the highest currently allowed level.  Now, suppose NODE_LOCAL is the highest 
 currently allowed locality level.  The first time findTask is called, it will 
 be called with max level PROCESS_LOCAL; if it cannot find any PROCESS_LOCAL 
 tasks, it will try to schedule tasks with no locality preferences or 
 speculative tasks.  As a result, speculative tasks or tasks with no 
 preferences may be scheduled instead of NODE_LOCAL tasks.
 cc [~matei]



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-2268) Utils.createTempDir() creates race with HDFS at shutdown

2014-06-25 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14043088#comment-14043088
 ] 

Mridul Muralidharan commented on SPARK-2268:


That is not because of this hook.
There are a bunch of places in spark where FileSystem objects are (incorrectly, 
I should add) getting closed : some within shutdown hooks (check the stop method 
in various services in spark) and others elsewhere (like the checkpointing code).

I have fixed a bunch of these as part of some other work ... should come in a 
PR soon.

 Utils.createTempDir() creates race with HDFS at shutdown
 

 Key: SPARK-2268
 URL: https://issues.apache.org/jira/browse/SPARK-2268
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.0.0
Reporter: Marcelo Vanzin

 Utils.createTempDir() has this code:
 {code}
 // Add a shutdown hook to delete the temp dir when the JVM exits
 Runtime.getRuntime.addShutdownHook(new Thread("delete Spark temp dir " + 
 dir) {
   override def run() {
 // Attempt to delete if some patch which is parent of this is not 
 already registered.
 if (! hasRootAsShutdownDeleteDir(dir)) Utils.deleteRecursively(dir)
   }
 })
 {code}
 This creates a race with the shutdown hooks registered by HDFS, since the 
 order of execution is undefined; if the HDFS hooks run first, you'll get 
 exceptions about the file system being closed.
 Instead, this should use Hadoop's ShutdownHookManager with a proper priority, 
 so that it runs before the HDFS hooks.
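A minimal sketch of the ShutdownHookManager approach described above, assuming Hadoop's org.apache.hadoop.util.ShutdownHookManager is on the classpath; the directory argument and the recursive-delete helper are stand-ins for the ones used by Utils.createTempDir():

{code}
import java.io.File
import org.apache.hadoop.util.ShutdownHookManager

def deleteRecursively(f: File): Unit = {
  if (f.isDirectory) Option(f.listFiles()).getOrElse(Array.empty).foreach(deleteRecursively)
  f.delete()
}

def registerTempDirCleanup(dir: File): Unit = {
  // Hadoop's FileSystem registers its own shutdown hook at priority 10; a higher
  // priority runs earlier, so this cleanup executes before HDFS clients are closed.
  ShutdownHookManager.get().addShutdownHook(new Runnable {
    override def run(): Unit = deleteRecursively(dir)
  }, 50)
}
{code}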



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-2268) Utils.createTempDir() creates race with HDFS at shutdown

2014-06-24 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14043071#comment-14043071
 ] 

Mridul Muralidharan commented on SPARK-2268:


Setting a priority for shutdown hooks does not have too much impact given the 
state of the VM.
Note that this hook is trying to delete local directories - not DFS directories.

 Utils.createTempDir() creates race with HDFS at shutdown
 

 Key: SPARK-2268
 URL: https://issues.apache.org/jira/browse/SPARK-2268
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.0.0
Reporter: Marcelo Vanzin

 Utils.createTempDir() has this code:
 {code}
 // Add a shutdown hook to delete the temp dir when the JVM exits
 Runtime.getRuntime.addShutdownHook(new Thread("delete Spark temp dir " + 
 dir) {
   override def run() {
 // Attempt to delete if some patch which is parent of this is not 
 already registered.
 if (! hasRootAsShutdownDeleteDir(dir)) Utils.deleteRecursively(dir)
   }
 })
 {code}
 This creates a race with the shutdown hooks registered by HDFS, since the 
 order of execution is undefined; if the HDFS hooks run first, you'll get 
 exceptions about the file system being closed.
 Instead, this should use Hadoop's ShutdownHookManager with a proper priority, 
 so that it runs before the HDFS hooks.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-704) ConnectionManager sometimes cannot detect loss of sending connections

2014-06-21 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14039742#comment-14039742
 ] 

Mridul Muralidharan commented on SPARK-704:
---

If the remote node goes down, SendingConnection would be notified since it is also 
registered for read events (to handle precisely this case, actually).
ReceivingConnection would anyway be notified since it is waiting on reads on 
that socket.

This, of course, assumes that the local node detects the remote node failure at the TCP 
layer.
Problems come in when 

 ConnectionManager sometimes cannot detect loss of sending connections
 -

 Key: SPARK-704
 URL: https://issues.apache.org/jira/browse/SPARK-704
 Project: Spark
  Issue Type: Bug
Reporter: Charles Reiss
Assignee: Henry Saputra

 ConnectionManager currently does not detect when SendingConnections 
 disconnect except if it is trying to send through them. As a result, a node 
 failure just after a connection is initiated but before any acknowledgement 
 messages can be sent may result in a hang.
 ConnectionManager has code intended to detect this case by detecting the 
 failure of a corresponding ReceivingConnection, but this code assumes that 
 the remote host:port of the ReceivingConnection is the same as the 
 ConnectionManagerId, which is almost never true. Additionally, there does not 
 appear to be any reason to assume a corresponding ReceivingConnection will 
 exist.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Comment Edited] (SPARK-704) ConnectionManager sometimes cannot detect loss of sending connections

2014-06-21 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14039742#comment-14039742
 ] 

Mridul Muralidharan edited comment on SPARK-704 at 6/21/14 9:10 AM:


If the remote node goes down, SendingConnection would be notified since it is also 
registered for read events (to handle precisely this case, actually).
ReceivingConnection would be notified since it is waiting on reads on that 
socket.

This, of course, assumes that the local node detects the remote node failure at the TCP 
layer.
Problems come in when this is not detected due to no activity on the socket (at 
the app and socket level - keepalive timeout, etc).
Usually this is detected via application-level ping/keepalive messages : not 
sure if we want to introduce that into spark ...


was (Author: mridulm80):
If remote node goes down, SendingConnection would be notified since it is also 
registered for read events (to handle precisely this case actually).
ReceivingConnection would anyway be notified since it is waiting on reads on 
that socket.

This, ofcourse, assumes that local node detects remote node failure at tcp 
layer.
Problems come in when 

 ConnectionManager sometimes cannot detect loss of sending connections
 -

 Key: SPARK-704
 URL: https://issues.apache.org/jira/browse/SPARK-704
 Project: Spark
  Issue Type: Bug
Reporter: Charles Reiss
Assignee: Henry Saputra

 ConnectionManager currently does not detect when SendingConnections 
 disconnect except if it is trying to send through them. As a result, a node 
 failure just after a connection is initiated but before any acknowledgement 
 messages can be sent may result in a hang.
 ConnectionManager has code intended to detect this case by detecting the 
 failure of a corresponding ReceivingConnection, but this code assumes that 
 the remote host:port of the ReceivingConnection is the same as the 
 ConnectionManagerId, which is almost never true. Additionally, there does not 
 appear to be any reason to assume a corresponding ReceivingConnection will 
 exist.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-2223) Building and running tests with maven is extremely slow

2014-06-20 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14039217#comment-14039217
 ] 

Mridul Muralidharan commented on SPARK-2223:


[~tgraves] You could try running zinc - it speeds up the maven build quite a bit.
I find that I need to shut it down and restart it at times though ... but otherwise, 
it works fine.

 Building and running tests with maven is extremely slow
 ---

 Key: SPARK-2223
 URL: https://issues.apache.org/jira/browse/SPARK-2223
 Project: Spark
  Issue Type: Bug
  Components: Build
Affects Versions: 1.0.0
Reporter: Thomas Graves

 For some reason using maven with Spark is extremely slow.  Building and 
 running tests takes way longer then other projects I have used that use 
 maven.  We should investigate to see why.  



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-2089) With YARN, preferredNodeLocalityData isn't honored

2014-06-20 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14039236#comment-14039236
 ] 

Mridul Muralidharan commented on SPARK-2089:


[~pwendell] SplitInfo is not from hadoop - it gives locality preferences in the 
context of spark (see org.apache.spark.scheduler.SplitInfo) in a reasonably 
API-agnostic way.
The default support provided for it is hadoop specific, based on DFS blocks - 
but I don't think there is anything stopping us from expressing other forms 
(either already, or with minor modifications as applicable).

We actually use that API very heavily - moving 10s or 100s of TB of data tends 
to be fairly expensive :-)
Since we are still stuck on 0.9 + changes, we have not yet faced this issue 
ourselves, so it is great to see this being addressed.
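For context, a hedged illustration of how preferredNodeLocationData was meant to be supplied (API shapes as of roughly Spark 1.0; treat the exact signatures as approximate, and hadoopConf / inputPath as placeholders):

{code}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.InputFormatInfo

val conf = new SparkConf().setAppName("locality-aware-app")
// Derive host -> SplitInfo preferences from the input's DFS blocks ...
val prefs = InputFormatInfo.computePreferredLocations(
  Seq(new InputFormatInfo(hadoopConf, classOf[TextInputFormat], inputPath)))
// ... and hand them to the SparkContext so YARN can request containers on those hosts.
val sc = new SparkContext(conf, prefs)
{code}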



 With YARN, preferredNodeLocalityData isn't honored 
 ---

 Key: SPARK-2089
 URL: https://issues.apache.org/jira/browse/SPARK-2089
 Project: Spark
  Issue Type: Bug
  Components: YARN
Affects Versions: 1.0.0
Reporter: Sandy Ryza
Assignee: Sandy Ryza
Priority: Critical

 When running in YARN cluster mode, apps can pass preferred locality data when 
 constructing a Spark context that will dictate where to request executor 
 containers.
 This is currently broken because of a race condition.  The Spark-YARN code 
 runs the user class and waits for it to start up a SparkContext.  During its 
 initialization, the SparkContext will create a YarnClusterScheduler, which 
 notifies a monitor in the Spark-YARN code that .  The Spark-Yarn code then 
 immediately fetches the preferredNodeLocationData from the SparkContext and 
 uses it to start requesting containers.
 But in the SparkContext constructor that takes the preferredNodeLocationData, 
 setting preferredNodeLocationData comes after the rest of the initialization, 
 so, if the Spark-YARN code comes around quickly enough after being notified, 
 the data that's fetched is the empty unset version.  This occurred during all 
 of my runs.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-1353) IllegalArgumentException when writing to disk

2014-06-17 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14033625#comment-14033625
 ] 

Mridul Muralidharan commented on SPARK-1353:


This is due to a limitation in spark which is being addressed in 
https://issues.apache.org/jira/browse/SPARK-1476.

 IllegalArgumentException when writing to disk
 -

 Key: SPARK-1353
 URL: https://issues.apache.org/jira/browse/SPARK-1353
 Project: Spark
  Issue Type: Bug
  Components: Block Manager
 Environment: AWS EMR 3.2.30-49.59.amzn1.x86_64 #1 SMP  x86_64 
 GNU/Linux
 Spark 1.0.0-SNAPSHOT built for Hadoop 1.0.4 built 2014-03-18
Reporter: Jim Blomo
Priority: Minor

 The Executor may fail when trying to mmap a file bigger than 
 Integer.MAX_VALUE due to the constraints of FileChannel.map 
 (http://docs.oracle.com/javase/7/docs/api/java/nio/channels/FileChannel.html#map(java.nio.channels.FileChannel.MapMode,
  long, long)).  The signature takes longs, but the size value must be less 
 than MAX_VALUE.  This manifests with the following backtrace:
 java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
 at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
 at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:98)
 at 
 org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:337)
 at 
 org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:281)
 at org.apache.spark.storage.BlockManager.get(BlockManager.scala:430)
 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:38)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:220)
 at 
 org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
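A hedged sketch of the chunking direction referenced in the comment above (SPARK-1476): map a large file as several buffers, each no larger than Integer.MAX_VALUE, instead of issuing a single FileChannel.map call.

{code}
import java.io.RandomAccessFile
import java.nio.MappedByteBuffer
import java.nio.channels.FileChannel

def mapInChunks(path: String, chunkSize: Long = Int.MaxValue.toLong): Seq[MappedByteBuffer] = {
  val channel = new RandomAccessFile(path, "r").getChannel
  try {
    val length = channel.size()
    // One mapping per chunk; each mapping stays within FileChannel.map's size limit.
    (0L until length by chunkSize).map { offset =>
      channel.map(FileChannel.MapMode.READ_ONLY, offset, math.min(chunkSize, length - offset))
    }
  } finally {
    channel.close()  // mappings remain valid after the channel is closed
  }
}
{code}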



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-2018) Big-Endian (IBM Power7) Spark Serialization issue

2014-06-11 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14027808#comment-14027808
 ] 

Mridul Muralidharan commented on SPARK-2018:


Ah ! This is an interesting bug.
Default spark uses java serialization ... so this should not be an issue : but yet 
you are facing it ! (I am assuming you have not customized serialization.)
Is it possible for you to dump the data written and read at both ends ? And the env 
vars and jvm details ?
Actually, spark does not do anything fancy for default serialization : so a 
simple example without spark in the picture could also be tried (write to a file 
on the master node, and read from the file on the slave node - and see if it works).
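A hedged sketch of that standalone test (no Spark involved): write a value with plain java serialization on the big-endian node, copy the file to the little-endian node, and read it back there. The path is a placeholder.

{code}
import java.io.{FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream}

object SerializationRoundTrip {
  def write(path: String, value: Serializable): Unit = {
    val out = new ObjectOutputStream(new FileOutputStream(path))
    try out.writeObject(value) finally out.close()
  }

  def read(path: String): AnyRef = {
    val in = new ObjectInputStream(new FileInputStream(path))
    try in.readObject() finally in.close()
  }
}

// On the master node:  SerializationRoundTrip.write("/tmp/probe.bin", "hello big endian")
// On the slave node:   println(SerializationRoundTrip.read("/tmp/probe.bin"))
{code}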

 Big-Endian (IBM Power7)  Spark Serialization issue
 --

 Key: SPARK-2018
 URL: https://issues.apache.org/jira/browse/SPARK-2018
 Project: Spark
  Issue Type: Bug
Affects Versions: 1.0.0
 Environment: hardware : IBM Power7
 OS:Linux version 2.6.32-358.el6.ppc64 
 (mockbu...@ppc-017.build.eng.bos.redhat.com) (gcc version 4.4.7 20120313 (Red 
 Hat 4.4.7-3) (GCC) ) #1 SMP Tue Jan 29 11:43:27 EST 2013
 JDK: Java(TM) SE Runtime Environment (build pxp6470sr5-20130619_01(SR5))
 IBM J9 VM (build 2.6, JRE 1.7.0 Linux ppc64-64 Compressed References 
 20130617_152572 (JIT enabled, AOT enabled)
 Hadoop:Hadoop-0.2.3-CDH5.0
 Spark:Spark-1.0.0 or Spark-0.9.1
 spark-env.sh:
 export JAVA_HOME=/opt/ibm/java-ppc64-70/
 export SPARK_MASTER_IP=9.114.34.69
 export SPARK_WORKER_MEMORY=1m
 export SPARK_CLASSPATH=/home/test1/spark-1.0.0-bin-hadoop2/lib
 export  STANDALONE_SPARK_MASTER_HOST=9.114.34.69
 #export SPARK_JAVA_OPTS=' -Xdebug 
 -Xrunjdwp:transport=dt_socket,address=9,server=y,suspend=n '
Reporter: Yanjie Gao

 We have an application run on Spark on Power7 System .
 But we meet an important issue about serialization.
 The example HdfsWordCount can meet the problem.
 ./bin/run-example  org.apache.spark.examples.streaming.HdfsWordCount 
 localdir
 We used Power7 (Big-Endian arch) and Redhat  6.4.
 Big-Endian  is the main cause since the example ran successfully in another 
 Power-based Little Endian setup.
 here is the exception stack and log:
 Spark Executor Command: /opt/ibm/java-ppc64-70//bin/java -cp 
 /home/test1/spark-1.0.0-bin-hadoop2/lib::/home/test1/src/spark-1.0.0-bin-hadoop2/conf:/home/test1/src/spark-1.0.0-bin-hadoop2/lib/spark-assembly-1.0.0-hadoop2.2.0.jar:/home/test1/src/spark-1.0.0-bin-hadoop2/lib/datanucleus-rdbms-3.2.1.jar:/home/test1/src/spark-1.0.0-bin-hadoop2/lib/datanucleus-api-jdo-3.2.1.jar:/home/test1/src/spark-1.0.0-bin-hadoop2/lib/datanucleus-core-3.2.2.jar:/home/test1/src/hadoop-2.3.0-cdh5.0.0/etc/hadoop/:/home/test1/src/hadoop-2.3.0-cdh5.0.0/etc/hadoop/
  -XX:MaxPermSize=128m  -Xdebug 
 -Xrunjdwp:transport=dt_socket,address=9,server=y,suspend=n -Xms512M 
 -Xmx512M org.apache.spark.executor.CoarseGrainedExecutorBackend 
 akka.tcp://spark@9.186.105.141:60253/user/CoarseGrainedScheduler 2 
 p7hvs7br16 4 akka.tcp://sparkWorker@p7hvs7br16:59240/user/Worker 
 app-20140604023054-
 
 14/06/04 02:31:20 WARN util.NativeCodeLoader: Unable to load native-hadoop 
 library for your platform... using builtin-java classes where applicable
 14/06/04 02:31:21 INFO spark.SecurityManager: Changing view acls to: 
 test1,yifeng
 14/06/04 02:31:21 INFO spark.SecurityManager: SecurityManager: authentication 
 disabled; ui acls disabled; users with view permissions: Set(test1, yifeng)
 14/06/04 02:31:22 INFO slf4j.Slf4jLogger: Slf4jLogger started
 14/06/04 02:31:22 INFO Remoting: Starting remoting
 14/06/04 02:31:22 INFO Remoting: Remoting started; listening on addresses 
 :[akka.tcp://sparkExecutor@p7hvs7br16:39658]
 14/06/04 02:31:22 INFO Remoting: Remoting now listens on addresses: 
 [akka.tcp://sparkExecutor@p7hvs7br16:39658]
 14/06/04 02:31:22 INFO executor.CoarseGrainedExecutorBackend: Connecting to 
 driver: akka.tcp://spark@9.186.105.141:60253/user/CoarseGrainedScheduler
 14/06/04 02:31:22 INFO worker.WorkerWatcher: Connecting to worker 
 akka.tcp://sparkWorker@p7hvs7br16:59240/user/Worker
 14/06/04 02:31:23 INFO worker.WorkerWatcher: Successfully connected to 
 akka.tcp://sparkWorker@p7hvs7br16:59240/user/Worker
 14/06/04 02:31:24 INFO executor.CoarseGrainedExecutorBackend: Successfully 
 registered with driver
 14/06/04 02:31:24 INFO spark.SecurityManager: Changing view acls to: 
 test1,yifeng
 14/06/04 02:31:24 INFO spark.SecurityManager: SecurityManager: authentication 
 disabled; ui acls disabled; users with view permissions: Set(test1, yifeng)
 14/06/04 02:31:24 INFO slf4j.Slf4jLogger: Slf4jLogger started
 14/06/04 02:31:24 INFO Remoting: Starting remoting
 14/06/04 02:31:24 INFO Remoting: Remoting started; listening on addresses 
 

[jira] [Commented] (SPARK-2089) With YARN, preferredNodeLocalityData isn't honored

2014-06-10 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14026397#comment-14026397
 ] 

Mridul Muralidharan commented on SPARK-2089:


preferredNodeLocationData used to be passed as a constructor parameter - and so was 
always available.
The rearrangement of SparkContext initialization has introduced this bug.


 With YARN, preferredNodeLocalityData isn't honored 
 ---

 Key: SPARK-2089
 URL: https://issues.apache.org/jira/browse/SPARK-2089
 Project: Spark
  Issue Type: Bug
  Components: YARN
Affects Versions: 1.0.0
Reporter: Sandy Ryza

 When running in YARN cluster mode, apps can pass preferred locality data when 
 constructing a Spark context that will dictate where to request executor 
 containers.
 This is currently broken because of a race condition.  The Spark-YARN code 
 runs the user class and waits for it to start up a SparkContext.  During its 
 initialization, the SparkContext will create a YarnClusterScheduler, which 
 notifies a monitor in the Spark-YARN code that .  The Spark-Yarn code then 
 immediately fetches the preferredNodeLocationData from the SparkContext and 
 uses it to start requesting containers.
 But in the SparkContext constructor that takes the preferredNodeLocationData, 
 setting preferredNodeLocationData comes after the rest of the initialization, 
 so, if the Spark-YARN code comes around quickly enough after being notified, 
 the data that's fetched is the empty unset version.  This occurred during all 
 of my runs.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-2064) web ui should not remove executors if they are dead

2014-06-07 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2064?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14020789#comment-14020789
 ] 

Mridul Muralidharan commented on SPARK-2064:


Depending on how long a job runs, this can cause an OOM on the master.
In yarn (and mesos ?) an executor on the same node gets a different port if 
relaunched on failure - and so ends up as a different executor in the list.

 web ui should not remove executors if they are dead
 ---

 Key: SPARK-2064
 URL: https://issues.apache.org/jira/browse/SPARK-2064
 Project: Spark
  Issue Type: Sub-task
Reporter: Reynold Xin

 We should always show the list of executors that have ever been connected, 
 and add a status column to mark them as dead if they have been disconnected.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-2064) web ui should not remove executors if they are dead

2014-06-07 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2064?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14020936#comment-14020936
 ] 

Mridul Muralidharan commented on SPARK-2064:


It is 100 MB (or more) of memory which could be used elsewhere.
In our clusters, for example, the number of workers can be very high while the 
containers can be quite ephemeral when under load (and so a lot of container 
losses); on the other hand, memory per container is constrained to about 8 GB 
(lower when we account for overheads, etc).

So the amount of working memory in the master shrinks : we are finding that the UI and 
related codepaths are among the portions which seem to be occupying a lot of 
memory in the OOM dumps of the master.

 web ui should not remove executors if they are dead
 ---

 Key: SPARK-2064
 URL: https://issues.apache.org/jira/browse/SPARK-2064
 Project: Spark
  Issue Type: Sub-task
Reporter: Reynold Xin

 We should always show the list of executors that have ever been connected, 
 and add a status column to mark them as dead if they have been disconnected.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-2064) web ui should not remove executors if they are dead

2014-06-07 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2064?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14021008#comment-14021008
 ] 

Mridul Muralidharan commented on SPARK-2064:


Unfortunately OOM is a very big issue for us since the application master is a single 
point of failure when running in yarn.
Particularly when memory is constrained and vigorously enforced by the yarn 
containers (requiring higher overheads to be specified, reducing usable memory 
even further).

Given this, and given the fair amount of churn already seen for executor containers, I am 
hesitant about features which add to the memory footprint of the UI even further. 
The cumulative impact of the UI is nontrivial, as I mentioned before. This, for 
example, would require 1-8% of master memory when there is reasonable churn for 
long running jobs (30 hours) on a reasonable number of executors (200-300).


 web ui should not remove executors if they are dead
 ---

 Key: SPARK-2064
 URL: https://issues.apache.org/jira/browse/SPARK-2064
 Project: Spark
  Issue Type: Sub-task
Reporter: Reynold Xin

 We should always show the list of executors that have ever been connected, 
 and add a status column to mark them as dead if they have been disconnected.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-2064) web ui should not remove executors if they are dead

2014-06-07 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2064?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14021011#comment-14021011
 ] 

Mridul Muralidharan commented on SPARK-2064:


I am probably missing the intent behind this change.
What is the expected use case it is supposed to help with ?

 web ui should not remove executors if they are dead
 ---

 Key: SPARK-2064
 URL: https://issues.apache.org/jira/browse/SPARK-2064
 Project: Spark
  Issue Type: Sub-task
Reporter: Reynold Xin

 We should always show the list of executors that have ever been connected, 
 and add a status column to mark them as dead if they have been disconnected.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-2017) web ui stage page becomes unresponsive when the number of tasks is large

2014-06-05 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14019394#comment-14019394
 ] 

Mridul Muralidharan commented on SPARK-2017:


Currently, for our jobs, I run with spark.ui.retainedStages=3 (so that there is 
some visibility into past stages) : this is to prevent OOMs in the master when the 
number of tasks per stage is not low (50k, for example, is not very high imo).

The stage details UI becomes very sluggish to pretty much unresponsive for our 
jobs where tasks > 30k ... though that might also be a browser issue 
(firefox/chrome) ?
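For reference, a hedged example of the workaround mentioned above: cap how many completed stages the UI retains so per-task UI state does not accumulate in the master.

{code}
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("large-stage-job")
  .set("spark.ui.retainedStages", "3")  // keep only the 3 most recent completed stages in the UI
val sc = new SparkContext(conf)
{code}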

 web ui stage page becomes unresponsive when the number of tasks is large
 

 Key: SPARK-2017
 URL: https://issues.apache.org/jira/browse/SPARK-2017
 Project: Spark
  Issue Type: Sub-task
Reporter: Reynold Xin
  Labels: starter

 {code}
 sc.parallelize(1 to 1000000, 1000000).count()
 {code}
 The above code creates one million tasks to be executed. The stage detail web 
 ui page takes forever to load (if it ever completes).
 There are again a few different alternatives:
 0. Limit the number of tasks we show.
 1. Pagination
 2. By default only show the aggregate metrics and failed tasks, and hide the 
 successful ones.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-1956) Enable shuffle consolidation by default

2014-05-28 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14011741#comment-14011741
 ] 

Mridul Muralidharan commented on SPARK-1956:


Shuffle consolidation MUST NOT be enabled - whether by default, or 
intentionally.
In 1.0, it is very badly broken - we needed a whole litany of fixes for it 
before it was reasonably stable.

The current plan is to contribute most of these back in the 1.1 timeframe.

 Enable shuffle consolidation by default
 ---

 Key: SPARK-1956
 URL: https://issues.apache.org/jira/browse/SPARK-1956
 Project: Spark
  Issue Type: Improvement
  Components: Shuffle, Spark Core
Affects Versions: 1.0.0
Reporter: Sandy Ryza

 The only drawbacks are on ext3, and most everyone has ext4 at this point.  I 
 think it's better to aim the default at the common case.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-1855) Provide memory-and-local-disk RDD checkpointing

2014-05-18 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14001377#comment-14001377
 ] 

Mridul Muralidharan commented on SPARK-1855:


Did not realize that mail replies to JIRA mails did not get mirrored to JIRA ! 
Replicating my mail here :

– cut and paste –

We don't have 3x replication in spark :-)
And while using a replicated storage level decreases the odds of failure, it 
does not eliminate them (since we are not doing a great job with replication 
anyway from a fault tolerance point of view).
Also, it does take a nontrivial performance hit with replicated levels.

Regards,
Mridul

 Provide memory-and-local-disk RDD checkpointing
 ---

 Key: SPARK-1855
 URL: https://issues.apache.org/jira/browse/SPARK-1855
 Project: Spark
  Issue Type: New Feature
  Components: MLlib, Spark Core
Affects Versions: 1.0.0
Reporter: Xiangrui Meng

 Checkpointing is used to cut long lineage while maintaining fault tolerance. 
 The current implementation is HDFS-based. Using the BlockRDD we can create 
 in-memory-and-local-disk (with replication) checkpoints that are not as 
 reliable as HDFS-based solution but faster.
 It can help applications that require many iterations.
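A hedged sketch of the cheaper alternative being discussed: persist with a replicated memory-and-disk storage level. Unlike an HDFS checkpoint this does not truncate lineage, and (per the comment above) replication reduces but does not eliminate the risk of loss; iterationRdd is a placeholder for the RDD produced by an iteration.

{code}
import org.apache.spark.storage.StorageLevel

// Keep a 2x-replicated copy in memory, spilling to local disk under pressure.
val cached = iterationRdd.persist(StorageLevel.MEMORY_AND_DISK_2)
cached.count()  // materialize so the replicated blocks exist before later stages rely on them
{code}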



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-1849) Broken UTF-8 encoded data gets character replacements and thus can't be fixed

2014-05-16 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14000397#comment-14000397
 ] 

Mridul Muralidharan commented on SPARK-1849:


Looks like textFile is probably the wrong API to use.
You cannot recover from badly encoded data ... it would be better to write your 
own InputFormat which does what you need.
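One hedged sketch in that spirit (reusing the hadoopFile call from the description below rather than a custom InputFormat): bypass Text.toString and re-decode the raw bytes yourself; sc and path are assumed from the snippets below, and ISO-8859-1 is just the caller's guess at the source encoding.

{code}
import java.nio.charset.StandardCharsets
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

val rescued = sc.hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
  .map { pair =>
    val text = pair._2
    // Text reuses its backing buffer, so copy only the valid range before decoding.
    val bytes = java.util.Arrays.copyOfRange(text.getBytes, 0, text.getLength)
    new String(bytes, StandardCharsets.ISO_8859_1)
  }
{code}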

 Broken UTF-8 encoded data gets character replacements and thus can't be 
 fixed
 ---

 Key: SPARK-1849
 URL: https://issues.apache.org/jira/browse/SPARK-1849
 Project: Spark
  Issue Type: Bug
Reporter: Harry Brundage
 Fix For: 1.0.0, 0.9.1

 Attachments: encoding_test


 I'm trying to process a file which isn't valid UTF-8 data inside hadoop using 
 Spark via {{sc.textFile()}}. Is this possible, and if not, is this a bug that 
 we should fix? It looks like {{HadoopRDD}} uses 
 {{org.apache.hadoop.io.Text.toString}} on all the data it ever reads, which I 
 believe replaces invalid UTF-8 byte sequences with the UTF-8 replacement 
 character, \uFFFD. Some example code mimicking what {{sc.textFile}} does 
 underneath:
 {code}
 scala> sc.textFile(path).collect()(0)
 res8: String = �pple
 scala> sc.hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], 
 classOf[Text]).map(pair => pair._2.toString).collect()(0).getBytes()
 res9: Array[Byte] = Array(-17, -65, -67, 112, 112, 108, 101)
 scala> sc.hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], 
 classOf[Text]).map(pair => pair._2.getBytes).collect()(0)
 res10: Array[Byte] = Array(-60, 112, 112, 108, 101)
 {code}
 In the above example, the first two snippets show the string representation 
 and byte representation of the example line of text. The third snippet shows 
 what happens if you call {{getBytes}} on the {{Text}} object which comes back 
 from hadoop land: we get the real bytes in the file out.
 Now, I think this is a bug, though you may disagree. The text inside my file 
 is perfectly valid iso-8859-1 encoded bytes, which I would like to be able to 
 rescue and re-encode into UTF-8, because I want my application to be smart 
 like that. I think Spark should give me the raw broken string so I can 
 re-encode, but I can't get at the original bytes in order to guess at what 
 the source encoding might be, as they have already been replaced. I'm dealing 
 with data from some CDN access logs which are to put it nicely diversely 
 encoded, but I think a use case Spark should fully support. So, my suggested 
 fix, which I'd like some guidance, is to change {{textFile}} to spit out 
 broken strings by not using {{Text}}'s UTF-8 encoding.
 Further compounding this issue is that my application is actually in PySpark, 
 but we can talk about how bytes fly through to Scala land after this if we 
 agree that this is an issue at all. 



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-1813) Add a utility to SparkConf that makes using Kryo really easy

2014-05-13 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13996390#comment-13996390
 ] 

Mridul Muralidharan commented on SPARK-1813:


Writing a KryoRegistrator is the only real requirement - the rest is done as part of 
initialization anyway.
Registering classes with Kryo is non-trivial except for degenerate cases : for 
example, we have classes which have to use java readObject/writeObject 
serialization, classes which support Kryo serialization, classes which support 
java's external serialization, generated classes, etc.
And we would need a registrator for those ... of course, it could be argued this is a 
corner case, though I don't think so.

 Add a utility to SparkConf that makes using Kryo really easy
 

 Key: SPARK-1813
 URL: https://issues.apache.org/jira/browse/SPARK-1813
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 1.0.0
Reporter: Sandy Ryza

 It would be nice to have a method in SparkConf that makes it really easy to 
 use Kryo and register a set of classes, without defining your own registrator.
 Using Kryo currently requires all this:
 {code}
 import com.esotericsoftware.kryo.Kryo
 import org.apache.spark.serializer.KryoRegistrator
 class MyRegistrator extends KryoRegistrator {
   override def registerClasses(kryo: Kryo) {
 kryo.register(classOf[MyClass1])
 kryo.register(classOf[MyClass2])
   }
 }
 val conf = new SparkConf().setMaster(...).setAppName(...)
 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
 conf.set("spark.kryo.registrator", "mypackage.MyRegistrator")
 val sc = new SparkContext(conf)
 {code}
 It would be nice if it just required this:
 {code}
 SparkConf.setKryo(Array(classOf[MyFirstClass], classOf[MySecond]))
 {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-1606) spark-submit needs `--arg` for every application parameter

2014-05-03 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13988756#comment-13988756
 ] 

Mridul Muralidharan commented on SPARK-1606:


Crap, got to this too late.
We really should not have added this - and I would have -1'ed it.

We have had too many issues with trying to parse the user command line, and 
we keep getting into all sorts of problems which are entirely avoidable by simply being 
explicit about what the user wants to pass.
Trying to pass strings which contain escape characters and/or whitespace, etc, 
is just going to be a nightmare with this change.

 spark-submit needs `--arg` for every application parameter
 --

 Key: SPARK-1606
 URL: https://issues.apache.org/jira/browse/SPARK-1606
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 1.0.0
Reporter: Xiangrui Meng
Assignee: Patrick Wendell
Priority: Blocker
 Fix For: 1.0.0


 If the application has a few parameters, the spark-submit command looks like 
 the following:
 {code}
 spark-submit --master yarn-cluster --class main.Class --arg --numPartitions 
 --arg 8 --arg --kryo --arg true
 {code}
 It is a little bit hard to read and modify. Maybe it is okay to treat all 
 arguments after `main.Class` as application parameters.
 {code}
 spark-submit --master yarn-cluster --class main.Class --numPartitions 8 
 --kryo true
 {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-1706) Allow multiple executors per worker in Standalone mode

2014-05-03 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13988868#comment-13988868
 ] 

Mridul Muralidharan commented on SPARK-1706:


Oh my, this was supposed to be a logical addition once the yarn changes were done.
The yarn changes were very heavily modelled on standalone mode (hence 
yarn-standalone !) : and it was supposed to be a two-way street : changes made 
for yarn support (multi-tenancy, etc) were supposed to have been added back to 
standalone mode once yarn support stabilized.
Did not realize I never got around to it - my apologies !

 Allow multiple executors per worker in Standalone mode
 --

 Key: SPARK-1706
 URL: https://issues.apache.org/jira/browse/SPARK-1706
 Project: Spark
  Issue Type: Improvement
  Components: Deploy
Reporter: Patrick Wendell
 Fix For: 1.1.0


 Right now if people want to launch multiple executors on each machine they 
 need to start multiple standalone workers. This is not too difficult, but it 
 means you have extra JVM's sitting around.
 We should just allow users to set a number of cores they want per-executor in 
 standalone mode and then allow packing multiple executors on each node. This 
 would make standalone mode more consistent with YARN in the way you request 
 resources.
 It's not too big of a change as far as I can see. You'd need to:
 1. Introduce a configuration for how many cores you want per executor.
 2. Change the scheduling logic in Master.scala to take this into account.
 3. Change CoarseGrainedSchedulerBackend to not assume a 1-1 correspondence 
 between hosts and executors.
 And maybe modify a few other places.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-1576) Passing of JAVA_OPTS to YARN on command line

2014-04-25 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13981313#comment-13981313
 ] 

Mridul Muralidharan commented on SPARK-1576:


There is a misunderstanding here - it is about passing SPARK_JAVA_OPTS, not 
JAVA_OPTS.
Directly passing JAVA_OPTS has been removed.

 Passing of JAVA_OPTS to YARN on command line
 

 Key: SPARK-1576
 URL: https://issues.apache.org/jira/browse/SPARK-1576
 Project: Spark
  Issue Type: Improvement
Affects Versions: 0.9.0, 1.0.0, 0.9.1
Reporter: Nishkam Ravi
 Fix For: 0.9.0, 1.0.0, 0.9.1

 Attachments: SPARK-1576.patch


 JAVA_OPTS can be passed by using either env variables (i.e., SPARK_JAVA_OPTS) 
 or as config vars (after Patrick's recent change). It would be good to allow 
 the user to pass them on command line as well to restrict scope to single 
 application invocation.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-1586) Fix issues with spark development under windows

2014-04-25 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13981321#comment-13981321
 ] 

Mridul Muralidharan commented on SPARK-1586:


The immediate issues are fixed, though there are more hive tests failing due to 
path related issues. PR: https://github.com/apache/spark/pull/505

 Fix issues with spark development under windows
 ---

 Key: SPARK-1586
 URL: https://issues.apache.org/jira/browse/SPARK-1586
 Project: Spark
  Issue Type: Bug
Reporter: Mridul Muralidharan
Assignee: Mridul Muralidharan
 Fix For: 1.0.0






--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Resolved] (SPARK-1587) Fix thread leak in spark

2014-04-25 Thread Mridul Muralidharan (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-1587?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan resolved SPARK-1587.


Resolution: Fixed

Fixed, https://github.com/apache/spark/pull/504

 Fix thread leak in spark
 

 Key: SPARK-1587
 URL: https://issues.apache.org/jira/browse/SPARK-1587
 Project: Spark
  Issue Type: Bug
Reporter: Mridul Muralidharan
Assignee: Mridul Muralidharan

 SparkContext.stop does not cause all threads to exit.
 When running tests via scalatest (which keeps reusing the same vm), over 
 time, this causes too many threads to be created causing tests to fail due to 
 inability to create more threads.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (BOOKKEEPER-560) Create readme for hedwig-client-jms

2014-04-25 Thread Mridul Muralidharan (JIRA)

 [ 
https://issues.apache.org/jira/browse/BOOKKEEPER-560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan updated BOOKKEEPER-560:
---

Assignee: (was: Mridul Muralidharan)

 Create readme for hedwig-client-jms 
 

 Key: BOOKKEEPER-560
 URL: https://issues.apache.org/jira/browse/BOOKKEEPER-560
 Project: Bookkeeper
  Issue Type: Task
  Components: Documentation
Reporter: Ivan Kelly

 This module needs a readme describing it as an experimental component and 
 what parts of jms are supported and not supported.
 It would also be good to have a bit more detail on why they can't be 
 supported with hedwig as it is today. A lot of this can be taken verbatim 
 from the package-info.html



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (BOOKKEEPER-648) BasicJMSTest failed

2014-04-25 Thread Mridul Muralidharan (JIRA)

 [ 
https://issues.apache.org/jira/browse/BOOKKEEPER-648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan updated BOOKKEEPER-648:
---

Assignee: (was: Mridul Muralidharan)

 BasicJMSTest failed
 ---

 Key: BOOKKEEPER-648
 URL: https://issues.apache.org/jira/browse/BOOKKEEPER-648
 Project: Bookkeeper
  Issue Type: Bug
  Components: hedwig-client
Reporter: Flavio Junqueira

 While running tests, I got once a failure for this hedwig-client-jms test: 
 BasicJMSTest.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-1588) SPARK_JAVA_OPTS is not getting propagated

2014-04-23 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13978827#comment-13978827
 ] 

Mridul Muralidharan commented on SPARK-1588:


Apparently, SPARK_YARN_USER_ENV is also broken

 SPARK_JAVA_OPTS is not getting propagated
 -

 Key: SPARK-1588
 URL: https://issues.apache.org/jira/browse/SPARK-1588
 Project: Spark
  Issue Type: Bug
  Components: YARN
Affects Versions: 1.0.0
Reporter: Mridul Muralidharan
Priority: Blocker

 We could use SPARK_JAVA_OPTS to pass JAVA_OPTS to be used in the master.
 This is no longer working in current master.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-1524) TaskSetManager'd better not schedule tasks which has no preferred executorId using PROCESS_LOCAL in the first search process

2014-04-17 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13972864#comment-13972864
 ] 

Mridul Muralidharan commented on SPARK-1524:


The expectation is to fall back to a less constrained schedule level in case the 
higher level is not valid : though this is tricky in the general case.
Will need to take a look at it - but given that I am tied up with other 
things, if someone else wants to take a crack, please feel free to do so !

Btw, use of IPs and multiple hostnames for a host is not supported in spark - 
so that is something that will need to be resolved at the deployment end.
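
To make the fallback idea concrete, here is a minimal sketch (hypothetical names, not 
the actual TaskSetManager code) of walking down the allowed locality levels until some 
pending task matches, instead of finding nothing in the PROCESS_LOCAL pass:

{code}
// Hypothetical sketch of locality-level fallback; not the actual TaskSetManager code.
object LocalityFallbackSketch {
  sealed trait Locality
  case object ProcessLocal extends Locality
  case object NodeLocal extends Locality
  case object RackLocal extends Locality
  case object AnyHost extends Locality

  // Ordered from most to least constrained.
  val levels: Seq[Locality] = Seq(ProcessLocal, NodeLocal, RackLocal, AnyHost)

  // findTaskAt is assumed to return the index of a pending task matching the given level, if any.
  def findTask(findTaskAt: Locality => Option[Int]): Option[(Int, Locality)] = {
    // Fall back to the next (less constrained) level when nothing matches the current one.
    levels.view.flatMap(level => findTaskAt(level).map(idx => (idx, level))).headOption
  }
}
{code}

With something along these lines, a task that only carries a host-level preference would 
still be picked up at NODE_LOCAL rather than being skipped in the PROCESS_LOCAL search.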

 TaskSetManager'd better not schedule tasks which has no preferred executorId 
 using PROCESS_LOCAL in the first search process
 

 Key: SPARK-1524
 URL: https://issues.apache.org/jira/browse/SPARK-1524
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Reporter: YanTang Zhai
Priority: Minor

 ShuffleMapTask is constructed with a TaskLocation which has only a host, not a 
 (host, executorID) pair, in DAGScheduler.
 When TaskSetManager schedules a ShuffleMapTask which has no preferred 
 executorId using a specific execId host and the PROCESS_LOCAL locality level, no 
 tasks match the given locality constraint in the first search pass.
 We also find that the host used by the Scheduler is a hostname while the host used 
 by TaskLocation is an IP in our cluster. The two hosts do not match, which leaves the 
 pendingTasksForHost HashMap empty and makes the task-finding process behave against our 
 expectation.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-1476) 2GB limit in spark for blocks

2014-04-17 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13972978#comment-13972978
 ] 

Mridul Muralidharan commented on SPARK-1476:



[~matei] We are having some issues porting the netty shuffle copier code to 
support > 2G since only ByteBuf seems to be exposed.
Before I dig into netty more, I wanted to know if you or someone else among the 
spark developers knew how to add support for large buffers in our netty code. 
Thanks !

 2GB limit in spark for blocks
 -

 Key: SPARK-1476
 URL: https://issues.apache.org/jira/browse/SPARK-1476
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
 Environment: all
Reporter: Mridul Muralidharan
Assignee: Mridul Muralidharan
Priority: Critical
 Fix For: 1.1.0


 The underlying abstraction for blocks in spark is a ByteBuffer, which limits 
 the size of a block to 2GB.
 This has implications not just for managed blocks in use, but also for shuffle 
 blocks (memory-mapped blocks are limited to 2GB even though the API allows 
 a long), ser/deser via byte-array backed output streams (SPARK-1391), etc.
 This is a severe limitation for using spark on non-trivial datasets.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-1453) Improve the way Spark on Yarn waits for executors before starting

2014-04-14 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13968390#comment-13968390
 ] 

Mridul Muralidharan commented on SPARK-1453:



(d) becomes relevant in case of headless/cron'ed jobs.
If the job is user initiated, then I agree, the user would typically kill and 
restart the job.

 Improve the way Spark on Yarn waits for executors before starting
 -

 Key: SPARK-1453
 URL: https://issues.apache.org/jira/browse/SPARK-1453
 Project: Spark
  Issue Type: Improvement
  Components: YARN
Affects Versions: 1.0.0
Reporter: Thomas Graves
Assignee: Thomas Graves

 Currently Spark on Yarn just delays a few seconds between when the spark 
 context is initialized and when it allows the job to start.  If you are on a 
 busy hadoop cluster it might take longer to get the requested number of executors. 
 At the very least we could make this timeout a configurable value.  It's 
 currently hardcoded to 3 seconds.  
 Better yet would be to allow the user to give a minimum number of executors it 
 wants to wait for, but that looks much more complex. 



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Comment Edited] (SPARK-1476) 2GB limit in spark for blocks

2014-04-13 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13967854#comment-13967854
 ] 

Mridul Muralidharan edited comment on SPARK-1476 at 4/13/14 2:45 PM:
-

There are multiple issues at play here :

a) If a block goes beyond 2G, everything fails - this is the case for shuffle, 
cached and 'normal' blocks.
Irrespective of storage level and/or other config.
In a lot of cases, particularly when data generated 'increases' after a 
map/flatMap/etc, the user has no control over the size increase in the output 
block for a given input block (think iterations over datasets).

b) Increasing number of partitions is not always an option (for the subset of 
usecases where it can be done) :
1) It has an impact on number of intermediate files created while doing a 
shuffle. (ulimit constraints, IO performance issues, etc).
2) It does not help when there is skew anyway.

c) Compression codec, serializer used, etc have an impact.

d) 2G is an extremely low limit to have on modern hardware : and this is 
particularly a severe limitation when we have nodes running with 32G to 64G of RAM 
and TBs of disk space available for spark.


To address specific points raised above :

A) [~pwendell] Mapreduce jobs don't fail in case the block size of files 
increases - it might be inefficient, but it still runs (not that I know of any case 
where it actually does, but theoretically I guess it can become inefficient).
So the analogy does not apply.
Also to add, 2G is not really an unlimited increase in block size - and in MR, 
the output of a map can easily go a couple of orders of magnitude above 2G : 
whether it is followed by a reduce or not.

B) [~matei] In the specific cases it was failing, the users were not caching 
the data but directly going to shuffle.
There was no skew from what I see : just the data size per key is high; and 
there are a lot of keys too btw (as iterations increase and nnz increases).
Note that it was an impl detail that it was not being cached - it could have 
been too.
Additionally, compression and/or serialization also apply implicitly in this 
case, since it was impacting shuffle - the 2G limit was observed at both the 
map and reduce side (in two different jobs).


In general, our effort is to make spark a drop-in replacement for most 
usecases which are currently being done via MR/Pig/etc.
Limitations of this sort make it difficult to position spark as a credible 
alternative.


The current approach we are exploring is to remove all direct references to 
ByteBuffer from spark (except for the ConnectionManager, etc parts); and rely on a 
BlockData or similar datastructure which encapsulates the data corresponding to 
a block. By default, a single ByteBuffer should suffice, but in case it does 
not, the class will automatically take care of splitting across multiple buffers.
Similarly, all references to byte array backed streams will need to be replaced 
with a wrapper stream which multiplexes over multiple byte array streams.
The performance impact for all 'normal' usecases should be minimal, while 
allowing spark to be used in cases where the 2G limit is being hit.
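
As a rough illustration of the encapsulation being described (hypothetical names only, 
not the actual patch), the idea is roughly:

{code}
// Hypothetical sketch of a block abstraction spanning multiple ByteBuffers; not the actual patch.
import java.io.{InputStream, SequenceInputStream}
import java.nio.ByteBuffer
import scala.collection.JavaConverters._

class BlockDataSketch(val chunks: Seq[ByteBuffer]) {
  // Total size can exceed 2GB because it is spread across chunks, each of which stays under 2GB.
  def size: Long = chunks.map(_.remaining().toLong).sum

  // Stream over all chunks without ever materializing a single huge array.
  def toInputStream: InputStream = {
    val perChunk: Seq[InputStream] = chunks.map { buf =>
      val dup = buf.duplicate() // independent position, shared content
      new InputStream {
        override def read(): Int = if (dup.hasRemaining) dup.get() & 0xff else -1
        override def read(b: Array[Byte], off: Int, len: Int): Int = {
          if (len == 0) 0
          else if (!dup.hasRemaining) -1
          else {
            val n = math.min(len, dup.remaining())
            dup.get(b, off, n)
            n
          }
        }
      }
    }
    new SequenceInputStream(perChunk.iterator.asJavaEnumeration)
  }
}
{code}

In the common case a block would consist of exactly one chunk, so existing call sites 
should not pay any extra cost.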

The only unknown here is tachyon integration : where the interface is a 
ByteBuffer - and I am not knowledgeable enough to comment on what the issues 
there would be.


was (Author: mridulm80):

There are multiple issues at play here :

a) If a block goes beyond 2G, everything fails - this is the case for shuffle, 
cached and 'normal' blocks.
Irrespective of storage level and/or other config.
In a lot of cases, particularly when data generated 'increases' after a 
map/flatMap/etc, the user has no control over the size increase in the output 
block for a given input block (think iterations over datasets).

b) Increasing number of partitions is not always an option (for the subset of 
usecases where it can be done) :
1) It has an impact on number of intermediate files created while doing a 
shuffle. (ulimit constraints, IO performance issues, etc).
2) It does not help when there is skew anyway.

c) Compression codec, serializer used, etc have an impact.

d) 2G is extremely low limit to have in modern hardware : and this is 
practically a severe limitation when we have nodes running on 32G to 64G ram 
and TB's of disk space available for spark.


To address specific points raised above :

A) [~pwendell] Mapreduce jobs dont fail in case the block size of files 
increases - it might be inefficient, it still runs (not that I know of any case 
where it does actually, but theoretically I guess it can).
So analogy does not apply.
To add, 2G is not really an unlimited increase in block size - and in MR, 
output of a map can easily go a couple of orders above 2G.

B) [~matei] In the specific cases it was failing, the users were not caching 
the data but directly going to shuffle.
There was no skew from what we see : just the data size per key is high; 

[jira] [Commented] (SPARK-1476) 2GB limit in spark for blocks

2014-04-12 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13967419#comment-13967419
 ] 

Mridul Muralidharan commented on SPARK-1476:


WIP Proposal:

- All references to ByteBuffer will need to be replaced with Seq[ByteBuffer].
This applies to definition of a block, memory mapped file segments for a 
shuffle block, etc.
- All use of byte array backed outputstreams will need to be replaced with an 
aggregating outputstream which writes to multiple byte array output streams as and 
when array limits are hit.
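
A minimal sketch of such an aggregating output stream (illustrative only - the chunk 
size and names here are assumptions, not what an eventual patch would use):

{code}
// Hypothetical sketch: a byte-array backed output stream that rolls over to a new
// chunk before any single array gets near the 2GB limit, so the total size written
// is bounded only by available memory.
import java.io.OutputStream
import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer

class ChunkedByteArrayOutputStream(chunkSize: Int = 64 * 1024 * 1024) extends OutputStream {
  private val chunks = ArrayBuffer[Array[Byte]]()
  private var current = new Array[Byte](chunkSize)
  private var position = 0

  override def write(b: Int): Unit = {
    if (position == current.length) rollOver()
    current(position) = b.toByte
    position += 1
  }

  override def write(bytes: Array[Byte], off: Int, len: Int): Unit = {
    var written = 0
    while (written < len) {
      if (position == current.length) rollOver()
      val toCopy = math.min(len - written, current.length - position)
      System.arraycopy(bytes, off + written, current, position, toCopy)
      position += toCopy
      written += toCopy
    }
  }

  private def rollOver(): Unit = {
    chunks += current
    current = new Array[Byte](chunkSize)
    position = 0
  }

  // Expose everything written so far as a Seq[ByteBuffer], each chunk well under 2GB.
  def toByteBuffers: Seq[ByteBuffer] =
    (chunks :+ java.util.Arrays.copyOf(current, position)).map(a => ByteBuffer.wrap(a)).toSeq
}
{code}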


 2GB limit in spark for blocks
 -

 Key: SPARK-1476
 URL: https://issues.apache.org/jira/browse/SPARK-1476
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
 Environment: all
Reporter: Mridul Muralidharan
Priority: Critical

 The underlying abstraction for blocks in spark is a ByteBuffer, which limits 
 the size of a block to 2GB.
 This has implications not just for managed blocks in use, but also for shuffle 
 blocks (memory-mapped blocks are limited to 2GB even though the API allows 
 a long), ser/deser via byte-array backed output streams (SPARK-1391), etc.
 This is a severe limitation for using spark on non-trivial datasets.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-1391) BlockManager cannot transfer blocks larger than 2G in size

2014-04-11 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13966332#comment-13966332
 ] 

Mridul Muralidharan commented on SPARK-1391:


Another place where this is relevant is here :

java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:789)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:98)
at 
org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:413)
at 
org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:339)
at org.apache.spark.storage.BlockManager.get(BlockManager.scala:506)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:39)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:233)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
at org.apache.spark.scheduler.Task.run(Task.scala:52)
at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211)
at 
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:43)
at 
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1262)
at 
org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:42)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)

So we might want to change the abstraction from a single ByteBuffer to a sequence 
of ByteBuffers ...
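
For the DiskStore path in the trace above, the corresponding change would be to map a 
large segment as several buffers instead of a single FileChannel.map call; a rough 
sketch (hypothetical helper, not the actual fix):

{code}
// Hypothetical sketch: map a large file segment as multiple MappedByteBuffers,
// each at most maxChunk bytes, instead of one map() call that fails past 2GB.
import java.io.RandomAccessFile
import java.nio.MappedByteBuffer
import java.nio.channels.FileChannel

object ChunkedMmapSketch {
  def mapLargeSegment(
      file: RandomAccessFile,
      offset: Long,
      length: Long,
      maxChunk: Long = Integer.MAX_VALUE.toLong): Seq[MappedByteBuffer] = {
    val channel = file.getChannel
    // Split [offset, offset + length) into chunks no larger than maxChunk.
    Iterator.iterate(0L)(_ + maxChunk)
      .takeWhile(_ < length)
      .map { pos =>
        val chunkLen = math.min(maxChunk, length - pos)
        channel.map(FileChannel.MapMode.READ_ONLY, offset + pos, chunkLen)
      }
      .toSeq
  }
}
{code}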

 BlockManager cannot transfer blocks larger than 2G in size
 --

 Key: SPARK-1391
 URL: https://issues.apache.org/jira/browse/SPARK-1391
 Project: Spark
  Issue Type: Bug
  Components: Block Manager, Shuffle
Affects Versions: 1.0.0
Reporter: Shivaram Venkataraman
Assignee: Min Zhou
 Attachments: SPARK-1391.diff


 If a task tries to remotely access a cached RDD block, I get an exception 
 when the block size is > 2G. The exception is pasted below.
 Memory capacities are huge these days (> 60G), and many workflows depend on 
 having large blocks in memory, so it would be good to fix this bug.
 I don't know if the same thing happens on shuffles if one transfer (from 
 mapper to reducer) is > 2G.
 {noformat}
 14/04/02 02:33:10 ERROR storage.BlockManagerWorker: Exception handling buffer 
 message
 java.lang.ArrayIndexOutOfBoundsException
 at 
 it.unimi.dsi.fastutil.io.FastByteArrayOutputStream.write(FastByteArrayOutputStream.java:96)
 at 
 it.unimi.dsi.fastutil.io.FastBufferedOutputStream.dumpBuffer(FastBufferedOutputStream.java:134)
 at 
 it.unimi.dsi.fastutil.io.FastBufferedOutputStream.write(FastBufferedOutputStream.java:164)
 at 
 java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
 at 
 java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
 at 
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
 at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
 at 
 org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:38)
 at 
 org.apache.spark.serializer.SerializationStream$class.writeAll(Serializer.scala:93)
 at 
 org.apache.spark.serializer.JavaSerializationStream.writeAll(JavaSerializer.scala:26)
 at 
 org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:913)
 at 
 org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:922)
 at 
 org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:102)
 at 
 org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:348)
 at 
 org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:323)
 at 
 org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:90)
 at 
 org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:69)
 at 
 org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44)
 at 
 org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44)
 at 
 

[jira] [Commented] (SPARK-542) Cache Miss when machine have multiple hostname

2014-04-11 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13967185#comment-13967185
 ] 

Mridul Muralidharan commented on SPARK-542:
---

Spark uses only hostnames - not IPs.
Even for hostnames, it should ideally pick only the canonical hostname - not 
the others.

This was done by design in 0.8 ... trying to figure out whether multiple hostnames/IPs 
all refer to the same physical host/container is fraught with too many 
issues.
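
For illustration only (not Spark code): normalizing both forms through the canonical 
hostname is what avoids the c001 vs c001.cm.cluster mismatch described below, assuming 
the cluster's name resolution is configured consistently.

{code}
// Sketch: normalize a host string to its canonical hostname so that "c001" and
// "c001.cm.cluster" map to the same key, provided DNS / hosts entries agree.
import java.net.InetAddress

object CanonicalHostSketch {
  def canonicalHost(name: String): String =
    InetAddress.getByName(name).getCanonicalHostName
}
{code}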

 Cache Miss when machine have multiple hostname
 --

 Key: SPARK-542
 URL: https://issues.apache.org/jira/browse/SPARK-542
 Project: Spark
  Issue Type: Bug
Reporter: frankvictor

 Hi, I encountered weird pagerank runtimes in the last few days.
 After debugging the job, I found it was caused by the DNS name.
 The machines of my cluster have multiple hostnames; for example, slave 1 has 
 the names c001 and c001.cm.cluster.
 When spark adds a cache entry in the cacheTracker, it gets c001 and registers the 
 cache under it.
 But when scheduling a task in SimpleJob, the mesos offer gives spark 
 c001.cm.cluster,
 so it will never get a preferred location!
 I think spark should handle the multiple-hostname case (by using the IP instead 
 of the hostname, or some other method).
 Thanks!



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-1453) Improve the way Spark on Yarn waits for executors before starting

2014-04-11 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13967193#comment-13967193
 ] 

Mridul Muralidharan commented on SPARK-1453:


The timeout gets hit only when we don't get the requested executors, right ? So it 
is more like a max timeout (controlled by the number of times we loop, iirc).
The reason for keeping it simple was that we have no guarantees of the 
number of containers which might be available to spark in a busy cluster : at 
times, it might not be practically possible to even get a fraction of the 
requested nodes (either due to a busy cluster, or because of lack of resources - 
so infinite wait).

Ideally, I should have exposed the number of containers allocated - so that 
at least user code could use it as an spi and decide how to proceed for more 
complex cases. Missed out on this one.

I am not sure which usecases make sense.
a) Wait for X seconds or requested containers allocated.
b) Wait until minimum of Y containers allocated (out of X requested).
c) (b) with (a) - that is min containers and timeout on that.
d) (c) with exit if min containers not allocated ?

(d) is something which I keep hitting (if I don't get my required minimum 
nodes and the job proceeds anyway, I usually end up bringing down those nodes :-( )
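
A sketch of option (c) above (hypothetical names, not an actual patch): block until 
either a minimum number of executors has registered or a maximum wait time has elapsed, 
and report which of the two happened so that (d) could choose to exit.

{code}
// Hypothetical sketch of option (c): wait for a minimum executor count or a timeout.
object MinExecutorWaitSketch {
  def waitForExecutors(
      registeredExecutors: () => Int, // assumed callback into the scheduler backend
      minExecutors: Int,
      maxWaitMs: Long,
      pollIntervalMs: Long = 100): Boolean = {
    val deadline = System.currentTimeMillis() + maxWaitMs
    while (registeredExecutors() < minExecutors && System.currentTimeMillis() < deadline) {
      Thread.sleep(pollIntervalMs)
    }
    // false => timed out below the minimum; option (d) would abort the job here.
    registeredExecutors() >= minExecutors
  }
}
{code}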

 Improve the way Spark on Yarn waits for executors before starting
 -

 Key: SPARK-1453
 URL: https://issues.apache.org/jira/browse/SPARK-1453
 Project: Spark
  Issue Type: Improvement
  Components: YARN
Affects Versions: 1.0.0
Reporter: Thomas Graves
Assignee: Thomas Graves

 Currently Spark on Yarn just delays a few seconds between when the spark 
 context is initialized and when it allows the job to start.  If you are on a 
 busy hadoop cluster it might take longer to get the requested number of executors. 
 At the very least we could make this timeout a configurable value.  It's 
 currently hardcoded to 3 seconds.  
 Better yet would be to allow the user to give a minimum number of executors it 
 wants to wait for, but that looks much more complex. 



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (BOOKKEEPER-648) BasicJMSTest failed

2013-07-25 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/BOOKKEEPER-648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13719324#comment-13719324
 ] 

Mridul Muralidharan commented on BOOKKEEPER-648:



On going over the code again, the testcase is extremely simple - it tests the 
sync and async api.

a) Create one publisher and two subscriber sessions for the topic (one via sync 
api, another async api of jms).

b) Send 4 messages through publisher session.

c) Sleep for 10 ms to ensure all messages have been sent - this should not be 
required, but hedwig sometimes takes a lot of time.

d) For sync test, wait for 100 ms for each message to be received : null is 
returned if it times out without receiving any message.
The number of times receive is called is equal to the number of times we sent 
messages - the test expects in-order-delivery without loss of messages as per 
hedwig api contract.
This is what is failing : we are not receiving all the messages we sent.

e) The async session listener does the same as (d) - except on the async 
listener : this did not get tested in this specific case due to validation 
failure in (d).
Looking more, we should probably add a sleep before checking the 
'messageCount.getValue() != CHAT_MESSAGES.length' condition - in case async 
listener is still running in parallel : though this is not the failure we are 
observing ...



Assuming you noticed the same assertion stacktrace in each case when it failed, 
it means no message was received before timeout in sync invocation.
This can be due to :

1) Hedwig or bookkeeper is inordinately slow for some reasons (slow hdd, filled 
up /tmp, low mem, tlb thrashing, etc ?) : in which case, simply bumping up the 
sleep time and receive timeout param will circumvent the issue.

2) There is some bug somewhere in the chain which is causing message drops - 
either at publish time or while sending it to subscribers or somewhere else ?

To get additional debugging info, there are log messages in jms module : but 
(particularly for this testcase) the jms module is a thin wrapper delegating to 
corresponding hedwig client api - so enabling debug there would be more helpful.
Actually, I would validate if the server actually sent messages to both the 
subscribers and they were received by the client - if yes, rest would be a 
client side bug (hedwig client or jms).


If you could reproduce the issue with debug logging enabled for root logger, I 
can definitely help narrow down the issue with those logs !
Unfortunately, there are almost no testcases in hedwig client : so I am not 
sure what design or implementation changes happened in client (or server ?) - 
since I am not keeping track of bookkeeper/hedwig anymore.


 BasicJMSTest failed
 ---

 Key: BOOKKEEPER-648
 URL: https://issues.apache.org/jira/browse/BOOKKEEPER-648
 Project: Bookkeeper
  Issue Type: Bug
  Components: hedwig-client
Reporter: Flavio Junqueira
Assignee: Mridul Muralidharan

 While running tests, I got once a failure for this hedwig-client-jms test: 
 BasicJMSTest.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (BOOKKEEPER-648) BasicJMSTest failed

2013-07-25 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/BOOKKEEPER-648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13719348#comment-13719348
 ] 

Mridul Muralidharan commented on BOOKKEEPER-648:



Flavio, I am not sure what the expectation of this bug is - is it an appropriate 
debug message for this testcase ?
As I detailed above, the only message possible is : no message was received 
before timeout - unfortunately, I don't think that is going to help us much in 
debugging it.
The reason I did not add a descriptive message for every assertion in the tests 
is that the corresponding JMS api's describe the error conditions in detail 
(when a null can be returned from receive is covered in the JMS api javadocs, for 
example).


To actually debug/fix the issue, we will need to enable debug logging in the 
server, client api and jms module.
Subsequently, when the issue is observed, we will need to trace whether the message was 
actually sent to the server, whether the server dispatched it to both subscribers, 
whether the client api received it, and whether it was dispatched to jms.
The jms module does nothing different from any other api user of hedwig - 
barring bugs in it of course :-)

This is an assertion which is typically not expected to fail unless there is 
something broken elsewhere which is causing message loss.

 BasicJMSTest failed
 ---

 Key: BOOKKEEPER-648
 URL: https://issues.apache.org/jira/browse/BOOKKEEPER-648
 Project: Bookkeeper
  Issue Type: Bug
  Components: hedwig-client
Reporter: Flavio Junqueira
Assignee: Mridul Muralidharan

 While running tests, I got once a failure for this hedwig-client-jms test: 
 BasicJMSTest.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (BOOKKEEPER-648) BasicJMSTest failed

2013-07-24 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/BOOKKEEPER-648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13718267#comment-13718267
 ] 

Mridul Muralidharan commented on BOOKKEEPER-648:


I was not able to reproduce this with latest svn trunk.

{noformat} 
$ mvn test

[INFO] Scanning for projects...
[INFO] 
[INFO] 
[INFO] Building hedwig-client-jms 4.3.0-SNAPSHOT
[INFO] 
[INFO] 
[INFO] --- javacc-maven-plugin:2.6:jjtree-javacc (jjtree-javacc) @ 
hedwig-client-jms ---
[INFO] Skipping - all parsers are up to date
[INFO] 
[INFO] --- maven-remote-resources-plugin:1.1:process (default) @ 
hedwig-client-jms ---
[INFO] Setting property: classpath.resource.loader.class = 
'org.codehaus.plexus.velocity.ContextClassLoaderResourceLoader'.
[INFO] Setting property: velocimacro.messages.on = 'false'.
[INFO] Setting property: resource.loader = 'classpath'.
[INFO] Setting property: resource.manager.logwhenfound = 'false'.
[INFO] 
[INFO] --- maven-resources-plugin:2.4.3:resources (default-resources) @ 
hedwig-client-jms ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Copying 2 resources
[INFO] Copying 3 resources
[INFO] 
[INFO] --- maven-compiler-plugin:3.0:compile (default-compile) @ 
hedwig-client-jms ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 76 source files to 
/home/mridulm/work/bookkeeper/trunk/hedwig-client-jms/target/classes
[INFO] 
[INFO] --- maven-resources-plugin:2.4.3:testResources (default-testResources) @ 
hedwig-client-jms ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory 
/home/mridulm/work/bookkeeper/trunk/hedwig-client-jms/src/test/resources
[INFO] Copying 3 resources
[INFO] 
[INFO] --- maven-compiler-plugin:3.0:testCompile (default-testCompile) @ 
hedwig-client-jms ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 104 source files to 
/home/mridulm/work/bookkeeper/trunk/hedwig-client-jms/target/test-classes
[INFO] 
[INFO] --- maven-surefire-plugin:2.9:test (default-test) @ hedwig-client-jms ---
[INFO] Tests are skipped.
[INFO] 
[INFO] --- maven-surefire-plugin:2.9:test (unit-tests) @ hedwig-client-jms ---
[INFO] Surefire report directory: 
/home/mridulm/work/bookkeeper/trunk/hedwig-client-jms/target/surefire-reports

---
 T E S T S
---

---
 T E S T S
---
Running org.apache.hedwig.jms.BasicJMSTest
Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 9.516 sec
Running org.apache.hedwig.jms.selector.activemq.SelectorTest
Tests run: 19, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.208 sec
Running org.apache.hedwig.jms.selector.activemq.SelectorParserTest
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.195 sec
Running org.apache.hedwig.jms.selector.BasicSelectorGrammarTest
Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.238 sec

Results :

Tests run: 26, Failures: 0, Errors: 0, Skipped: 0

{noformat}

 BasicJMSTest failed
 ---

 Key: BOOKKEEPER-648
 URL: https://issues.apache.org/jira/browse/BOOKKEEPER-648
 Project: Bookkeeper
  Issue Type: Bug
  Components: hedwig-client
Reporter: Flavio Junqueira
Assignee: Mridul Muralidharan

 While running tests, I got once a failure for this hedwig-client-jms test: 
 BasicJMSTest.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (BOOKKEEPER-648) BasicJMSTest failed

2013-07-24 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/BOOKKEEPER-648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13718365#comment-13718365
 ] 

Mridul Muralidharan commented on BOOKKEEPER-648:


I tried it 3 times -
a) mvn test
b) mvn install
Both from the jms subdir

and
c) mvn install for entire bookkeeper.

Was there any other env difference ? Like hdd space constraint, mem
constraint, etc ?
The basic test is actually pretty basic - send msg, and receive the same 



 BasicJMSTest failed
 ---

 Key: BOOKKEEPER-648
 URL: https://issues.apache.org/jira/browse/BOOKKEEPER-648
 Project: Bookkeeper
  Issue Type: Bug
  Components: hedwig-client
Reporter: Flavio Junqueira
Assignee: Mridul Muralidharan

 While running tests, I got once a failure for this hedwig-client-jms test: 
 BasicJMSTest.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (BOOKKEEPER-648) BasicJMSTest failed

2013-07-24 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/BOOKKEEPER-648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13718369#comment-13718369
 ] 

Mridul Muralidharan commented on BOOKKEEPER-648:


I tried it 3 times -
a) mvn test
b) mvn install
Both from the jms subdir

and
c) mvn install for entire bookkeeper.

Was there any other env difference when it failed ? Like hdd space constraint, 
mem constraint, etc ?
The basic test is actually pretty basic - send a msg, and receive the same - 
if that fails, everything else should also fail.
I did not notice any deprecation warnings, so I am wondering what has changed !


 BasicJMSTest failed
 ---

 Key: BOOKKEEPER-648
 URL: https://issues.apache.org/jira/browse/BOOKKEEPER-648
 Project: Bookkeeper
  Issue Type: Bug
  Components: hedwig-client
Reporter: Flavio Junqueira
Assignee: Mridul Muralidharan

 While running tests, I got once a failure for this hedwig-client-jms test: 
 BasicJMSTest.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (BOOKKEEPER-312) Implementation of JMS provider

2013-01-31 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/BOOKKEEPER-312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13568313#comment-13568313
 ] 

Mridul Muralidharan commented on BOOKKEEPER-312:


Oh great ! I saw the failed message from hudson, and was not sure what the 
status was ... My point was that there is nothing much we can do about the 
findbugs warnings.

Thanks for clarifying Flavio; and thanks to Ivan and Matthieu for the reviews 
and helpful comments !

 Implementation of JMS provider
 --

 Key: BOOKKEEPER-312
 URL: https://issues.apache.org/jira/browse/BOOKKEEPER-312
 Project: Bookkeeper
  Issue Type: Sub-task
Reporter: Mridul Muralidharan
Assignee: Mridul Muralidharan
 Fix For: 4.3.0

 Attachments: BOOKKEEPER-312.diff, hedwig-client-jms.patch.10


 The JMS provider implementation conforming to the 1.1 spec.
 The limitations as of now are :
 1) No support for Queue's : Hedwig currently does not have a notion of JMS 
 queue's for us to leverage.
 2) No support for noLocal : Hedwig DOES NOT conform to JMS model of 
 connection -(n)- session -(n)- publisher/subscriber. Each session has a 
 hedwig connection.
 Currently I am simulating noLocal, but this IS fragile and works for the 
 duration of connection - ONLY until the message id is still in a LRUCache. As 
 mentioned before, this is a kludge, and not a good solution.
 3) Note that everything is durable in hedwig - so we do not support 
 NON_PERSISTENT delivery mode.
 4) Calling unsubscribe on a durable subscription will fail if it was NOT 
 created in the current session.
 In hedwig, to unsubscribe, we need the subscription id and the topic ... 
 To simulate unsubscribe(), we store the subscriberId to topicName mapping 
 when a create* api is invoked. Hence, if create* was NOT called, then we have 
 no way to infer which topic the subscription-id refers to from hedwig, and so 
 cant unsubscribe.
 The workaround is - simply create a durable subsriber just as a workaround of 
 this limitation - the topicName will be known to the user/client anyway.
 5) Explicit session recovery is not supported.
 Reconnection of hedwig session (either explicitly or implicitly by underlying 
 client implementation) will automatically trigger redelivery of 
 un-acknowledged messages.
 6) Because of the above, setting the JMSRedelivered flag is almost impossible 
 in a consistent way.
 Currently, we simulate it for redelivery due to provider side events : 
 rollback of txn, exception in message listener (primarily).
 At best we can simulate it with a kludge - at risk of potentially running out 
 of resources ... this is being investigated : but unlikely to have a clean 
 fix.
 7) Hedwig only supports marking all messages until seq-id as received : while 
 JMS indicates ability to acknowledge individual messages.
 This distinction is currently unsupported.
 8) JMS spec requires
 A connection's delivery of incoming messages can be temporarily stopped
 using its stop() method. It can be restarted using its start() method. When 
 the connection is stopped, delivery to all the connection’s MessageConsumers 
 is inhibited: synchronous receives block, and messages are not delivered to 
 MessageListeners.
   We honor this for undelivered messages from server - but if stop is called 
 while there are pending messages yet to be delivered to a listener (or 
 buffered in subscriber for receive), then they will be delivered irrespective 
 of stop().

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (BOOKKEEPER-312) Implementation of JMS provider

2013-01-29 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/BOOKKEEPER-312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13565194#comment-13565194
 ] 

Mridul Muralidharan commented on BOOKKEEPER-312:



A) About @author tags:
The author tags are from the activemq testcases. Currently we make very minimal 
changes to them in the hope that we can do future merges from their codebase 
(unfortunately, this might not be possible anymore, but still ...)

$ find . -type f | xargs grep -i author | grep @
./src/test/java/org/apache/activemq/perf/NetworkedSyncTest.java: * @author 
tmielke
./src/test/java/org/apache/activemq/perf/NetworkedSyncTest.java: * @author 
tmielke
./src/test/java/org/apache/activemq/usecases/TransactionRollbackOrderTest.java: 
* @author Paul Smith
./src/test/java/org/apache/activemq/usecases/DispatchMultipleConsumersTest.java:
 * @author Rajani Chennamaneni
./src/test/java/org/apache/activemq/usecases/DiscriminatingConsumerLoadTest.java:
* @author jlyons
./src/test/java/org/apache/activemq/usecases/DiscriminatingConsumerLoadTest.java:
* @author jlyons
./src/test/java/org/apache/activemq/usecases/TransactionTest.java: * @author 
pragmasoft
./src/test/java/org/apache/activemq/usecases/SubscribeClosePublishThenConsumeTest.java:
 * @author Paul Smith


B) About tabs in code:
Again, this is coming from activemq codebase :
(ctrl+v, tab as grep param below)

$ find . -type f  | grep -v \.class | xargs grep '' | grep -v activemq
Binary file ./target/hedwig-client-jms-4.3.0-SNAPSHOT.jar matches


C) trailing spaces
There are quite a lot of cases of trailing spaces - including in generated code.
Is it mandatory to remove these ? It will be very time-consuming to do so 
manually, and I don't have the time to try to script it.
If there is something already present, I can of course run it and resubmit the patch.


D) Files with lines longer than 120 characters.
There are a bunch of files with lines longer than 120 characters : mostly in 
comments and testcases.
I will try to remove them if it is a priority - unfortunately, I won't be able 
to get to these for a while due to other commitments.

$ for i in `find src -type f `; do if [ -n "`awk 'length($0) > 120' $i`" ]; 
then echo $i; fi; done | grep -v activemq | grep java$
src/test/java/org/apache/hedwig/jms/selector/BasicSelectorGrammarTest.java
src/test/java/org/apache/hedwig/jms/BasicJMSTest.java
src/test/java/org/apache/hedwig/JmsTestBase.java
src/main/java/org/apache/hedwig/jms/selector/ValueComparisonFunction.java
src/main/java/org/apache/hedwig/jms/SessionImpl.java
src/main/java/org/apache/hedwig/jms/ConnectionImpl.java
src/main/java/org/apache/hedwig/jms/spi/HedwigMessagingSessionFacade.java
src/main/java/org/apache/hedwig/jms/spi/MessageProducerImpl.java
src/main/java/org/apache/hedwig/jms/spi/TopicSubscriberImpl.java
src/main/java/org/apache/hedwig/jms/message/header/JmsHeader.java


 Implementation of JMS provider
 --

 Key: BOOKKEEPER-312
 URL: https://issues.apache.org/jira/browse/BOOKKEEPER-312
 Project: Bookkeeper
  Issue Type: Sub-task
Reporter: Mridul Muralidharan
Assignee: Mridul Muralidharan
 Fix For: 4.3.0

 Attachments: hedwig-client-jms.patch, hedwig-client-jms.patch.1, 
 hedwig-client-jms.patch.10, hedwig-client-jms.patch.10, 
 hedwig-client-jms.patch.2, hedwig-client-jms.patch.3, 
 hedwig-client-jms.patch.4, hedwig-client-jms.patch.5, 
 hedwig-client-jms.patch.9, hedwig-client-jms.patch.9


 The JMS provider implementation conforming to the 1.1 spec.
 The limitations as of now are :
 1) No support for Queue's : Hedwig currently does not have a notion of JMS 
 queue's for us to leverage.
 2) No support for noLocal : Hedwig DOES NOT conform to JMS model of 
 connection -(n)- session -(n)- publisher/subscriber. Each session has a 
 hedwig connection.
 Currently I am simulating noLocal, but this IS fragile and works for the 
 duration of connection - ONLY until the message id is still in a LRUCache. As 
 mentioned before, this is a kludge, and not a good solution.
 3) Note that everything is durable in hedwig - so we do not support 
 NON_PERSISTENT delivery mode.
 4) Calling unsubscribe on a durable subscription will fail if it was NOT 
 created in the current session.
 In hedwig, to unsubscribe, we need the subscription id and the topic ... 
 To simulate unsubscribe(), we store the subscriberId to topicName mapping 
 when a create* api is invoked. Hence, if create* was NOT called, then we have 
 no way to infer which topic the subscription-id refers to from hedwig, and so 
 cant unsubscribe.
 The workaround is - simply create a durable subsriber just as a workaround of 
 this limitation - the topicName will be known to the user/client anyway.
 5) Explicit session recovery is not supported.
 Reconnection of hedwig 

[jira] [Commented] (BOOKKEEPER-312) Implementation of JMS provider

2013-01-29 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/BOOKKEEPER-312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13565644#comment-13565644
 ] 

Mridul Muralidharan commented on BOOKKEEPER-312:


Hi Ivan,


a) The github repo I have is horribly out of date (and has other changes in it) 
: as of now, the only thing I have is the patch which finished the review.


b) It is not a direct import of the activemq testcode : but it has gone through 
quite a bit of change to allow use with hedwig (most of it script generated).
The only reason we import the activemq code is to allow us to test the corner 
cases in the jms spec. Though it is not mandatory for functionality to include it 
(worst case, we can drop it - just like we dropped the JORAM testcases due to an 
incompatible license) : imo, considering the significant value they bring, 
we should include it if possible.


c) Regarding licenses : that is weird, my silly scripts did not catch them !
I will add the license and re-submit : thanks for the pointer, that was a bad 
oversight; my apologies

 Implementation of JMS provider
 --

 Key: BOOKKEEPER-312
 URL: https://issues.apache.org/jira/browse/BOOKKEEPER-312
 Project: Bookkeeper
  Issue Type: Sub-task
Reporter: Mridul Muralidharan
Assignee: Mridul Muralidharan
 Fix For: 4.3.0

 Attachments: hedwig-client-jms.patch, hedwig-client-jms.patch.1, 
 hedwig-client-jms.patch.10, hedwig-client-jms.patch.10, 
 hedwig-client-jms.patch.2, hedwig-client-jms.patch.3, 
 hedwig-client-jms.patch.4, hedwig-client-jms.patch.5, 
 hedwig-client-jms.patch.9, hedwig-client-jms.patch.9


 The JMS provider implementation conforming to the 1.1 spec.
 The limitations as of now are :
 1) No support for Queue's : Hedwig currently does not have a notion of JMS 
 queue's for us to leverage.
 2) No support for noLocal : Hedwig DOES NOT conform to JMS model of 
 connection -(n)- session -(n)- publisher/subscriber. Each session has a 
 hedwig connection.
 Currently I am simulating noLocal, but this IS fragile and works for the 
 duration of connection - ONLY until the message id is still in a LRUCache. As 
 mentioned before, this is a kludge, and not a good solution.
 3) Note that everything is durable in hedwig - so we do not support 
 NON_PERSISTENT delivery mode.
 4) Calling unsubscribe on a durable subscription will fail if it was NOT 
 created in the current session.
 In hedwig, to unsubscribe, we need the subscription id and the topic ... 
 To simulate unsubscribe(), we store the subscriberId to topicName mapping 
 when a create* api is invoked. Hence, if create* was NOT called, then we have 
 no way to infer which topic the subscription-id refers to from hedwig, and so 
 cant unsubscribe.
 The workaround is - simply create a durable subsriber just as a workaround of 
 this limitation - the topicName will be known to the user/client anyway.
 5) Explicit session recovery is not supported.
 Reconnection of hedwig session (either explicitly or implicitly by underlying 
 client implementation) will automatically trigger redelivery of 
 un-acknowledged messages.
 6) Because of the above, setting the JMSRedelivered flag is almost impossible 
 in a consistent way.
 Currently, we simulate it for redelivery due to provider side events : 
 rollback of txn, exception in message listener (primarily).
 At best we can simulate it with a kludge - at risk of potentially running out 
 of resources ... this is being investigated : but unlikely to have a clean 
 fix.
 7) Hedwig only supports marking all messages until seq-id as received : while 
 JMS indicates ability to acknowledge individual messages.
 This distinction is currently unsupported.
 8) JMS spec requires
 A connection's delivery of incoming messages can be temporarily stopped
 using its stop() method. It can be restarted using its start() method. When 
 the connection is stopped, delivery to all the connection’s MessageConsumers 
 is inhibited: synchronous receives block, and messages are not delivered to 
 MessageListeners.
   We honor this for undelivered messages from server - but if stop is called 
 while there are pending messages yet to be delivered to a listener (or 
 buffered in subscriber for receive), then they will be delivered irrespective 
 of stop().

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (BOOKKEEPER-312) Implementation of JMS provider

2013-01-29 Thread Mridul Muralidharan (JIRA)

 [ 
https://issues.apache.org/jira/browse/BOOKKEEPER-312?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan updated BOOKKEEPER-312:
---

Attachment: (was: hedwig-client-jms.patch)

 Implementation of JMS provider
 --

 Key: BOOKKEEPER-312
 URL: https://issues.apache.org/jira/browse/BOOKKEEPER-312
 Project: Bookkeeper
  Issue Type: Sub-task
Reporter: Mridul Muralidharan
Assignee: Mridul Muralidharan
 Fix For: 4.3.0

 Attachments: hedwig-client-jms.patch.4, hedwig-client-jms.patch.5, 
 hedwig-client-jms.patch.9, hedwig-client-jms.patch.9


 The JMS provider implementation conforming to the 1.1 spec.
 The limitations as of now are :
 1) No support for Queue's : Hedwig currently does not have a notion of JMS 
 queue's for us to leverage.
 2) No support for noLocal : Hedwig DOES NOT conform to JMS model of 
 connection -(n)- session -(n)- publisher/subscriber. Each session has a 
 hedwig connection.
 Currently I am simulating noLocal, but this IS fragile and works for the 
 duration of connection - ONLY until the message id is still in a LRUCache. As 
 mentioned before, this is a kludge, and not a good solution.
 3) Note that everything is durable in hedwig - so we do not support 
 NON_PERSISTENT delivery mode.
 4) Calling unsubscribe on a durable subscription will fail if it was NOT 
 created in the current session.
 In hedwig, to unsubscribe, we need the subscription id and the topic ... 
 To simulate unsubscribe(), we store the subscriberId to topicName mapping 
 when a create* api is invoked. Hence, if create* was NOT called, then we have 
 no way to infer which topic the subscription-id refers to from hedwig, and so 
 cant unsubscribe.
 The workaround is - simply create a durable subsriber just as a workaround of 
 this limitation - the topicName will be known to the user/client anyway.
 5) Explicit session recovery is not supported.
 Reconnection of hedwig session (either explicitly or implicitly by underlying 
 client implementation) will automatically trigger redelivery of 
 un-acknowledged messages.
 6) Because of the above, setting the JMSRedelivered flag is almost impossible 
 in a consistent way.
 Currently, we simulate it for redelivery due to provider side events : 
 rollback of txn, exception in message listener (primarily).
 At best we can simulate it with a kludge - at risk of potentially running out 
 of resources ... this is being investigated : but unlikely to have a clean 
 fix.
 7) Hedwig only supports marking all messages until seq-id as received : while 
 JMS indicates ability to acknowledge individual messages.
 This distinction is currently unsupported.
 8) JMS spec requires
 A connection's delivery of incoming messages can be temporarily stopped
 using its stop() method. It can be restarted using its start() method. When 
 the connection is stopped, delivery to all the connection’s MessageConsumers 
 is inhibited: synchronous receives block, and messages are not delivered to 
 MessageListeners.
   We honor this for undelivered messages from server - but if stop is called 
 while there are pending messages yet to be delivered to a listener (or 
 buffered in subscriber for receive), then they will be delivered irrespective 
 of stop().

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (BOOKKEEPER-312) Implementation of JMS provider

2013-01-29 Thread Mridul Muralidharan (JIRA)

 [ 
https://issues.apache.org/jira/browse/BOOKKEEPER-312?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan updated BOOKKEEPER-312:
---

Attachment: (was: hedwig-client-jms.patch.10)

 Implementation of JMS provider
 --

 Key: BOOKKEEPER-312
 URL: https://issues.apache.org/jira/browse/BOOKKEEPER-312
 Project: Bookkeeper
  Issue Type: Sub-task
Reporter: Mridul Muralidharan
Assignee: Mridul Muralidharan
 Fix For: 4.3.0

 Attachments: hedwig-client-jms.patch.4, hedwig-client-jms.patch.5, 
 hedwig-client-jms.patch.9, hedwig-client-jms.patch.9


 The JMS provider implementation conforming to the 1.1 spec.
 The limitations as of now are :
 1) No support for Queue's : Hedwig currently does not have a notion of JMS 
 queue's for us to leverage.
 2) No support for noLocal : Hedwig DOES NOT conform to JMS model of 
 connection -(n)- session -(n)- publisher/subscriber. Each session has a 
 hedwig connection.
 Currently I am simulating noLocal, but this IS fragile and works for the 
 duration of connection - ONLY until the message id is still in a LRUCache. As 
 mentioned before, this is a kludge, and not a good solution.
 3) Note that everything is durable in hedwig - so we do not support 
 NON_PERSISTENT delivery mode.
 4) Calling unsubscribe on a durable subscription will fail if it was NOT 
 created in the current session.
 In hedwig, to unsubscribe, we need the subscription id and the topic ... 
 To simulate unsubscribe(), we store the subscriberId to topicName mapping 
 when a create* api is invoked. Hence, if create* was NOT called, then we have 
 no way to infer which topic the subscription-id refers to from hedwig, and so 
 cant unsubscribe.
 The workaround is - simply create a durable subsriber just as a workaround of 
 this limitation - the topicName will be known to the user/client anyway.
 5) Explicit session recovery is not supported.
 Reconnection of hedwig session (either explicitly or implicitly by underlying 
 client implementation) will automatically trigger redelivery of 
 un-acknowledged messages.
 6) Because of the above, setting the JMSRedelivered flag is almost impossible 
 in a consistent way.
 Currently, we simulate it for redelivery due to provider side events : 
 rollback of txn, exception in message listener (primarily).
 At best we can simulate it with a kludge - at risk of potentially running out 
 of resources ... this is being investigated : but unlikely to have a clean 
 fix.
 7) Hedwig only supports marking all messages until seq-id as received : while 
 JMS indicates ability to acknowledge individual messages.
 This distinction is currently unsupported.
 8) JMS spec requires
 A connection's delivery of incoming messages can be temporarily stopped
 using its stop() method. It can be restarted using its start() method. When 
 the connection is stopped, delivery to all the connection’s MessageConsumers 
 is inhibited: synchronous receives block, and messages are not delivered to 
 MessageListeners.
   We honor this for undelivered messages from server - but if stop is called 
 while there are pending messages yet to be delivered to a listener (or 
 buffered in subscriber for receive), then they will be delivered irrespective 
 of stop().

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (BOOKKEEPER-312) Implementation of JMS provider

2013-01-29 Thread Mridul Muralidharan (JIRA)

 [ 
https://issues.apache.org/jira/browse/BOOKKEEPER-312?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan updated BOOKKEEPER-312:
---

Attachment: (was: hedwig-client-jms.patch.10)

 Implementation of JMS provider
 --

 Key: BOOKKEEPER-312
 URL: https://issues.apache.org/jira/browse/BOOKKEEPER-312
 Project: Bookkeeper
  Issue Type: Sub-task
Reporter: Mridul Muralidharan
Assignee: Mridul Muralidharan
 Fix For: 4.3.0

 Attachments: hedwig-client-jms.patch.4, hedwig-client-jms.patch.5, 
 hedwig-client-jms.patch.9, hedwig-client-jms.patch.9


 The JMS provider implementation conforming to the 1.1 spec.
 The limitations as of now are :
 1) No support for Queue's : Hedwig currently does not have a notion of JMS 
 queue's for us to leverage.
 2) No support for noLocal : Hedwig DOES NOT conform to JMS model of 
 connection -(n)- session -(n)- publisher/subscriber. Each session has a 
 hedwig connection.
 Currently I am simulating noLocal, but this IS fragile and works for the 
 duration of connection - ONLY until the message id is still in a LRUCache. As 
 mentioned before, this is a kludge, and not a good solution.
 3) Note that everything is durable in hedwig - so we do not support 
 NON_PERSISTENT delivery mode.
 4) Calling unsubscribe on a durable subscription will fail if it was NOT 
 created in the current session.
 In hedwig, to unsubscribe, we need the subscription id and the topic ... 
 To simulate unsubscribe(), we store the subscriberId to topicName mapping 
 when a create* api is invoked. Hence, if create* was NOT called, then we have 
 no way to infer which topic the subscription-id refers to from hedwig, and so 
 cant unsubscribe.
 The workaround is - simply create a durable subsriber just as a workaround of 
 this limitation - the topicName will be known to the user/client anyway.
 5) Explicit session recovery is not supported.
 Reconnection of hedwig session (either explicitly or implicitly by underlying 
 client implementation) will automatically trigger redelivery of 
 un-acknowledged messages.
 6) Because of the above, setting the JMSRedelivered flag is almost impossible 
 in a consistent way.
 Currently, we simulate it for redelivery due to provider side events : 
 rollback of txn, exception in message listener (primarily).
 At best we can simulate it with a kludge - at risk of potentially running out 
 of resources ... this is being investigated : but unlikely to have a clean 
 fix.
 7) Hedwig only supports marking all messages until seq-id as received : while 
 JMS indicates ability to acknowledge individual messages.
 This distinction is currently unsupported.
 8) JMS spec requires
 A connection's delivery of incoming messages can be temporarily stopped
 using its stop() method. It can be restarted using its start() method. When 
 the connection is stopped, delivery to all the connection’s MessageConsumers 
 is inhibited: synchronous receives block, and messages are not delivered to 
 MessageListeners.
   We honor this for undelivered messages from server - but if stop is called 
 while there are pending messages yet to be delivered to a listener (or 
 buffered in subscriber for receive), then they will be delivered irrespective 
 of stop().

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (BOOKKEEPER-312) Implementation of JMS provider

2013-01-29 Thread Mridul Muralidharan (JIRA)

 [ 
https://issues.apache.org/jira/browse/BOOKKEEPER-312?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan updated BOOKKEEPER-312:
---

Attachment: (was: hedwig-client-jms.patch.2)

 Implementation of JMS provider
 --

 Key: BOOKKEEPER-312
 URL: https://issues.apache.org/jira/browse/BOOKKEEPER-312
 Project: Bookkeeper
  Issue Type: Sub-task
Reporter: Mridul Muralidharan
Assignee: Mridul Muralidharan
 Fix For: 4.3.0

 Attachments: hedwig-client-jms.patch.4, hedwig-client-jms.patch.5, 
 hedwig-client-jms.patch.9, hedwig-client-jms.patch.9


 The JMS provider implementation conforming to the 1.1 spec.
 The limitations as of now are :
 1) No support for Queue's : Hedwig currently does not have a notion of JMS 
 queue's for us to leverage.
 2) No support for noLocal : Hedwig DOES NOT conform to JMS model of 
 connection -(n)- session -(n)- publisher/subscriber. Each session has a 
 hedwig connection.
 Currently I am simulating noLocal, but this IS fragile and works for the 
 duration of connection - ONLY until the message id is still in a LRUCache. As 
 mentioned before, this is a kludge, and not a good solution.
 3) Note that everything is durable in hedwig - so we do not support 
 NON_PERSISTENT delivery mode.
 4) Calling unsubscribe on a durable subscription will fail if it was NOT 
 created in the current session.
 In hedwig, to unsubscribe, we need the subscription id and the topic ... 
 To simulate unsubscribe(), we store the subscriberId to topicName mapping 
 when a create* api is invoked. Hence, if create* was NOT called, then we have 
 no way to infer which topic the subscription-id refers to from hedwig, and so 
 cant unsubscribe.
 The workaround is - simply create a durable subsriber just as a workaround of 
 this limitation - the topicName will be known to the user/client anyway.
 5) Explicit session recovery is not supported.
 Reconnection of hedwig session (either explicitly or implicitly by underlying 
 client implementation) will automatically trigger redelivery of 
 un-acknowledged messages.
 6) Because of the above, setting the JMSRedelivered flag is almost impossible 
 in a consistent way.
 Currently, we simulate it for redelivery due to provider side events : 
 rollback of txn, exception in message listener (primarily).
 At best we can simulate it with a kludge - at risk of potentially running out 
 of resources ... this is being investigated : but unlikely to have a clean 
 fix.
 7) Hedwig only supports marking all messages until seq-id as received : while 
 JMS indicates ability to acknowledge individual messages.
 This distinction is currently unsupported.
 8) JMS spec requires
 A connection's delivery of incoming messages can be temporarily stopped
 using its stop() method. It can be restarted using its start() method. When 
 the connection is stopped, delivery to all the connection’s MessageConsumers 
 is inhibited: synchronous receives block, and messages are not delivered to 
 MessageListeners.
   We honor this for undelivered messages from server - but if stop is called 
 while there are pending messages yet to be delivered to a listener (or 
 buffered in subscriber for receive), then they will be delivered irrespective 
 of stop().

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

