[jira] [Commented] (SPARK-3889) JVM dies with SIGBUS, resulting in ConnectionManager failed ACK
[ https://issues.apache.org/jira/browse/SPARK-3889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14167303#comment-14167303 ] Mridul Muralidharan commented on SPARK-3889:

The status says fixed - what was done to resolve this? I did not see a PR ...

JVM dies with SIGBUS, resulting in ConnectionManager failed ACK
Key: SPARK-3889
URL: https://issues.apache.org/jira/browse/SPARK-3889
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 1.1.0
Reporter: Aaron Davidson
Assignee: Aaron Davidson
Priority: Critical
Fix For: 1.2.0

Here's the first part of the core dump, possibly caused by a job which shuffles a lot of very small partitions.
{code}
#
# A fatal error has been detected by the Java Runtime Environment:
#
# SIGBUS (0x7) at pc=0x7fa5885fcdb0, pid=488, tid=140343502632704
#
# JRE version: 7.0_25-b30
# Java VM: OpenJDK 64-Bit Server VM (23.7-b01 mixed mode linux-amd64 compressed oops)
# Problematic frame:
# v ~StubRoutines::jbyte_disjoint_arraycopy
#
# Failed to write core dump. Core dumps have been disabled. To enable core dumping, try "ulimit -c unlimited" before starting Java again
#
# If you would like to submit a bug report, please include
# instructions on how to reproduce the bug and visit:
# https://bugs.launchpad.net/ubuntu/+source/openjdk-7/
#
--- T H R E A D ---
Current thread (0x7fa4b0631000): JavaThread "Executor task launch worker-170" daemon [_thread_in_Java, id=6783, stack(0x7fa4448ef000,0x7fa4449f)]
siginfo: si_signo=SIGBUS: si_errno=0, si_code=2 (BUS_ADRERR), si_addr=0x7fa428f79000
{code}
Here is the only useful content I can find related to JVM and SIGBUS from Google: https://bugzilla.redhat.com/show_bug.cgi?format=multiple&id=976664
It appears it may be related to disposing byte buffers, which we do in the ConnectionManager -- we mmap shuffle files via ManagedBuffer and dispose of them in BufferMessage.
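For context on the dispose path suspected above: mapped buffers are unmapped eagerly because the JVM otherwise releases the mapping only at GC time, and any access to a buffer after it has been unmapped can kill the JVM with exactly this kind of SIGBUS. A minimal sketch of that eager-dispose pattern, assuming JDK 7's non-public cleaner API accessed reflectively (illustrative only, not Spark's exact code):
{code}
import java.io.RandomAccessFile
import java.nio.MappedByteBuffer
import java.nio.channels.FileChannel.MapMode

object DisposeDemo {
  // Eagerly unmap a MappedByteBuffer via the (non-public) sun.misc.Cleaner.
  // Touching the buffer after this call can crash the JVM with SIGBUS --
  // the failure mode suspected in this ticket.
  def dispose(buffer: MappedByteBuffer): Unit = {
    val cleanerMethod = buffer.getClass.getMethod("cleaner")
    cleanerMethod.setAccessible(true)
    val cleaner = cleanerMethod.invoke(buffer)
    cleaner.getClass.getMethod("clean").invoke(cleaner)
  }

  def main(args: Array[String]): Unit = {
    val raf = new RandomAccessFile("shuffle.data", "rw")
    raf.setLength(4096)
    val buf = raf.getChannel.map(MapMode.READ_WRITE, 0, 4096)
    buf.put(0, 1.toByte)   // fine: mapping is live
    dispose(buf)
    // buf.get(0)          // would fault here: the mapping is gone
    raf.close()
  }
}
{code}
The race in question is a use-after-dispose: one thread still reading a ManagedBuffer while another disposes it.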
[jira] [Commented] (SPARK-3785) Support off-loading computations to a GPU
[ https://issues.apache.org/jira/browse/SPARK-3785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14163326#comment-14163326 ] Mridul Muralidharan commented on SPARK-3785:

[~sowen] We had prototyped a solution for doing just this. The way we did it was to add a new StorageLevel - higher than ProcessLevel - and maintain information about which card hosted the data in an executor, plus the other handles/metadata required to offload computation to the accelerator card (so not just GPU). And we set appropriate locality level delays so that if the RDD had GPU data, no other computation level was allowed (unless there was a loss of executor).

It was a prototype, so we did not solve all the issues which arise - including how to expose eviction of data from the GPU back to main memory/disk in case of memory pressure, more efficient failure modes, moving data between GPUs to balance memory and computational load, RDD replication between GPUs in an executor, multi-tenancy, etc. Our initial target use cases just required running various (expensive) closures on the data, and the result to be pulled off the card was fairly 'small' - so not all of these needed to be solved, at least for the prototype :-) I can't get into the gory details or the actual benchmark numbers though, apologies.

In general, is it worth it? For very specific cases, I would say it is phenomenal - allowing a 2-3 orders of magnitude performance boost vertically! But for cases where it is not a good fit, it is terrible - even for cases where it intuitively was supposed to work, the inefficiencies we incur make it terrible at times.

Support off-loading computations to a GPU
Key: SPARK-3785
URL: https://issues.apache.org/jira/browse/SPARK-3785
Project: Spark
Issue Type: Brainstorming
Components: MLlib
Reporter: Thomas Darimont
Priority: Minor

Are there any plans to add support for off-loading computations to the GPU, e.g. via an OpenCL binding?
http://www.jocl.org/
https://code.google.com/p/javacl/
http://lwjgl.org/wiki/index.php?title=OpenCL_in_LWJGL
[jira] [Commented] (SPARK-3561) Allow for pluggable execution contexts in Spark
[ https://issues.apache.org/jira/browse/SPARK-3561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14163351#comment-14163351 ] Mridul Muralidharan commented on SPARK-3561:

[~pwendell] If I understood the proposal and the initial PR submitted, the intent of this JIRA, as initially proposed by [~ozhurakousky], is fairly different from the other efforts referenced, if I am not wrong. The focus of this change seems to be to completely bypass the Spark execution engine and substitute an alternative: only the current API (and so DAG creation from the Spark program) and user interfaces in Spark remain - the block management, execution engine, execution state management, etc. would all be replaced under the covers by what Tez (or something else in future) provides.

If I am not wrong, the changes would be:
a) Applies only to YARN mode - where the specified execution environment can be run.
b) The current Spark AM would no longer request any executors.
c) The Spark block manager would no longer be required (other than possibly for hosting broadcast via HTTP, I guess?).
d) The actual DAG execution would be taken up by the overridden execution engine - Spark's task manager and DAG scheduler become no-ops.

I might be missing things which Oleg can elaborate on. This functionality, IMO, is fundamentally different from what is being explored in the other JIRAs - and so has value to be pursued independent of the other efforts. Obviously this does not work in all use cases Spark is run on - but it handles a subset of use cases where other execution engines might do much better than Spark currently does, simply because of better code maturity and the specialized use cases they target.

Allow for pluggable execution contexts in Spark
Key: SPARK-3561
URL: https://issues.apache.org/jira/browse/SPARK-3561
Project: Spark
Issue Type: New Feature
Components: Spark Core
Affects Versions: 1.1.0
Reporter: Oleg Zhurakousky
Labels: features
Fix For: 1.2.0
Attachments: SPARK-3561.pdf

Currently Spark provides integration with external resource-managers such as Apache Hadoop YARN, Mesos etc. Specifically in the context of YARN, the current architecture of Spark-on-YARN can be enhanced to provide significantly better utilization of cluster resources for large scale, batch and/or ETL applications when run alongside other applications (Spark and others) and services in YARN.

Proposal: The proposed approach would introduce a pluggable JobExecutionContext (trait) - a gateway and a delegate to the Hadoop execution environment - as a non-public API (@DeveloperApi) not exposed to end users of Spark. The trait will define only 4 operations:
* hadoopFile
* newAPIHadoopFile
* broadcast
* runJob

Each method directly maps to the corresponding method in the current version of SparkContext. The JobExecutionContext implementation will be accessed by SparkContext via a master URL such as execution-context:foo.bar.MyJobExecutionContext, with the default implementation containing the existing code from SparkContext, thus allowing the current (corresponding) methods of SparkContext to delegate to such an implementation. An integrator will now have the option to provide a custom implementation of DefaultExecutionContext by either implementing it from scratch or extending from DefaultExecutionContext.

Please see the attached design doc for more details. A pull request will be posted shortly as well.
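To make the shape of the proposal concrete, here is a minimal Scala sketch of a trait with the four operations named in the description. This is a hypothetical reading of the JIRA text with signatures modeled on SparkContext's, not the actual patch:
{code}
import scala.reflect.ClassTag
import org.apache.hadoop.mapred.InputFormat
import org.apache.spark.{SparkContext, TaskContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD

// Hypothetical sketch of the proposed trait; the PR's exact signatures may differ.
trait JobExecutionContext {
  def hadoopFile[K, V](
      sc: SparkContext,
      path: String,
      inputFormatClass: Class[_ <: InputFormat[K, V]],
      keyClass: Class[K],
      valueClass: Class[V],
      minPartitions: Int): RDD[(K, V)]

  def newAPIHadoopFile[K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K, V]](
      sc: SparkContext,
      path: String,
      fClass: Class[F],
      kClass: Class[K],
      vClass: Class[V],
      conf: org.apache.hadoop.conf.Configuration): RDD[(K, V)]

  def broadcast[T: ClassTag](sc: SparkContext, value: T): Broadcast[T]

  def runJob[T, U: ClassTag](
      sc: SparkContext,
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit
}
{code}
An implementation would then be selected from the master URL, e.g. execution-context:foo.bar.MyJobExecutionContext as described above.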
[jira] [Commented] (SPARK-3561) Allow for pluggable execution contexts in Spark
[ https://issues.apache.org/jira/browse/SPARK-3561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14163376#comment-14163376 ] Mridul Muralidharan commented on SPARK-3561:

[~ozhurakousky] I think the disconnect here is that the interfaces proposed do not show much value in what is supposed to be the functionality exposed to users. A follow-up PR showing how these interfaces are used in the context of Tez would show why this change is relevant in the context of Spark.

The disconnect, if I am not wrong, is that we do not want to expose SPIs which we would then need to maintain in Spark core - while unknown implementations extend them in non-standard ways, causing issues to our end users. For example, even though TaskScheduler is an SPI and can in theory be extended in arbitrary ways, all the SPI implementations currently 'live' within Spark and are in harmony with the rest of the code - and with changes which occur within Spark core (when functionality is added or extended). This allows us to decouple the actual TaskScheduler implementation from Spark code, while still keeping them in sync and maintainable as functionality is added independent of other pieces. Case in point: YARN support has significantly evolved from when I initially added it - to the point where it probably does not share even a single line of code I initially wrote :) - and yet this has been done pretty much independent of changes to core, while at the same time ensuring it stays compatible with changes in Spark core and vice versa.

The next step, IMO, would be a PR which shows how these interfaces are used for a non-trivial use case: Tez in this case. The default implementation provided in the PR can be removed (since it should not be used/exposed to users). Once that is done, we can evaluate the proposed interface in the context of the functionality exposed, and see how it fits with the rest of Spark.

Allow for pluggable execution contexts in Spark
Key: SPARK-3561
URL: https://issues.apache.org/jira/browse/SPARK-3561
Project: Spark
Issue Type: New Feature
Components: Spark Core
Affects Versions: 1.1.0
Reporter: Oleg Zhurakousky
Labels: features
Fix For: 1.2.0
Attachments: SPARK-3561.pdf
[jira] [Commented] (SPARK-3847) Enum.hashCode is only consistent within the same JVM
[ https://issues.apache.org/jira/browse/SPARK-3847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14163407#comment-14163407 ] Mridul Muralidharan commented on SPARK-3847:

Wow, nice bug! This is unexpected - thanks for reporting this.

Enum.hashCode is only consistent within the same JVM
Key: SPARK-3847
URL: https://issues.apache.org/jira/browse/SPARK-3847
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 1.1.0
Environment: Oracle JDK 7u51 64bit on Ubuntu 12.04
Reporter: Nathan Bijnens
Labels: enum

When using Java Enums as keys in some operations, the results will be very unexpected. The issue is that Java's Enum.hashCode returns the memory position, which is different on each JVM.
{code}
messages.filter(_.getHeader.getKind == Kind.EVENT).count
// 503650
val tmp = messages.filter(_.getHeader.getKind == Kind.EVENT)
tmp.map(_.getHeader.getKind).countByValue
// Map(EVENT -> 1389)
{code}
Because it's actually a JVM issue, we should either reject enums as keys with an error or implement a workaround. A good write-up of the issue (and a workaround) can be found here: http://dev.bizo.com/2014/02/beware-enums-in-spark.html
Somewhat more on hash codes and Enums: https://stackoverflow.com/questions/4885095/what-is-the-reason-behind-enum-hashcode
And some issues (most of them rejected) in the Oracle Java bug database:
- http://bugs.java.com/bugdatabase/view_bug.do?bug_id=8050217
- http://bugs.java.com/bugdatabase/view_bug.do?bug_id=7190798
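A quick way to see the instability, using a JDK enum (java.util.concurrent.TimeUnit) so the sketch is self-contained. Run it twice: the first number will usually differ between runs, while the name- and ordinal-based values stay fixed:
{code}
import java.util.concurrent.TimeUnit

object EnumHashDemo {
  def main(args: Array[String]): Unit = {
    // Enum does not override hashCode(), so this is Object.hashCode():
    // an identity hash that varies from JVM to JVM. Using it to pick a
    // shuffle partition sends "the same" key to different reducers on
    // different executors.
    println(TimeUnit.SECONDS.hashCode())

    // Stable alternatives for distributed keys:
    println(TimeUnit.SECONDS.name().hashCode)  // hash of "SECONDS", deterministic
    println(TimeUnit.SECONDS.ordinal())        // declaration position, deterministic
  }
}
{code}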
[jira] [Commented] (SPARK-3561) Allow for pluggable execution contexts in Spark
[ https://issues.apache.org/jira/browse/SPARK-3561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14164095#comment-14164095 ] Mridul Muralidharan commented on SPARK-3561:

I agree with [~pwendell] that it does not help Spark to introduce a dependency on Tez in core. [~ozhurakousky], is Tez available on all YARN clusters? Or is it an additional runtime dependency? If it is available by default, we can make it a runtime switch to use Tez for jobs running in yarn-standalone and yarn-client mode. But before that ...

While better multi-tenancy would be a likely benefit, my specific interest in this patch is more to do with the much better shuffle performance that Tez offers :-) Specifically for ETL jobs, I can see other benefits which might be relevant - one of our collaborative filtering implementations, though not ETL, comes fairly close to it in job characteristics and suffers due to some of our shuffle issues ...

As I alluded to, I do not think we should have an open-ended extension point where any class name can be provided which extends functionality in an arbitrary manner - like, for example, the SPI we have for compression codecs. As Patrick mentioned, this gives the impression that the approach is blessed by Spark developers - even if tagged as Experimental. Particularly with core internals, I would be very wary of exposing them via an SPI - simply because we need the freedom to evolve them for performance or functionality reasons.

On the other hand, I am in favour of exploring this option to see what sort of benefits we get out of it, assuming it has been prototyped already - which I thought was the case here, though I am yet to see a PR with that (not sure if I missed it!). Given that Tez is supposed to be reasonably mature, if there is a Spark + Tez version, I want to see what benefits (if any) are observed as a result of this effort. I had discussed Spark + Tez integration about a year or so back with Matei - but at that time, Tez was probably not that mature - maybe this is a better time!

[~ozhurakousky] Do you have a Spark-on-Tez prototype done already? Or is this an experiment you are yet to complete? If complete, what sort of performance difference do you see? What metrics are you using? If there are significant benefits, I would want to take a closer look at the final proposed patch ... I would be interested in it making it into Spark in some form. As [~nchammas] mentioned, if it is possible to address it in Spark directly, nothing like it - particularly since that will benefit all modes of execution and not just the YARN + Tez combination. If the gap can't be narrowed, and the benefits are significant (for some, as of now undefined, definition of 'benefits' and 'significant'), then we can consider a Tez dependency in the yarn module. Of course, all these questions are moot until we have a better quantitative judgement of what the expected gains are and what the experimental results show.

Allow for pluggable execution contexts in Spark
Key: SPARK-3561
URL: https://issues.apache.org/jira/browse/SPARK-3561
Project: Spark
Issue Type: New Feature
Components: Spark Core
Affects Versions: 1.1.0
Reporter: Oleg Zhurakousky
Labels: features
Fix For: 1.2.0
Attachments: SPARK-3561.pdf
[jira] [Commented] (SPARK-3847) Enum.hashCode is only consistent within the same JVM
[ https://issues.apache.org/jira/browse/SPARK-3847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14164775#comment-14164775 ] Mridul Muralidharan commented on SPARK-3847:

[~joshrosen] Array hashCode might be understandable - but enums messing up hashCode is unexpected, to say the least :-) You are right, we need to add something similar. Maybe have a blacklist of key types which are unstable when used in a distributed setting - I have a feeling we might end up with a longer list than just arrays and enums.

Enum.hashCode is only consistent within the same JVM
Key: SPARK-3847
URL: https://issues.apache.org/jira/browse/SPARK-3847
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 1.1.0
Environment: Oracle JDK 7u51 64bit on Ubuntu 12.04
Reporter: Nathan Bijnens
Labels: enum
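Until such a check exists, the usual user-side workaround is to key by a stable representation of the enum before any hash-partitioned operation. A self-contained sketch against the Spark 1.x API, again using TimeUnit as a stand-in for the reporter's Kind enum:
{code}
import java.util.concurrent.TimeUnit
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._  // pair-RDD implicits in Spark 1.x

object EnumKeyWorkaround {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("enum-key-workaround").setMaster("local"))

    val events = sc.parallelize(Seq(TimeUnit.SECONDS, TimeUnit.MINUTES, TimeUnit.SECONDS))

    // Key by the enum's stable String name, never by the enum value itself:
    // String.hashCode is the same on every JVM, so the shuffle is consistent.
    val counts = events.map(u => (u.name(), 1L)).reduceByKey(_ + _).collect()
    counts.foreach(println)

    sc.stop()
  }
}
{code}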
[jira] [Commented] (SPARK-3714) Spark workflow scheduler
[ https://issues.apache.org/jira/browse/SPARK-3714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14151486#comment-14151486 ] Mridul Muralidharan commented on SPARK-3714:

Most of the drawbacks mentioned are not severe, IMO - at best, they are unfamiliarity with the Oozie platform (points 2, 3, 4, 5). Point 1 is interesting (sharing a Spark context) - though from a fault tolerance point of view, supporting it is challenging; of course, Oozie was probably not designed with something like Spark in mind - so there might be changes to Oozie which would benefit Spark; we could engage with Oozie dev for that. But discarding it to reinvent something when Oozie already does everything mentioned in the requirements section seems counterintuitive. I have seen multiple attempts to 'simplify' workflow management, and at production scale almost everything ends up looking similar ... Note that most production jobs have to depend on a variety of jobs - not just Spark or MR - so you will end up converging on a variant of Oozie anyway :-)

Having said that, if you want to take a crack at solving this with Spark-specific idioms in mind, it would be interesting to see the result - I don't want to dissuade you from doing so! We might end up with something quite interesting.

Spark workflow scheduler
Key: SPARK-3714
URL: https://issues.apache.org/jira/browse/SPARK-3714
Project: Spark
Issue Type: New Feature
Components: Project Infra
Reporter: Egor Pakhomov
Priority: Minor

[Design doc | https://docs.google.com/document/d/1q2Q8Ux-6uAkH7wtLJpc3jz-GfrDEjlbWlXtf20hvguk/edit?usp=sharing]

The Spark stack is currently hard to use in production processes due to the lack of the following features:
* Scheduling Spark jobs
* Retrying a failed Spark job in a big pipeline
* Sharing a context among jobs in a pipeline
* Queueing jobs

A typical use case for such a platform would be: wait for new data, process the new data, learn ML models on the new data, compare the model with the previous one, and in case of success, overwrite the current production model in its HDFS directory with the new one.
[jira] [Commented] (SPARK-3714) Spark workflow scheduler
[ https://issues.apache.org/jira/browse/SPARK-3714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14151211#comment-14151211 ] Mridul Muralidharan commented on SPARK-3714:

Have you tried using Oozie for this? IIRC Tom has already gotten this working here quite a while back. /CC [~tgraves]

Spark workflow scheduler
Key: SPARK-3714
URL: https://issues.apache.org/jira/browse/SPARK-3714
Project: Spark
Issue Type: New Feature
Components: Project Infra
Reporter: Egor Pakhomov
Priority: Minor
[jira] [Commented] (SPARK-1956) Enable shuffle consolidation by default
[ https://issues.apache.org/jira/browse/SPARK-1956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14125027#comment-14125027 ] Mridul Muralidharan commented on SPARK-1956:

The recent changes to BlockObjectWriter have introduced bugs again ... I don't know how badly they affect the codebase, but it would not be prudent to enable it by default until they are fixed and the changes properly analyzed.

Enable shuffle consolidation by default
Key: SPARK-1956
URL: https://issues.apache.org/jira/browse/SPARK-1956
Project: Spark
Issue Type: Improvement
Components: Shuffle, Spark Core
Affects Versions: 1.0.0
Reporter: Sandy Ryza

The only drawbacks are on ext3, and most everyone has ext4 at this point. I think it's better to aim the default at the common case.
[jira] [Commented] (SPARK-3019) Pluggable block transfer (data plane communication) interface
[ https://issues.apache.org/jira/browse/SPARK-3019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14119362#comment-14119362 ] Mridul Muralidharan commented on SPARK-3019:

Just went over the proposal in some detail. [~rxin], did you take a look at the proposal in SPARK-1476? The ManagedBuffer detailed in this document does not satisfy most of the interface or functional requirements in 1476 - which would require us to redesign this interface when we need to support blocks larger than 2 GB in Spark - unless I missed something.

Pluggable block transfer (data plane communication) interface
Key: SPARK-3019
URL: https://issues.apache.org/jira/browse/SPARK-3019
Project: Spark
Issue Type: Improvement
Components: Shuffle, Spark Core
Reporter: Reynold Xin
Assignee: Reynold Xin
Attachments: PluggableBlockTransferServiceProposalforSpark - draft 1.pdf

The attached design doc proposes a standard interface for block transferring, which will make future engineering of this functionality easier, allowing the Spark community to provide alternative implementations. Block transferring is a critical function in Spark. All of the following depend on it:
* shuffle
* torrent broadcast
* block replication in BlockManager
* remote block reads for tasks scheduled without locality
[jira] [Commented] (SPARK-3019) Pluggable block transfer (data plane communication) interface
[ https://issues.apache.org/jira/browse/SPARK-3019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14119365#comment-14119365 ] Mridul Muralidharan commented on SPARK-3019:

I will try to push the version we had last worked on for the 2G fix (a pre-1.1 fork) to git later today/this week - and we can take a look at it. It might require some effort to rebase it to 1.1 since it is slightly dated; but that can be done if required. The main reason for the push would be to illustrate why the interfaces in SPARK-1476 exist and how they are used, so that there is a better understanding of the required functional change.

Pluggable block transfer (data plane communication) interface
Key: SPARK-3019
URL: https://issues.apache.org/jira/browse/SPARK-3019
Project: Spark
Issue Type: Improvement
Components: Shuffle, Spark Core
Reporter: Reynold Xin
Assignee: Reynold Xin
[jira] [Commented] (SPARK-1476) 2GB limit in spark for blocks
[ https://issues.apache.org/jira/browse/SPARK-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14119385#comment-14119385 ] Mridul Muralidharan commented on SPARK-1476:

WIP version pushed to https://github.com/mridulm/spark/tree/2g_fix - from about 2 weeks before the feature freeze in 1.1, IIRC. Note that the 2g fixes are functionally complete, but this branch also includes a large number of other fixes. Some of these have been pushed to master, while others have not yet been - primarily fixes for alleviating memory pressure and resource leaks. This branch has been shared for reference purposes and is not meant to be actively worked on for merging into master. We will need to cherry-pick the changes and do that manually.

2GB limit in spark for blocks
Key: SPARK-1476
URL: https://issues.apache.org/jira/browse/SPARK-1476
Project: Spark
Issue Type: Improvement
Components: Spark Core
Environment: all
Reporter: Mridul Muralidharan
Assignee: Mridul Muralidharan
Priority: Critical
Attachments: 2g_fix_proposal.pdf

The underlying abstraction for blocks in Spark is a ByteBuffer, which limits the size of a block to 2GB. This has implications not just for managed blocks in use, but also for shuffle blocks (memory-mapped blocks are limited to 2GB, even though the API allows for long), ser/deser via byte-array-backed output streams (SPARK-1391), etc. This is a severe limitation for the use of Spark on non-trivial datasets.
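The 2GB ceiling comes from ByteBuffer's Int-based indexing (capacity, position, and limit are all Int), so any fix has to stop modeling a block as a single buffer. A hypothetical sketch of the chunked-buffer direction; the names here are illustrative, not the proposal's actual API:
{code}
import java.nio.ByteBuffer

// A block backed by multiple ByteBuffers, so the total size is a Long and
// can exceed Int.MaxValue (~2GB), which a single ByteBuffer cannot.
class LargeByteBuffer(chunks: Array[ByteBuffer]) {

  // Long-sized total, impossible to represent with one buffer's capacity.
  def size: Long = chunks.map(_.remaining().toLong).sum

  // Consumers stream chunk by chunk instead of materializing one huge buffer;
  // duplicate() keeps the shared chunks' positions untouched.
  def foreachChunk(f: ByteBuffer => Unit): Unit =
    chunks.foreach(c => f(c.duplicate()))
}
{code}
The pervasiveness mentioned elsewhere in this thread follows directly: every consumer that currently expects one ByteBuffer (serialization, transfer, mmap) has to learn to iterate chunks.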
[jira] [Commented] (SPARK-3277) LZ4 compression cause the the ExternalSort exception
[ https://issues.apache.org/jira/browse/SPARK-3277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14113741#comment-14113741 ] Mridul Muralidharan commented on SPARK-3277:

This looks like unrelated changes pushed to BlockObjectWriter as part of the introduction of ShuffleWriteMetrics. I had introduced checks and also documented that we must not infer size based on the position of the stream after flush - since close can write data to the streams (and one flush can result in more data getting generated which need not be flushed to the streams). Apparently this logic was modified subsequently, causing this bug. The solution would be to revert the change that updates shuffleBytesWritten before the close of the stream. It must be done after close, and based on file.length.

LZ4 compression cause the the ExternalSort exception
Key: SPARK-3277
URL: https://issues.apache.org/jira/browse/SPARK-3277
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 1.0.2
Reporter: hzw
Fix For: 1.1.0

I tested the LZ4 compression, and it came up with such a problem (with wordcount). I also tested Snappy and LZF, and they were OK. In the end I set spark.shuffle.spill to false to avoid the exception, but once this switch is open, the error comes. It seems that if the number of words is small, wordcount will go through, but with a complex text this problem shows up.
Exception info as follows:
{code}
java.lang.AssertionError: assertion failed
at scala.Predef$.assert(Predef.scala:165)
at org.apache.spark.util.collection.ExternalAppendOnlyMap$DiskMapIterator.init(ExternalAppendOnlyMap.scala:416)
at org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:235)
at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:150)
at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
at org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:55)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:54)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
at java.lang.Thread.run(Thread.java:722)
{code}
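A minimal sketch of the measurement rule described above: byte counts must come from the file after close(), because a compression stream may emit its final (partial) block only on close(). The helper below is illustrative, not the BlockObjectWriter code:
{code}
import java.io.{File, FileOutputStream, OutputStream}

object MeasureAfterClose {
  // Writes through a wrapping (e.g. compression) stream and returns the
  // true on-disk size. Reading a stream position after flush() would
  // under-count: close() can still append a final compressed block.
  def writeAndMeasure(
      file: File,
      wrap: OutputStream => OutputStream)(
      write: OutputStream => Unit): Long = {
    val out = wrap(new FileOutputStream(file))
    try {
      write(out)
      out.flush()          // may be a no-op for block compressors
    } finally {
      out.close()          // forces out any buffered partial block
    }
    file.length()          // authoritative only after close()
  }
}
{code}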
[jira] [Updated] (SPARK-3277) LZ4 compression cause the the ExternalSort exception
[ https://issues.apache.org/jira/browse/SPARK-3277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mridul Muralidharan updated SPARK-3277:

Priority: Blocker (was: Major)

LZ4 compression cause the the ExternalSort exception
Key: SPARK-3277
URL: https://issues.apache.org/jira/browse/SPARK-3277
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 1.0.2, 1.1.0, 1.2.0
Reporter: hzw
Priority: Blocker
Fix For: 1.1.0
[jira] [Updated] (SPARK-3277) LZ4 compression cause the the ExternalSort exception
[ https://issues.apache.org/jira/browse/SPARK-3277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mridul Muralidharan updated SPARK-3277:

Affects Version/s: 1.2.0, 1.1.0

LZ4 compression cause the the ExternalSort exception
Key: SPARK-3277
URL: https://issues.apache.org/jira/browse/SPARK-3277
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 1.0.2, 1.1.0, 1.2.0
Reporter: hzw
Priority: Blocker
Fix For: 1.1.0
[jira] [Commented] (SPARK-3277) LZ4 compression cause the the ExternalSort exception
[ https://issues.apache.org/jira/browse/SPARK-3277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14114014#comment-14114014 ] Mridul Muralidharan commented on SPARK-3277:

[~matei] Attaching a patch which reproduces the bug consistently. I suspect the issue is more serious than what I detailed above - spill to disk seems completely broken, if I understood the assertion message correctly. Unfortunately, this is based on a few minutes of free time I could grab - so a more principled debugging session is definitely warranted!

LZ4 compression cause the the ExternalSort exception
Key: SPARK-3277
URL: https://issues.apache.org/jira/browse/SPARK-3277
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 1.0.2, 1.1.0, 1.2.0
Reporter: hzw
Priority: Blocker
Fix For: 1.1.0
[jira] [Comment Edited] (SPARK-3277) LZ4 compression cause the the ExternalSort exception
[ https://issues.apache.org/jira/browse/SPARK-3277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14114022#comment-14114022 ] Mridul Muralidharan edited comment on SPARK-3277 at 8/28/14 5:37 PM:

Attached patch is against master, though I noticed similar changes in 1.1 also - but not yet verified.

was (Author: mridulm80): Against master, though I noticed similar changes in 1.1 also - but not yet verified.

LZ4 compression cause the the ExternalSort exception
Key: SPARK-3277
URL: https://issues.apache.org/jira/browse/SPARK-3277
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 1.0.2, 1.1.0, 1.2.0
Reporter: hzw
Priority: Blocker
Fix For: 1.1.0
Attachments: test_lz4_bug.patch
[jira] [Updated] (SPARK-3277) LZ4 compression cause the the ExternalSort exception
[ https://issues.apache.org/jira/browse/SPARK-3277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mridul Muralidharan updated SPARK-3277:

Attachment: test_lz4_bug.patch

Against master, though I noticed similar changes in 1.1 also - but not yet verified.

LZ4 compression cause the the ExternalSort exception
Key: SPARK-3277
URL: https://issues.apache.org/jira/browse/SPARK-3277
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 1.0.2, 1.1.0, 1.2.0
Reporter: hzw
Priority: Blocker
Fix For: 1.1.0
Attachments: test_lz4_bug.patch
[jira] [Commented] (SPARK-3277) LZ4 compression cause the the ExternalSort exception
[ https://issues.apache.org/jira/browse/SPARK-3277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14114026#comment-14114026 ] Mridul Muralidharan commented on SPARK-3277:

[~hzw] Did you notice this against 1.0.2? I did not think the changes for consolidated shuffle were backported to that branch; [~mateiz] can comment more, though.

LZ4 compression cause the the ExternalSort exception
Key: SPARK-3277
URL: https://issues.apache.org/jira/browse/SPARK-3277
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 1.0.2, 1.1.0, 1.2.0
Reporter: hzw
Priority: Blocker
Fix For: 1.1.0
Attachments: test_lz4_bug.patch
[jira] [Commented] (SPARK-3277) LZ4 compression cause the the ExternalSort exception
[ https://issues.apache.org/jira/browse/SPARK-3277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14114484#comment-14114484 ] Mridul Muralidharan commented on SPARK-3277:

Sounds great, thanks! I suspect it is because for LZF we configure the stream to write a block on flush (a partial one if there is insufficient data to fill a block); but for LZ4, either such a config does not exist or we don't use it. The result is that flush becomes a no-op when the data in the current block is insufficient to produce a compressed block - while close will force the partial block to be written out. Which is why the assertion lists all sizes as 0.

LZ4 compression cause the the ExternalSort exception
Key: SPARK-3277
URL: https://issues.apache.org/jira/browse/SPARK-3277
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 1.0.2, 1.1.0, 1.2.0
Reporter: hzw
Assignee: Andrew Or
Priority: Blocker
Attachments: test_lz4_bug.patch
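The suspected flush-vs-close behavior can be observed directly against lz4-java's block stream. A small sketch, assuming the net.jpountz.lz4 API that Spark wraps, constructed without sync-flush behavior (whether flush() emits a partial block depends on the library version and configuration, which is the point of the comment above):
{code}
import java.io.ByteArrayOutputStream
import net.jpountz.lz4.LZ4BlockOutputStream

object Lz4FlushDemo {
  def main(args: Array[String]): Unit = {
    val underlying = new ByteArrayOutputStream()
    val lz4 = new LZ4BlockOutputStream(underlying, 1 << 16) // 64KB block size

    lz4.write("far less than one block".getBytes("UTF-8"))
    lz4.flush()
    // With less than a full block buffered, flush() can emit nothing, so
    // the underlying stream may still report 0 bytes -- the "all sizes
    // are 0" case the assertion trips on.
    println(s"after flush: ${underlying.size()} bytes")

    lz4.close()
    // close() forces the partial block (plus framing) out.
    println(s"after close: ${underlying.size()} bytes")
  }
}
{code}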
[jira] [Commented] (SPARK-3175) Branch-1.1 SBT build failed for Yarn-Alpha
[ https://issues.apache.org/jira/browse/SPARK-3175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14107923#comment-14107923 ] Mridul Muralidharan commented on SPARK-3175:

Please add more information on why they need to be out of sync. As of now, the only way to build for yarn-alpha is to manually update pom.xml.

Branch-1.1 SBT build failed for Yarn-Alpha
Key: SPARK-3175
URL: https://issues.apache.org/jira/browse/SPARK-3175
Project: Spark
Issue Type: Bug
Components: Build
Affects Versions: 1.1.1
Reporter: Chester
Labels: build
Fix For: 1.1.1
Original Estimate: 1h
Remaining Estimate: 1h

When trying to build yarn-alpha on branch-1.1:
{code}
$ sbt/sbt -Pyarn-alpha -Dhadoop.version=2.0.5-alpha projects
[info] Loading project definition from /Users/chester/projects/spark/project
org.apache.maven.model.building.ModelBuildingException: 1 problem was encountered while building the effective model for org.apache.spark:spark-yarn-alpha_2.10:1.1.0
[FATAL] Non-resolvable parent POM: Could not find artifact org.apache.spark:yarn-parent_2.10:pom:1.1.0 in central (http://repo.maven.apache.org/maven2) and 'parent.relativePath' points at wrong local POM @ line 20, column 11
{code}
[jira] [Commented] (SPARK-3115) Improve task broadcast latency for small tasks
[ https://issues.apache.org/jira/browse/SPARK-3115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14102142#comment-14102142 ] Mridul Muralidharan commented on SPARK-3115:

I had a tab open with pretty much the exact same bug comments, ready to be filed :-)

Improve task broadcast latency for small tasks
Key: SPARK-3115
URL: https://issues.apache.org/jira/browse/SPARK-3115
Project: Spark
Issue Type: Improvement
Components: Spark Core
Affects Versions: 1.1.0
Reporter: Shivaram Venkataraman
Assignee: Reynold Xin

Broadcasting the task information helps reduce the amount of data transferred for large tasks. However, we've seen that this adds more latency for small tasks. It'll be great to profile and fix this.
[jira] [Commented] (SPARK-3019) Pluggable block transfer (data plane communication) interface
[ https://issues.apache.org/jira/browse/SPARK-3019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14099909#comment-14099909 ] Mridul Muralidharan commented on SPARK-3019:

I am yet to go through the proposal in detail, so I will defer comments on that for later; but to get some clarity on the discussion around Sandy's point:

- Until we read from all mappers, the shuffle can't actually start. Even if a single mapper's output is small enough to fit into memory (which it need not be), num_mappers * avg_size_of_map_output_per_reducer could be larger than available memory by orders of magnitude (for example, 10,000 map tasks emitting just 1 MB each per reducer already means ~10 GB for a single reducer). This is fairly common for us. It was the reason we actually worked on the 2G fix, btw - individual blocks in a mapper, and also the data per reducer from a mapper, were larger than 2G :-)
- While reading data off the network, we cannot assess whether the read data will fit into memory or not (since there are other parallel read requests pending for this and other cores in the same executor). So spooling intermediate data to disk becomes necessary on both the mapper side (which we already do) and the reducer side (which we don't do currently - we assume that a block can fit into reducer memory as part of doing a remote fetch). This becomes more relevant when we want to target bigger blocks of data and tackle skew in the data (for shuffle).

Pluggable block transfer (data plane communication) interface
Key: SPARK-3019
URL: https://issues.apache.org/jira/browse/SPARK-3019
Project: Spark
Issue Type: Improvement
Components: Shuffle, Spark Core
Reporter: Reynold Xin
Assignee: Reynold Xin
[jira] [Commented] (SPARK-3019) Pluggable block transfer (data plane communication) interface
[ https://issues.apache.org/jira/browse/SPARK-3019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14099910#comment-14099910 ] Mridul Muralidharan commented on SPARK-3019:

Btw, can we do something about block replication when the replication factor is > 1? Currently we silently lose replicas, and the block placement strategy for replication is fairly non-existent.

Pluggable block transfer (data plane communication) interface
Key: SPARK-3019
URL: https://issues.apache.org/jira/browse/SPARK-3019
Project: Spark
Issue Type: Improvement
Components: Shuffle, Spark Core
Reporter: Reynold Xin
Assignee: Reynold Xin
[jira] [Commented] (SPARK-1476) 2GB limit in spark for blocks
[ https://issues.apache.org/jira/browse/SPARK-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14099072#comment-14099072 ] Mridul Muralidharan commented on SPARK-1476:

Based on discussions we had with others, apparently 1.1 was not a good vehicle for this proposal. Further, since there was no interest in this JIRA / no comments on the proposal, we put the effort on the backburner. We plan to push at least some of the bugs fixed as part of this effort - consolidated shuffle did get resolved in 1.1, and probably a few more might be contributed back in 1.2, time permitting (disk-backed map output tracking, for example, looks like a good candidate).

But the bulk of the change is pervasive, at times a bit invasive, and at odds with some of the other changes (for example, zero-copy); shepherding it might be a bit time consuming for me given other deliverables. If there is renewed interest in getting this integrated into a Spark release, I can try to push for it to be resurrected and submitted.

2GB limit in spark for blocks
Key: SPARK-1476
URL: https://issues.apache.org/jira/browse/SPARK-1476
Project: Spark
Issue Type: Improvement
Components: Spark Core
Environment: all
Reporter: Mridul Muralidharan
Assignee: Mridul Muralidharan
Priority: Critical
Attachments: 2g_fix_proposal.pdf
[jira] [Commented] (SPARK-2089) With YARN, preferredNodeLocalityData isn't honored
[ https://issues.apache.org/jira/browse/SPARK-2089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14099115#comment-14099115 ] Mridul Muralidharan commented on SPARK-2089: In the general case, won't InputFormats have customizations to them for creation and/or initialization before they can be used to get splits ? (Other than file names, I mean.) With YARN, preferredNodeLocalityData isn't honored --- Key: SPARK-2089 URL: https://issues.apache.org/jira/browse/SPARK-2089 Project: Spark Issue Type: Bug Components: YARN Affects Versions: 1.0.0 Reporter: Sandy Ryza Assignee: Sandy Ryza Priority: Critical When running in YARN cluster mode, apps can pass preferred locality data when constructing a Spark context that will dictate where to request executor containers. This is currently broken because of a race condition. The Spark-YARN code runs the user class and waits for it to start up a SparkContext. During its initialization, the SparkContext will create a YarnClusterScheduler, which notifies a monitor in the Spark-YARN code that . The Spark-Yarn code then immediately fetches the preferredNodeLocationData from the SparkContext and uses it to start requesting containers. But in the SparkContext constructor that takes the preferredNodeLocationData, setting preferredNodeLocationData comes after the rest of the initialization, so, if the Spark-YARN code comes around quickly enough after being notified, the data that's fetched is the empty unset version. This occurred during all of my runs. -- This message was sent by Atlassian JIRA (v6.2#6252) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-2089) With YARN, preferredNodeLocalityData isn't honored
[ https://issues.apache.org/jira/browse/SPARK-2089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14095247#comment-14095247 ] Mridul Muralidharan commented on SPARK-2089: Since I am not maintaining the code anymore, I don't have a strong preference either way. I am not sure what the format means btw - I see multiple nodes and racks mentioned in the same group ... In general though, I am not convinced it is a good direction to take. 1) It is a workaround for a design issue and has non-trivial performance implications (serializing into this form only to immediately deserialize it is expensive for large inputs : not to mention, it gets shipped to executors for no reason). 2) It locks us into a format which provides inadequate information - number of blocks per node, size per block, etc. is lost (or maybe I just did not understand what the format is !). 3) We are currently investigating evolving in the opposite direction - adding more information so that we can be more specific about where to allocate executors. For example: I can see the fairly near-term need to associate executors with accelerator cards (and break the OFF_HEAP - tachyon implicit assumption). A string representation makes this fragile to evolve. As I mentioned before, the current yarn allocation model in spark is a very naive implementation - which I did not expect to survive this long : it was directly from our prototype. We really should be modifying it to consider the cost of data transfer and prioritize allocation that way (number of blocks on a node/rack, size of blocks, number of replicas available, etc). For small datasets on small enough clusters this is not relevant, but it has implications as we grow along both axes. With YARN, preferredNodeLocalityData isn't honored --- Key: SPARK-2089 URL: https://issues.apache.org/jira/browse/SPARK-2089 Project: Spark Issue Type: Bug Components: YARN Affects Versions: 1.0.0 Reporter: Sandy Ryza Assignee: Sandy Ryza Priority: Critical When running in YARN cluster mode, apps can pass preferred locality data when constructing a Spark context that will dictate where to request executor containers. This is currently broken because of a race condition. The Spark-YARN code runs the user class and waits for it to start up a SparkContext. During its initialization, the SparkContext will create a YarnClusterScheduler, which notifies a monitor in the Spark-YARN code that . The Spark-Yarn code then immediately fetches the preferredNodeLocationData from the SparkContext and uses it to start requesting containers. But in the SparkContext constructor that takes the preferredNodeLocationData, setting preferredNodeLocationData comes after the rest of the initialization, so, if the Spark-YARN code comes around quickly enough after being notified, the data that's fetched is the empty unset version. This occurred during all of my runs. -- This message was sent by Atlassian JIRA (v6.2#6252) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-2962) Suboptimal scheduling in spark
[ https://issues.apache.org/jira/browse/SPARK-2962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14092807#comment-14092807 ] Mridul Muralidharan commented on SPARK-2962: On further investigation : a) The primary issue is a combination of SPARK-2089 and the current scheduler behavior for pendingTasksWithNoPrefs. SPARK-2089 leads to very bad allocation of nodes - it particularly has an impact on bigger clusters. It leads to a lot of blocks having no data or rack local executors - causing them to end up in pendingTasksWithNoPrefs. While loading data off dfs, when an executor is being scheduled, even though there might be rack local schedules available for it (or, on waiting a while, data local too - see (b) below), because of the current scheduler behavior, tasks from pendingTasksWithNoPrefs get scheduled : causing a large number of ANY tasks to be scheduled at the very onset. The combination of these, with the lack of marginal alleviation via (b), is what caused the performance impact. b) spark.scheduler.minRegisteredExecutorsRatio had not yet been used in the workload - so using that might alleviate some of the non-deterministic waiting and ensure adequate executors are allocated ! Thanks [~lirui] Suboptimal scheduling in spark -- Key: SPARK-2962 URL: https://issues.apache.org/jira/browse/SPARK-2962 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.1.0 Environment: All Reporter: Mridul Muralidharan In findTask, irrespective of 'locality' specified, pendingTasksWithNoPrefs are always scheduled with PROCESS_LOCAL pendingTasksWithNoPrefs contains tasks which currently do not have any alive locations - but which could come in 'later' : particularly relevant when spark app is just coming up and containers are still being added. This causes a large number of non node local tasks to be scheduled incurring significant network transfers in the cluster when running with non trivial datasets. The comment // Look for no-pref tasks after rack-local tasks since they can run anywhere. is misleading in the method code : locality levels start from process_local down to any, and so no prefs get scheduled much before rack. Also note that, currentLocalityIndex is reset to the taskLocality returned by this method - so returning PROCESS_LOCAL as the level will trigger wait times again. (Was relevant before recent change to scheduler, and might be again based on resolution of this issue). Found as part of writing test for SPARK-2931 -- This message was sent by Atlassian JIRA (v6.2#6252) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-2962) Suboptimal scheduling in spark
Mridul Muralidharan created SPARK-2962: -- Summary: Suboptimal scheduling in spark Key: SPARK-2962 URL: https://issues.apache.org/jira/browse/SPARK-2962 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.1.0 Environment: All Reporter: Mridul Muralidharan In findTask, irrespective of 'locality' specified, pendingTasksWithNoPrefs are always scheduled with PROCESS_LOCAL pendingTasksWithNoPrefs contains tasks which currently do not have any alive locations - but which could come in 'later' : particularly relevant when spark app is just coming up and containers are still being added. This causes a large number of non node local tasks to be scheduled incurring significant network transfers in the cluster when running with non trivial datasets. The comment // Look for no-pref tasks after rack-local tasks since they can run anywhere. is misleading in the method code : locality levels start from process_local down to any, and so no prefs get scheduled much before rack. Also note that, currentLocalityIndex is reset to the taskLocality returned by this method - so returning PROCESS_LOCAL as the level will trigger wait times again. (Was relevant before recent change to scheduler, and might be again based on resolution of this issue). Found as part of writing test for SPARK-2931 -- This message was sent by Atlassian JIRA (v6.2#6252) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-2962) Suboptimal scheduling in spark
[ https://issues.apache.org/jira/browse/SPARK-2962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14092427#comment-14092427 ] Mridul Muralidharan commented on SPARK-2962: To give more context: a) Our jobs start by loading data from dfs - so this is the first stage that gets executed. b) We sleep for 1 minute before starting the jobs (in case the cluster is busy, etc) - unfortunately, this is not sufficient, and iirc there is no programmatic way to wait more deterministically for X% of nodes (was something added to alleviate this ? I did see some discussion) c) This becomes more of a problem because spark does not honour preferred locations anymore while running on yarn. See SPARK-2089 - due to 1.0 interface changes. [ Practically, if we are using a large enough number of nodes (with replication of 3 or higher), we usually do end up with quite a lot of data local tasks eventually - so (c) is not an immediate concern for our current jobs assuming (b) is not an issue, though it is suboptimal in the general case ] Suboptimal scheduling in spark -- Key: SPARK-2962 URL: https://issues.apache.org/jira/browse/SPARK-2962 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.1.0 Environment: All Reporter: Mridul Muralidharan In findTask, irrespective of 'locality' specified, pendingTasksWithNoPrefs are always scheduled with PROCESS_LOCAL pendingTasksWithNoPrefs contains tasks which currently do not have any alive locations - but which could come in 'later' : particularly relevant when spark app is just coming up and containers are still being added. This causes a large number of non node local tasks to be scheduled incurring significant network transfers in the cluster when running with non trivial datasets. The comment // Look for no-pref tasks after rack-local tasks since they can run anywhere. is misleading in the method code : locality levels start from process_local down to any, and so no prefs get scheduled much before rack. Also note that, currentLocalityIndex is reset to the taskLocality returned by this method - so returning PROCESS_LOCAL as the level will trigger wait times again. (Was relevant before recent change to scheduler, and might be again based on resolution of this issue). Found as part of writing test for SPARK-2931 -- This message was sent by Atlassian JIRA (v6.2#6252) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-2962) Suboptimal scheduling in spark
[ https://issues.apache.org/jira/browse/SPARK-2962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14092430#comment-14092430 ] Mridul Muralidharan commented on SPARK-2962: Hi [~matei], I am referencing the latest code (as of yesterday night). pendingTasksWithNoPrefs currently contains both tasks which truly have no preference, and tasks whose preferences are currently unavailable - and the latter is what is triggering this, since that can change during the execution of the stage. Hope I am not missing something ? Thanks, Mridul Suboptimal scheduling in spark -- Key: SPARK-2962 URL: https://issues.apache.org/jira/browse/SPARK-2962 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.1.0 Environment: All Reporter: Mridul Muralidharan In findTask, irrespective of 'locality' specified, pendingTasksWithNoPrefs are always scheduled with PROCESS_LOCAL pendingTasksWithNoPrefs contains tasks which currently do not have any alive locations - but which could come in 'later' : particularly relevant when spark app is just coming up and containers are still being added. This causes a large number of non node local tasks to be scheduled incurring significant network transfers in the cluster when running with non trivial datasets. The comment // Look for no-pref tasks after rack-local tasks since they can run anywhere. is misleading in the method code : locality levels start from process_local down to any, and so no prefs get scheduled much before rack. Also note that, currentLocalityIndex is reset to the taskLocality returned by this method - so returning PROCESS_LOCAL as the level will trigger wait times again. (Was relevant before recent change to scheduler, and might be again based on resolution of this issue). Found as part of writing test for SPARK-2931 -- This message was sent by Atlassian JIRA (v6.2#6252) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-2962) Suboptimal scheduling in spark
[ https://issues.apache.org/jira/browse/SPARK-2962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14092431#comment-14092431 ] Mridul Muralidharan commented on SPARK-2962: Note, I don't think this is a regression in 1.1; it probably existed much earlier too. Other issues are making us notice this (like SPARK-2089) - we moved to 1.1 from 0.9 recently. Suboptimal scheduling in spark -- Key: SPARK-2962 URL: https://issues.apache.org/jira/browse/SPARK-2962 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.1.0 Environment: All Reporter: Mridul Muralidharan In findTask, irrespective of 'locality' specified, pendingTasksWithNoPrefs are always scheduled with PROCESS_LOCAL pendingTasksWithNoPrefs contains tasks which currently do not have any alive locations - but which could come in 'later' : particularly relevant when spark app is just coming up and containers are still being added. This causes a large number of non node local tasks to be scheduled incurring significant network transfers in the cluster when running with non trivial datasets. The comment // Look for no-pref tasks after rack-local tasks since they can run anywhere. is misleading in the method code : locality levels start from process_local down to any, and so no prefs get scheduled much before rack. Also note that, currentLocalityIndex is reset to the taskLocality returned by this method - so returning PROCESS_LOCAL as the level will trigger wait times again. (Was relevant before recent change to scheduler, and might be again based on resolution of this issue). Found as part of writing test for SPARK-2931 -- This message was sent by Atlassian JIRA (v6.2#6252) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-2962) Suboptimal scheduling in spark
[ https://issues.apache.org/jira/browse/SPARK-2962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14092427#comment-14092427 ] Mridul Muralidharan edited comment on SPARK-2962 at 8/11/14 4:35 AM: - To give more context: a) Our jobs start by loading data from dfs - so this is the first stage that gets executed. b) We sleep for 1 minute before starting the jobs (in case the cluster is busy, etc) - unfortunately, this is not sufficient, and iirc there is no programmatic way to wait more deterministically for X% of nodes (was something added to alleviate this ? I did see some discussion) c) This becomes more of a problem because spark does not honour preferred locations anymore while running on yarn. See SPARK-2089 - due to 1.0 interface changes. [ Practically, if we are using a large enough number of nodes (with replication of 3 or higher), we usually do end up with quite a lot of data local tasks eventually - so (c) is not an immediate concern for our current jobs assuming (b) is not an issue, though it is suboptimal in the general case ] was (Author: mridulm80): To give more context; a) Our jobs start with load data from dfs as starting point : and so this is the first stage that gets executed. b) We are sleeping for 1 minute before starting the jobs (in case cluster is busy, etc) - unfortunately, this is not sufficient and iirc there is no programmatic way to wait more deterministically for X% of node (was something added to alleviate this ? I did see some discussion) c) This becomes more of a problem because spark does not honour preferred location anymore while running in yarn. See SPARK-208 - due to 1.0 interface changes. [ Practically, if we are using large enough number of nodes (with replication of 3 or higher), usually we do end up with quite of lot of data local tasks eventually - so (c) is not an immediate concern for our current jobs assuming (b) is not an issue, though it is suboptimal in general case ] Suboptimal scheduling in spark -- Key: SPARK-2962 URL: https://issues.apache.org/jira/browse/SPARK-2962 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.1.0 Environment: All Reporter: Mridul Muralidharan In findTask, irrespective of 'locality' specified, pendingTasksWithNoPrefs are always scheduled with PROCESS_LOCAL pendingTasksWithNoPrefs contains tasks which currently do not have any alive locations - but which could come in 'later' : particularly relevant when spark app is just coming up and containers are still being added. This causes a large number of non node local tasks to be scheduled incurring significant network transfers in the cluster when running with non trivial datasets. The comment // Look for no-pref tasks after rack-local tasks since they can run anywhere. is misleading in the method code : locality levels start from process_local down to any, and so no prefs get scheduled much before rack. Also note that, currentLocalityIndex is reset to the taskLocality returned by this method - so returning PROCESS_LOCAL as the level will trigger wait times again. (Was relevant before recent change to scheduler, and might be again based on resolution of this issue). Found as part of writing test for SPARK-2931 -- This message was sent by Atlassian JIRA (v6.2#6252) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
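[Editor's note: a heavily simplified, hypothetical model of the findTask ordering discussed in the SPARK-2962 comments above; names and types are illustrative, not Spark's actual TaskSetManager API.]
{code}
import scala.collection.mutable

object LocalitySketch {
  val forExecutor = mutable.Map[String, mutable.Queue[Int]]()
  val forHost     = mutable.Map[String, mutable.Queue[Int]]()
  // Holds BOTH truly-no-preference tasks AND tasks whose preferred hosts
  // simply have no executor yet - the conflation at the heart of the issue.
  val noPrefs     = mutable.Queue[Int]()
  val forRack     = mutable.Map[String, mutable.Queue[Int]]()
  val anyTasks    = mutable.Queue[Int]()

  def findTask(execId: String, host: String, rack: String): Option[(Int, String)] = {
    def next(q: Option[mutable.Queue[Int]]): Option[Int] =
      q.filter(_.nonEmpty).map(_.dequeue())
    next(forExecutor.get(execId)).map((_, "PROCESS_LOCAL"))
      .orElse(next(forHost.get(host)).map((_, "NODE_LOCAL")))
      // Problem spot: no-pref tasks are returned as PROCESS_LOCAL ahead of
      // rack-local candidates, which both skips better placements and resets
      // the locality-wait clock, as the issue description notes.
      .orElse(next(Some(noPrefs)).map((_, "PROCESS_LOCAL")))
      .orElse(next(forRack.get(rack)).map((_, "RACK_LOCAL")))
      .orElse(next(Some(anyTasks)).map((_, "ANY")))
  }
}
{code}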
[jira] [Commented] (SPARK-2931) getAllowedLocalityLevel() throws ArrayIndexOutOfBoundsException
[ https://issues.apache.org/jira/browse/SPARK-2931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14091746#comment-14091746 ] Mridul Muralidharan commented on SPARK-2931: [~kayousterhout] this is weird, I remember mentioning this exact same issue in some PR for 1.1 (trying to find which one, though not 1313 iirc); and I think it was supposed to have been addressed. We had observed this issue of currentLocalityLevel running away when we had internally merged the PR. Strange that it was not addressed - speaks volumes about me not following up on my reviews ! getAllowedLocalityLevel() throws ArrayIndexOutOfBoundsException --- Key: SPARK-2931 URL: https://issues.apache.org/jira/browse/SPARK-2931 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.1.0 Environment: Spark EC2, spark-1.1.0-snapshot1, sort-by-key spark-perf benchmark Reporter: Josh Rosen Priority: Blocker Fix For: 1.1.0 When running Spark Perf's sort-by-key benchmark on EC2 with v1.1.0-snapshot, I get the following errors (one per task): {code} 14/08/08 18:54:22 INFO scheduler.TaskSetManager: Starting task 39.0 in stage 0.0 (TID 39, ip-172-31-14-30.us-west-2.compute.internal, PROCESS_LOCAL, 1003 bytes) 14/08/08 18:54:22 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkexecu...@ip-172-31-9-213.us-west-2.compute.internal:58901/user/Executor#1436065036] with ID 0 14/08/08 18:54:22 ERROR actor.OneForOneStrategy: 1 java.lang.ArrayIndexOutOfBoundsException: 1 at org.apache.spark.scheduler.TaskSetManager.getAllowedLocalityLevel(TaskSetManager.scala:475) at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:409) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$7$$anonfun$apply$2.apply$mcVI$sp(TaskSchedulerImpl.scala:261) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$7.apply(TaskSchedulerImpl.scala:257) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$7.apply(TaskSchedulerImpl.scala:254) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:254) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:254) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:254) at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor.makeOffers(CoarseGrainedSchedulerBackend.scala:153) at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor$$anonfun$receive$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:103) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) {code} This causes the job to hang. I can deterministically reproduce this by re-running the test, either in isolation or as part of the full performance testing suite. -- This message was sent by Atlassian JIRA (v6.2#6252) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-2931) getAllowedLocalityLevel() throws ArrayIndexOutOfBoundsException
[ https://issues.apache.org/jira/browse/SPARK-2931?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mridul Muralidharan updated SPARK-2931: --- Attachment: test.patch A patch to showcase the exception getAllowedLocalityLevel() throws ArrayIndexOutOfBoundsException --- Key: SPARK-2931 URL: https://issues.apache.org/jira/browse/SPARK-2931 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.1.0 Environment: Spark EC2, spark-1.1.0-snapshot1, sort-by-key spark-perf benchmark Reporter: Josh Rosen Priority: Blocker Fix For: 1.1.0 Attachments: scala-sort-by-key.err, test.patch When running Spark Perf's sort-by-key benchmark on EC2 with v1.1.0-snapshot, I get the following errors (one per task): {code} 14/08/08 18:54:22 INFO scheduler.TaskSetManager: Starting task 39.0 in stage 0.0 (TID 39, ip-172-31-14-30.us-west-2.compute.internal, PROCESS_LOCAL, 1003 bytes) 14/08/08 18:54:22 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkexecu...@ip-172-31-9-213.us-west-2.compute.internal:58901/user/Executor#1436065036] with ID 0 14/08/08 18:54:22 ERROR actor.OneForOneStrategy: 1 java.lang.ArrayIndexOutOfBoundsException: 1 at org.apache.spark.scheduler.TaskSetManager.getAllowedLocalityLevel(TaskSetManager.scala:475) at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:409) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$7$$anonfun$apply$2.apply$mcVI$sp(TaskSchedulerImpl.scala:261) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$7.apply(TaskSchedulerImpl.scala:257) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$7.apply(TaskSchedulerImpl.scala:254) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:254) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:254) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:254) at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor.makeOffers(CoarseGrainedSchedulerBackend.scala:153) at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor$$anonfun$receive$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:103) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) {code} This causes the job to hang. I can deterministically reproduce this by re-running the test, either in isolation or as part of the full performance testing suite. 
-- This message was sent by Atlassian JIRA (v6.2#6252) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-2931) getAllowedLocalityLevel() throws ArrayIndexOutOfBoundsException
[ https://issues.apache.org/jira/browse/SPARK-2931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14091881#comment-14091881 ] Mridul Muralidharan commented on SPARK-2931: [~joshrosen] [~kayousterhout] Added a patch which deterministically showcases the bug - should be easy to fix it now I hope :-) getAllowedLocalityLevel() throws ArrayIndexOutOfBoundsException --- Key: SPARK-2931 URL: https://issues.apache.org/jira/browse/SPARK-2931 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.1.0 Environment: Spark EC2, spark-1.1.0-snapshot1, sort-by-key spark-perf benchmark Reporter: Josh Rosen Priority: Blocker Fix For: 1.1.0 Attachments: scala-sort-by-key.err, test.patch When running Spark Perf's sort-by-key benchmark on EC2 with v1.1.0-snapshot, I get the following errors (one per task): {code} 14/08/08 18:54:22 INFO scheduler.TaskSetManager: Starting task 39.0 in stage 0.0 (TID 39, ip-172-31-14-30.us-west-2.compute.internal, PROCESS_LOCAL, 1003 bytes) 14/08/08 18:54:22 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkexecu...@ip-172-31-9-213.us-west-2.compute.internal:58901/user/Executor#1436065036] with ID 0 14/08/08 18:54:22 ERROR actor.OneForOneStrategy: 1 java.lang.ArrayIndexOutOfBoundsException: 1 at org.apache.spark.scheduler.TaskSetManager.getAllowedLocalityLevel(TaskSetManager.scala:475) at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:409) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$7$$anonfun$apply$2.apply$mcVI$sp(TaskSchedulerImpl.scala:261) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$7.apply(TaskSchedulerImpl.scala:257) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$7.apply(TaskSchedulerImpl.scala:254) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:254) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:254) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:254) at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor.makeOffers(CoarseGrainedSchedulerBackend.scala:153) at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor$$anonfun$receive$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:103) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) {code} This causes the job to hang. 
I can deterministically reproduce this by re-running the test, either in isolation or as part of the full performance testing suite. -- This message was sent by Atlassian JIRA (v6.2#6252) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-2881) Snappy is now default codec - could lead to conflicts since uses /tmp
[ https://issues.apache.org/jira/browse/SPARK-2881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14088018#comment-14088018 ] Mridul Muralidharan commented on SPARK-2881: To add, this will affect spark whenever the tmp directory is not overridden via java.io.tmpdir to something ephemeral. So local, yarn-client, and standalone modes should be affected by default (unless I missed something in the scripts). I am not very sure of how mesos runs jobs, so can't comment on that - anyone care to add ? A workaround I can think of is to always set 'org.xerial.snappy.tempdir' to a randomly generated directory under java.io.tmpdir as part of spark startup (only once) : which will cause snappy to use that directory and avoid this issue. Since snappy is the default codec now, I am marking this as a blocker for the release Snappy is now default codec - could lead to conflicts since uses /tmp - Key: SPARK-2881 URL: https://issues.apache.org/jira/browse/SPARK-2881 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.1.0 Reporter: Thomas Graves Priority: Blocker I was using spark master branch and I ran into an issue with Snappy since its now the default codec for shuffle. The issue was that someone else had run with snappy and it created /tmp/snappy-*.so but it had restrictive permissions so I was not able to use it or remove it. This caused my spark job to not start. I was running in yarn client mode at the time. Yarn cluster mode shouldn't have this issue since we change the java.io.tmpdir. I assume this would also affect standalone mode. I'm not sure if this is a true blocker but wanted to file it as one at first and let us decide. -- This message was sent by Atlassian JIRA (v6.2#6252) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-2881) Snappy is now default codec - could lead to conflicts since uses /tmp
[ https://issues.apache.org/jira/browse/SPARK-2881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14088018#comment-14088018 ] Mridul Muralidharan edited comment on SPARK-2881 at 8/6/14 6:45 PM: To add, this will affect spark whenever the tmp directory is not overridden via java.io.tmpdir to something ephemeral. So local, yarn-client, and standalone modes should be affected by default (unless I missed something in the scripts). I am not very sure of how mesos runs jobs, so can't comment on that - anyone care to add ? A workaround I can think of is to always set 'org.xerial.snappy.tempdir' to a randomly generated directory under java.io.tmpdir as part of spark startup (only once) : which will cause snappy to use that directory and avoid this issue. Since snappy is the default codec now, I am +1 on marking this as a blocker for the release was (Author: mridulm80): To add, this will affect spark whenever tmp directory is not overridden via java.io.tmpdir to something ephemeral. So local, yarn client, standalone should be affected by default (unless I missed something in the scripts). I am not very of how mesos runs jobs, so cant comment about that, anyone care to add ? A workaround I can think of is to always set 'org.xerial.snappy.tempdir' to a randomly generated directory under java.io.tmpdir as part of spark startup (only) once : which will cause snappy to use that directory and avoid this issue. Since snappy is the default codec now, I am marking this as a blocker for release Snappy is now default codec - could lead to conflicts since uses /tmp - Key: SPARK-2881 URL: https://issues.apache.org/jira/browse/SPARK-2881 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.1.0 Reporter: Thomas Graves Priority: Blocker I was using spark master branch and I ran into an issue with Snappy since its now the default codec for shuffle. The issue was that someone else had run with snappy and it created /tmp/snappy-*.so but it had restrictive permissions so I was not able to use it or remove it. This caused my spark job to not start. I was running in yarn client mode at the time. Yarn cluster mode shouldn't have this issue since we change the java.io.tmpdir. I assume this would also affect standalone mode. I'm not sure if this is a true blocker but wanted to file it as one at first and let us decide. -- This message was sent by Atlassian JIRA (v6.2#6252) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
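[Editor's note: a minimal sketch of the workaround described above; 'org.xerial.snappy.tempdir' is snappy-java's real system property, but the helper shown here is illustrative, not actual Spark startup code.]
{code}
import java.io.File
import java.util.UUID

// Point snappy-java at a per-process temp dir so another user's leftover
// /tmp/snappy-*.so (possibly with restrictive permissions) cannot block us.
// Must run before the snappy native library is first loaded.
def setSnappyTempDir(): Unit = {
  val base = System.getProperty("java.io.tmpdir")
  val dir = new File(base, s"snappy-${UUID.randomUUID()}")
  dir.mkdirs()
  dir.deleteOnExit()
  System.setProperty("org.xerial.snappy.tempdir", dir.getAbsolutePath)
}
{code}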
[jira] [Commented] (SPARK-2685) Update ExternalAppendOnlyMap to avoid buffer.remove()
[ https://issues.apache.org/jira/browse/SPARK-2685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14074186#comment-14074186 ] Mridul Muralidharan commented on SPARK-2685: We moved to using java.util.LinkedList for this Update ExternalAppendOnlyMap to avoid buffer.remove() - Key: SPARK-2685 URL: https://issues.apache.org/jira/browse/SPARK-2685 Project: Spark Issue Type: Sub-task Components: Spark Core Reporter: Matei Zaharia This shifts the whole right side of the array back, which can be expensive. It would be better to just swap the last element into the position we want to remove at, then decrease the size of the array. -- This message was sent by Atlassian JIRA (v6.2#6252)
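[Editor's note: for contrast with the LinkedList approach mentioned above, a minimal sketch of the swap-and-shrink removal the issue description proposes, valid only when element order does not matter; this is the general technique, not the actual ExternalAppendOnlyMap patch.]
{code}
import scala.collection.mutable.ArrayBuffer

// O(1) removal when order is irrelevant: overwrite the doomed slot with the
// last element, then drop the now-duplicated tail - unlike buffer.remove(i),
// which shifts everything after i left by one slot (O(n)).
def swapRemove[T](buffer: ArrayBuffer[T], i: Int): Unit = {
  buffer(i) = buffer(buffer.length - 1)
  buffer.trimEnd(1)
}
{code}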
[jira] [Created] (SPARK-2532) Fix issues with consolidated shuffle
Mridul Muralidharan created SPARK-2532: -- Summary: Fix issues with consolidated shuffle Key: SPARK-2532 URL: https://issues.apache.org/jira/browse/SPARK-2532 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.1.0 Environment: All Reporter: Mridul Muralidharan Assignee: Mridul Muralidharan Priority: Critical Fix For: 1.1.0 Will file a PR with the changes as soon as the merge is done (the earlier merge became outdated in 2 weeks unfortunately :) ). Consolidated shuffle is broken in multiple ways in spark : a) Task failure(s) can cause the state to become inconsistent. b) Multiple reverts, or combinations of close/revert/close, can cause the state to be inconsistent. (As part of exception/error handling). c) Some of the api in block writer causes implementation issues - for example: a revert is always followed by a close : but the implementation tries to keep them separate, resulting in more surface for errors. d) Fetching data from consolidated shuffle files can go badly wrong if the file is being actively written to : it computes length by subtracting the next offset from the current offset (or the file length if this is the last offset) - the latter fails when a fetch is happening in parallel to a write. Note, this happens even if there are no task failures of any kind ! This usually results in stream corruption or decompression errors. -- This message was sent by Atlassian JIRA (v6.2#6252)
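[Editor's note: a minimal sketch of the length computation described in (d), with hypothetical names; it shows why the last-offset case breaks while a writer is still appending to the consolidated file.]
{code}
// offsets(i) = start position of map output i inside one consolidated file.
def blockLength(offsets: Array[Long], i: Int, currentFileLength: Long): Long =
  if (i < offsets.length - 1) offsets(i + 1) - offsets(i)
  // Last block: length is derived from the file size - which is wrong while
  // a writer is still appending, since the file is growing under the reader
  // and extra (possibly partial) bytes get attributed to this block.
  else currentFileLength - offsets(i)
{code}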
[jira] [Commented] (SPARK-2468) zero-copy shuffle network communication
[ https://issues.apache.org/jira/browse/SPARK-2468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14060543#comment-14060543 ] Mridul Muralidharan commented on SPARK-2468: We map the file content and directly write that to the socket (except when the size is below 8k or so iirc) - are you sure we are copying to user space and back ? zero-copy shuffle network communication --- Key: SPARK-2468 URL: https://issues.apache.org/jira/browse/SPARK-2468 Project: Spark Issue Type: Improvement Components: Shuffle, Spark Core Reporter: Reynold Xin Assignee: Reynold Xin Priority: Critical Right now shuffle send goes through the block manager. This is inefficient because it requires loading a block from disk into a kernel buffer, then into a user space buffer, and then back to a kernel send buffer before it reaches the NIC. It does multiple copies of the data and context switching between kernel/user. It also creates unnecessary buffer in the JVM that increases GC Instead, we should use FileChannel.transferTo, which handles this in the kernel space with zero-copy. See http://www.ibm.com/developerworks/library/j-zerocopy/ One potential solution is to use Netty NIO. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-2468) zero-copy shuffle network communication
[ https://issues.apache.org/jira/browse/SPARK-2468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14060545#comment-14060545 ] Mridul Muralidharan commented on SPARK-2468: Writing mmap'ed buffers is pretty efficient btw - it is the second fallback in the transferTo implementation iirc. zero-copy shuffle network communication --- Key: SPARK-2468 URL: https://issues.apache.org/jira/browse/SPARK-2468 Project: Spark Issue Type: Improvement Components: Shuffle, Spark Core Reporter: Reynold Xin Assignee: Reynold Xin Priority: Critical Right now shuffle send goes through the block manager. This is inefficient because it requires loading a block from disk into a kernel buffer, then into a user space buffer, and then back to a kernel send buffer before it reaches the NIC. It does multiple copies of the data and context switching between kernel/user. It also creates unnecessary buffer in the JVM that increases GC Instead, we should use FileChannel.transferTo, which handles this in the kernel space with zero-copy. See http://www.ibm.com/developerworks/library/j-zerocopy/ One potential solution is to use Netty NIO. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-2468) zero-copy shuffle network communication
[ https://issues.apache.org/jira/browse/SPARK-2468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14061094#comment-14061094 ] Mridul Muralidharan commented on SPARK-2468: Ah, small files - those are indeed a problem. Btw, we do dispose of mmap'ed blocks as soon as we are done with them; so we don't need to wait for gc to free them. Also note that the files are closed as soon as they are opened and mmap'ed - so they do not count towards the open file count/ulimit. Agree on 1, 3 and 4 - some of these apply to sendfile too btw, so they are not avoidable; but it is the best we have right now. Since we use mmap'ed buffers and rarely transfer the same file again, the performance jump might not be the order(s) of magnitude other projects claim - but then even a 10% (or whatever) improvement in our case would be substantial ! zero-copy shuffle network communication --- Key: SPARK-2468 URL: https://issues.apache.org/jira/browse/SPARK-2468 Project: Spark Issue Type: Improvement Components: Shuffle, Spark Core Reporter: Reynold Xin Assignee: Reynold Xin Priority: Critical Right now shuffle send goes through the block manager. This is inefficient because it requires loading a block from disk into a kernel buffer, then into a user space buffer, and then back to a kernel send buffer before it reaches the NIC. It does multiple copies of the data and context switching between kernel/user. It also creates unnecessary buffer in the JVM that increases GC Instead, we should use FileChannel.transferTo, which handles this in the kernel space with zero-copy. See http://www.ibm.com/developerworks/library/j-zerocopy/ One potential solution is to use Netty NIO. -- This message was sent by Atlassian JIRA (v6.2#6252)
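[Editor's note: a minimal sketch contrasting the two paths discussed in this thread, using only standard java.nio APIs; the Spark integration points (ConnectionManager, ManagedBuffer) are omitted, and the helper names are the editor's own.]
{code}
import java.io.RandomAccessFile
import java.nio.channels.{FileChannel, WritableByteChannel}

// Approach described in the comments above: mmap the file region and write
// the mapped buffer to the socket. No user-space heap copy, but the write
// call still goes through the JVM for each chunk.
def sendViaMmap(file: RandomAccessFile, offset: Long, length: Long,
                socket: WritableByteChannel): Unit = {
  val buf = file.getChannel.map(FileChannel.MapMode.READ_ONLY, offset, length)
  while (buf.hasRemaining) socket.write(buf)
}

// Zero-copy alternative proposed in the issue: let the kernel move bytes
// from the file to the socket directly (sendfile under the hood on Linux).
def sendViaTransferTo(file: RandomAccessFile, offset: Long, length: Long,
                      socket: WritableByteChannel): Unit = {
  val channel = file.getChannel
  var pos = offset
  while (pos < offset + length) {
    pos += channel.transferTo(pos, offset + length - pos, socket)
  }
}
{code}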
[jira] [Commented] (SPARK-2398) Trouble running Spark 1.0 on Yarn
[ https://issues.apache.org/jira/browse/SPARK-2398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14060113#comment-14060113 ] Mridul Muralidharan commented on SPARK-2398: As discussed in the PR, I am attempting to list the various factors which contribute to overhead. Note, this is not exhaustive (yet) - please add more to this JIRA - so that when we are reasonably sure, we can model the expected overhead based on these factors. These factors are typically off-heap - since anything within the heap is budgeted for by Xmx and enforced by the VM : and so should ideally (not practically always, see gc overheads) not exceed the Xmx value 1) 256 KB per socket accepted via ConnectionManager for inter-worker comm (setReceiveBufferSize) Typically, there will be (numExecutor - 1) sockets open. 2) 128 KB per socket for writing output to dfs. For reads, this does not seem to be configured - and should be 8k per socket iirc. Typically 1 per executor at a given point in time ? 3) 256k for each akka socket for send/receive buffer. One per worker ? (to talk to master) - so 512kb ? Any other use of akka ? 4) If I am not wrong, netty might allocate multiple spark.akka.frameSize sized direct buffers. There might be a few of these allocated and pooled/reused. I did not go into detail in the netty code though. If someone else with more know-how can clarify, that would be great ! Default size of 10mb for spark.akka.frameSize 5) The default size of the assembled spark jar is about 12x mb (and changing) - though not all classes get loaded, the overhead would be some function of this. The actual footprint would be higher than the on-disk size. IIRC this is outside of the heap - [~sowen], any comments on this ? I have not looked into these in like 10 years now ! 6) Per thread (Xss) overhead of 1mb (for 64bit vm). Last I recall, we have about 220-odd threads - not sure if this was at the master or on the workers. Of course, this is dependent on the various threadpools we use (io, computation, etc), akka and netty config, etc. 7) Disk read overhead. Thanks to [~pwendell]'s fix, at least for small files, the overhead is not too high - since we do not mmap files but directly read them. But for anything larger than 8kb (default), we use memory mapped buffers. The actual overhead depends on the number of files opened for read via DiskStore - and the entire file contents get mmap'ed into virt mem. Note that there is some non-virt-mem overhead also at the native level for these buffers. The actual number of files opened should be carefully tracked to understand the effect of this on spark overhead : since this aspect is changing a lot of late. Impact is on shuffle, disk persisted rdds, among others. The actual value would be application dependent (how large the data is !) 8) The overhead introduced by the VM not being able to reclaim memory completely (the cost of moving data vs amount of space reclaimed). Ideally, this should be low - but it would be dependent on the heap space, collector used, among other things. I am not very knowledgeable about the recent advances in gc collectors, so I hesitate to put a number to this. I am sure this is not an exhaustive list, please do add to this. In our case specifically, and [~tgraves] could add more, the number of containers can be high (300+ is easily possible), memory per container is modest (8gig usually). 
To add details of observed overhead patterns (from the PR discussion) - a) I have had an in-house GBDT impl run without customizing overhead (so the default of 384 mb) with a 12gb container and 22 nodes on a reasonably large dataset. b) I have had to customize overhead to 1.7gb for collaborative filtering with an 8gb container and 300 nodes (on a fairly large dataset). c) I have had to minimally customize overhead to do an in-house QR factorization of a 50k x 50k distributed dense matrix on 45 nodes at 12 gb each (this was incorrectly specified in the PR discussion). Trouble running Spark 1.0 on Yarn -- Key: SPARK-2398 URL: https://issues.apache.org/jira/browse/SPARK-2398 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.0.0 Reporter: Nishkam Ravi Trouble running workloads in Spark-on-YARN cluster mode for Spark 1.0. For example: SparkPageRank when run in standalone mode goes through without any errors (tested for up to 30GB input dataset on a 6-node cluster). Also runs fine for a 1GB dataset in yarn cluster mode. Starts to choke (in yarn cluster mode) as the input data size is increased. Confirmed for 16GB input dataset. The same workload runs fine with Spark 0.9 in both standalone and yarn cluster mode (for up to 30 GB input dataset on a 6-node cluster). Commandline used: (/opt/cloudera/parcels/CDH/lib/spark/bin/spark-submit
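[Editor's note: a rough back-of-the-envelope combining the per-factor figures from the comment above; the executor count matches case (b), the netty pool count is an assumption, and the total is illustrative, not a model Spark actually uses.]
{code}
// Rough off-heap estimate from the factors listed above (bytes).
val numExecutors = 300
val interWorkerSockets = (numExecutors - 1) * 256L * 1024 // (1) ConnectionManager
val dfsSockets         = 128L * 1024                      // (2) one dfs write socket
val akkaSockets        = 2 * 256L * 1024                  // (3) send + receive buffers
val nettyFrames        = 4 * 10L * 1024 * 1024            // (4) assume 4 pooled 10mb frames
val threadStacks       = 220L * 1024 * 1024               // (6) ~220 threads x 1mb Xss
val fixedEstimate = interWorkerSockets + dfsSockets + akkaSockets +
                    nettyFrames + threadStacks
// ~335 MB before counting (5) loaded classes, (7) mmap'ed shuffle files, and
// (8) GC slack - which is why the 384 MB default is easy to blow through.
{code}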
[jira] [Commented] (SPARK-2390) Files in staging directory cannot be deleted and wastes the space of HDFS
[ https://issues.apache.org/jira/browse/SPARK-2390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14054162#comment-14054162 ] Mridul Muralidharan commented on SPARK-2390: Here, and in a bunch of other places, spark currently closes the FileSystem instance : this is incorrect, and should not be done. The fix would be to remove the fs.close, not force creation of new instances. Files in staging directory cannot be deleted and wastes the space of HDFS - Key: SPARK-2390 URL: https://issues.apache.org/jira/browse/SPARK-2390 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.0.0, 1.0.1 Reporter: Kousuke Saruta When running jobs with YARN Cluster mode and using HistoryServer, the files in the Staging Directory cannot be deleted. HistoryServer uses the directory where the event log is written, and the directory is represented as an instance of o.a.h.f.FileSystem created by using FileSystem.get. {code:title=FileLogger.scala} private val fileSystem = Utils.getHadoopFileSystem(new URI(logDir)) {code} {code:title=utils.getHadoopFileSystem} def getHadoopFileSystem(path: URI): FileSystem = { FileSystem.get(path, SparkHadoopUtil.get.newConfiguration()) } {code} On the other hand, ApplicationMaster has an instance named fs, which is also created by using FileSystem.get. {code:title=ApplicationMaster} private val fs = FileSystem.get(yarnConf) {code} FileSystem.get returns the same cached instance when the URI passed to the method represents the same file system and the method is called by the same user. Because of this behavior, when the directory for the event log is on HDFS, the fs of ApplicationMaster and the fileSystem of FileLogger are the same instance. When shutting down ApplicationMaster, fileSystem.close is called in FileLogger#stop, which is invoked by SparkContext#stop indirectly. {code:title=FileLogger.stop} def stop() { hadoopDataStream.foreach(_.close()) writer.foreach(_.close()) fileSystem.close() } {code} And ApplicationMaster#cleanupStagingDir is also called by a JVM shutdown hook. In this method, fs.delete(stagingDirPath) is invoked. Because fs.delete in ApplicationMaster is called after fileSystem.close in FileLogger, fs.delete fails, resulting in the files in the staging directory not being deleted. -- This message was sent by Atlassian JIRA (v6.2#6252)
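[Editor's note: a minimal sketch of the Hadoop FileSystem cache behavior underlying this bug; FileSystem.get and its per-(scheme, authority, user) caching are real Hadoop semantics, the paths are made up, and the local file system is used so the snippet runs without an HDFS cluster.]
{code}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val conf = new Configuration()

// Both URIs share the same scheme/authority/user, so Hadoop's cache hands
// back the SAME instance - which is why FileLogger closing "its" FileSystem
// also killed the ApplicationMaster's fs in this issue.
val fs1 = FileSystem.get(new Path("file:///tmp/spark-events").toUri, conf)
val fs2 = FileSystem.get(new Path("file:///tmp/sparkStaging").toUri, conf)
assert(fs1 eq fs2)

fs1.close()
// With HDFS, any later fs2.delete(...) now fails with "Filesystem closed" -
// hence the fix direction above: never close a shared FileSystem instance.
{code}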
[jira] [Commented] (SPARK-2277) Make TaskScheduler track whether there's host on a rack
[ https://issues.apache.org/jira/browse/SPARK-2277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14052275#comment-14052275 ] Mridul Muralidharan commented on SPARK-2277: Hmm, good point - that PR does change the scheduler expectations in a lot of ways which were not all anticipated. Let me go through the current PR; thanks for the bug ! Make TaskScheduler track whether there's host on a rack --- Key: SPARK-2277 URL: https://issues.apache.org/jira/browse/SPARK-2277 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 1.0.0 Reporter: Rui Li When TaskSetManager adds a pending task, it checks whether the tasks's preferred location is available. Regarding RACK_LOCAL task, we consider the preferred rack available if such a rack is defined for the preferred host. This is incorrect as there may be no alive hosts on that rack at all. Therefore, TaskScheduler should track the hosts on each rack, and provides an API for TaskSetManager to check if there's host alive on a specific rack. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-2017) web ui stage page becomes unresponsive when the number of tasks is large
[ https://issues.apache.org/jira/browse/SPARK-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14052289#comment-14052289 ] Mridul Muralidharan commented on SPARK-2017: With aggregated metrics, we lose the ability to check for gc time (which is actually what I use that UI for, other than to dig up exceptions on failed tasks). web ui stage page becomes unresponsive when the number of tasks is large Key: SPARK-2017 URL: https://issues.apache.org/jira/browse/SPARK-2017 Project: Spark Issue Type: Sub-task Reporter: Reynold Xin Labels: starter {code} sc.parallelize(1 to 1000000, 1000000).count() {code} The above code creates one million tasks to be executed. The stage detail web ui page takes forever to load (if it ever completes). There are again a few different alternatives: 0. Limit the number of tasks we show. 1. Pagination 2. By default only show the aggregate metrics and failed tasks, and hide the successful ones. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-2017) web ui stage page becomes unresponsive when the number of tasks is large
[ https://issues.apache.org/jira/browse/SPARK-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14052679#comment-14052679 ] Mridul Muralidharan commented on SPARK-2017: Sounds great, ability to get to currently running tasks (to check current state), ability to get to task failures (to debug usually), some aggregate stats (gc, stats per executor, etc) and having some way to get to the full details (which is what is seen currently) in an advanced or full view. Anything else would be a bonus :-) web ui stage page becomes unresponsive when the number of tasks is large Key: SPARK-2017 URL: https://issues.apache.org/jira/browse/SPARK-2017 Project: Spark Issue Type: Sub-task Reporter: Reynold Xin Labels: starter {code} sc.parallelize(1 to 1000000, 1000000).count() {code} The above code creates one million tasks to be executed. The stage detail web ui page takes forever to load (if it ever completes). There are again a few different alternatives: 0. Limit the number of tasks we show. 1. Pagination 2. By default only show the aggregate metrics and failed tasks, and hide the successful ones. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Created] (SPARK-2353) ArrayIndexOutOfBoundsException in scheduler
Mridul Muralidharan created SPARK-2353: -- Summary: ArrayIndexOutOfBoundsException in scheduler Key: SPARK-2353 URL: https://issues.apache.org/jira/browse/SPARK-2353 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.1.0 Reporter: Mridul Muralidharan Priority: Blocker I suspect the recent changes from SPARK-1937 to compute valid locality levels (and ignore ones which are not applicable) have resulted in this issue. Specifically, some of the code using currentLocalityIndex (and lastLaunchTime actually) seems to be assuming a) constant population of locality levels. b) probably also immutability/repeatability of locality levels These do not hold any longer. I do not have the exact values for which this failure was observed (since this is from the logs of a failed job) - but the code path is highly suspect. Also note that the line numbers/classes might not exactly match master since we are in the middle of a merge. But the issue should hopefully be evident. java.lang.ArrayIndexOutOfBoundsException: 2 at org.apache.spark.scheduler.TaskSetManager.getAllowedLocalityLevel(TaskSetManager.scala:439) at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:388) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$5$$anonfun$apply$2.apply$mcVI$sp(TaskSchedulerImpl.scala:248) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$5.apply(TaskSchedulerImpl.scala:244) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$5.apply(TaskSchedulerImpl.scala:241) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:241) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:241) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:241) at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor.makeOffers(CoarseGrainedSchedulerBackend.scala:133) at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor$$anonfun$receive$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:86) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) -- This message was sent by Atlassian JIRA (v6.2#6252)
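[Editor's note: a hypothetical, self-contained illustration of the suspected failure mode and one defensive direction; the names mirror the description above, not the exact TaskSetManager fields.]
{code}
// Hypothetical sketch: a stale index into a recomputed locality-level array.
var myLocalityLevels = Array("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY")
var currentLocalityIndex = 3                   // currently at ANY

// An executor change triggers recomputation; only valid levels survive:
myLocalityLevels = Array("NODE_LOCAL", "ANY")  // length 2
// myLocalityLevels(currentLocalityIndex)      // ArrayIndexOutOfBoundsException: 3

// Defensive direction: re-clamp (or re-derive) the index whenever the
// level array is recomputed.
currentLocalityIndex = math.min(currentLocalityIndex, myLocalityLevels.length - 1)
{code}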
[jira] [Commented] (SPARK-2277) Make TaskScheduler track whether there's host on a rack
[ https://issues.apache.org/jira/browse/SPARK-2277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14051575#comment-14051575 ] Mridul Muralidharan commented on SPARK-2277: I have not rechecked the code, but the way it was originally written by me was : a) Task preference is decoupled from the availability of the node. For example, we need not have an executor on a host for which a block has host preference (example: dfs blocks on a shared cluster). Also note that a block might have one or more preferred locations. b) We look up the rack for the preferred location to get the preferred rack. As with (a), there need not be an executor on that rack. This is just the rack preference. c) At schedule time, for an executor, we look up the host/rack of the executor's location - and decide appropriately based on that. In this context, I think your requirement is already handled. Even if we don't have any hosts alive on a rack, those tasks would still be marked with rack local preference in the task set manager. When an executor comes in (existing or new), we check that executor's rack against the task preference - and it would now be marked rack local. Make TaskScheduler track whether there's host on a rack --- Key: SPARK-2277 URL: https://issues.apache.org/jira/browse/SPARK-2277 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 1.0.0 Reporter: Rui Li When TaskSetManager adds a pending task, it checks whether the tasks's preferred location is available. Regarding RACK_LOCAL task, we consider the preferred rack available if such a rack is defined for the preferred host. This is incorrect as there may be no alive hosts on that rack at all. Therefore, TaskScheduler should track the hosts on each rack, and provides an API for TaskSetManager to check if there's host alive on a specific rack. -- This message was sent by Atlassian JIRA (v6.2#6252)
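[Editor's note: a minimal sketch of the decoupling described in (a)-(c) above; all names and data are hypothetical.]
{code}
import scala.collection.mutable

// Topology info, updated as hosts appear; preferences are recorded
// independently of whether any executor is currently alive on them.
val rackOfHost = mutable.Map("hostA" -> "rack1", "hostB" -> "rack2")
val preferredHosts = Map(42 -> Seq("hostA", "hostB")) // task -> dfs block hosts

def localityOf(task: Int, execHost: String): String = {
  val prefs = preferredHosts.getOrElse(task, Seq.empty)
  if (prefs.contains(execHost)) "NODE_LOCAL"
  else {
    val prefRacks = prefs.flatMap(rackOfHost.get)
    // An executor appearing later on a preferred rack is still caught here,
    // even if no host on that rack was alive when the task set was built.
    if (rackOfHost.get(execHost).exists(prefRacks.contains)) "RACK_LOCAL"
    else "ANY"
  }
}
{code}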
[jira] [Updated] (SPARK-2353) ArrayIndexOutOfBoundsException in scheduler
[ https://issues.apache.org/jira/browse/SPARK-2353?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mridul Muralidharan updated SPARK-2353: --- Description: I suspect the recent changes from SPARK-1937 to compute valid locality levels (and ignore ones which are not applicable) have resulted in this issue. Specifically, some of the code using currentLocalityIndex (and lastLaunchTime actually) seems to be assuming a) a constant population of locality levels, and b) probably also immutability/repeatability of locality levels. These no longer hold. I do not have the exact values for which this failure was observed (since this is from the logs of a failed job) - but the code path is suspect. Also note that the line numbers/classes might not exactly match master since we are in the middle of a merge. But the issue should hopefully be evident.
{code}
java.lang.ArrayIndexOutOfBoundsException: 2
	at org.apache.spark.scheduler.TaskSetManager.getAllowedLocalityLevel(TaskSetManager.scala:439)
	at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:388)
	at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$5$$anonfun$apply$2.apply$mcVI$sp(TaskSchedulerImpl.scala:248)
	at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
	at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$5.apply(TaskSchedulerImpl.scala:244)
	at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$5.apply(TaskSchedulerImpl.scala:241)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
	at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:241)
	at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:241)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:241)
	at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor.makeOffers(CoarseGrainedSchedulerBackend.scala:133)
	at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor$$anonfun$receive$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:86)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
	at akka.actor.ActorCell.invoke(ActorCell.scala:456)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
	at akka.dispatch.Mailbox.run(Mailbox.scala:219)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{code}
Unfortunately, we do not have the bandwidth to tackle this issue - would be great if someone could take a look at it ! Thanks.
[jira] [Commented] (SPARK-2277) Make TaskScheduler track whether there's host on a rack
[ https://issues.apache.org/jira/browse/SPARK-2277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14050886#comment-14050886 ] Mridul Muralidharan commented on SPARK-2277: I am not sure I follow this requirement. For preferred locations, we populate their corresponding racks (if available) as preferred racks. For available executor hosts, we look up the rack they belong to - and then see if that rack is preferred or not. This, of course, assumes a host is only on a single rack. What exactly is the behavior you are expecting from the scheduler ? Make TaskScheduler track whether there's host on a rack --- Key: SPARK-2277 URL: https://issues.apache.org/jira/browse/SPARK-2277 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 1.0.0 Reporter: Rui Li When TaskSetManager adds a pending task, it checks whether the task's preferred location is available. Regarding RACK_LOCAL tasks, we consider the preferred rack available if such a rack is defined for the preferred host. This is incorrect as there may be no alive hosts on that rack at all. Therefore, TaskScheduler should track the hosts on each rack, and provide an API for TaskSetManager to check if there's a host alive on a specific rack. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-2294) TaskSchedulerImpl and TaskSetManager do not properly prioritize which tasks get assigned to an executor
[ https://issues.apache.org/jira/browse/SPARK-2294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14045433#comment-14045433 ] Mridul Muralidharan commented on SPARK-2294: I agree; we should bump no-locality-pref and speculative tasks to the NODE_LOCAL level after NODE_LOCAL tasks have been scheduled (if available), and not check for them at PROCESS_LOCAL max locality. That way they get scheduled before RACK_LOCAL but after NODE_LOCAL. This is an artifact of the design from when there was no PROCESS_LOCAL and NODE_LOCAL was the best schedule possible (without explicitly having these levels : we had node and any). TaskSchedulerImpl and TaskSetManager do not properly prioritize which tasks get assigned to an executor --- Key: SPARK-2294 URL: https://issues.apache.org/jira/browse/SPARK-2294 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.0.0, 1.0.1 Reporter: Kay Ousterhout If an executor E is free, a task may be speculatively assigned to E when there are other tasks in the job that have not been launched (at all) yet. Similarly, a task without any locality preferences may be assigned to E when there was another NODE_LOCAL task that could have been scheduled. This happens because TaskSchedulerImpl calls TaskSetManager.resourceOffer (which in turn calls TaskSetManager.findTask) with increasing locality levels, beginning with PROCESS_LOCAL, followed by NODE_LOCAL, and so on until the highest currently allowed level. Now, suppose NODE_LOCAL is the highest currently allowed locality level. The first time findTask is called, it will be called with max level PROCESS_LOCAL; if it cannot find any PROCESS_LOCAL tasks, it will try to schedule tasks with no locality preferences or speculative tasks. As a result, speculative tasks or tasks with no preferences may be scheduled instead of NODE_LOCAL tasks. cc [~matei] -- This message was sent by Atlassian JIRA (v6.2#6252)
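A hedged sketch of the ordering proposed above - hypothetical structure, not TaskSetManager's actual code: no-pref and speculative tasks are only considered once the NODE_LOCAL level is reached, so they land after NODE_LOCAL tasks but before RACK_LOCAL ones:
{code}
import scala.collection.mutable.Queue

// Illustrative only; task ids stand in for real task state.
class PendingTasks(processLocal: Queue[Int], nodeLocal: Queue[Int],
                   noPref: Queue[Int], rackLocal: Queue[Int]) {
  private def dequeue(q: Queue[Int]): Option[Int] =
    if (q.isEmpty) None else Some(q.dequeue())

  def findTask(maxLevel: String): Option[Int] = maxLevel match {
    case "PROCESS_LOCAL" => dequeue(processLocal) // no-pref tasks NOT tried here
    case "NODE_LOCAL"    => dequeue(nodeLocal).orElse(dequeue(noPref))
    case "RACK_LOCAL"    => dequeue(rackLocal)
    case _               => None
  }
}
{code}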
[jira] [Commented] (SPARK-2268) Utils.createTempDir() creates race with HDFS at shutdown
[ https://issues.apache.org/jira/browse/SPARK-2268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14043088#comment-14043088 ] Mridul Muralidharan commented on SPARK-2268: That is not because of this hook. There are a bunch of places in spark where filesystem objects are (incorrectly, I should add) getting closed : some within shutdown hooks (check the stop method in various services in spark) and others elsewhere (like the checkpointing code). I have fixed a bunch of these as part of some other work ... should come in a PR soon. Utils.createTempDir() creates race with HDFS at shutdown Key: SPARK-2268 URL: https://issues.apache.org/jira/browse/SPARK-2268 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.0.0 Reporter: Marcelo Vanzin Utils.createTempDir() has this code:
{code}
// Add a shutdown hook to delete the temp dir when the JVM exits
Runtime.getRuntime.addShutdownHook(new Thread("delete Spark temp dir " + dir) {
  override def run() {
    // Attempt to delete if some path which is parent of this is not already registered.
    if (!hasRootAsShutdownDeleteDir(dir)) Utils.deleteRecursively(dir)
  }
})
{code}
This creates a race with the shutdown hooks registered by HDFS, since the order of execution is undefined; if the HDFS hooks run first, you'll get exceptions about the file system being closed. Instead, this should use Hadoop's ShutdownHookManager with a proper priority, so that it runs before the HDFS hooks. -- This message was sent by Atlassian JIRA (v6.2#6252)
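For reference, a minimal sketch of the fix the description proposes, assuming Hadoop 2.x's ShutdownHookManager and a deleteRecursively helper (hypothetical wiring, not the actual patch):
{code}
import java.io.File
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.util.ShutdownHookManager

// Register the cleanup at a priority above the FileSystem shutdown hook,
// so the temp dir is deleted before HDFS clients are closed.
def registerDeleteHook(dir: File, deleteRecursively: File => Unit): Unit = {
  ShutdownHookManager.get().addShutdownHook(new Runnable {
    override def run(): Unit = deleteRecursively(dir)
  }, FileSystem.SHUTDOWN_HOOK_PRIORITY + 1)
}
{code}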
[jira] [Commented] (SPARK-2268) Utils.createTempDir() creates race with HDFS at shutdown
[ https://issues.apache.org/jira/browse/SPARK-2268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14043071#comment-14043071 ] Mridul Muralidharan commented on SPARK-2268: Setting priority for shutdown hooks does not have too much impact given the state of the VM. Note that this hook is trying to delete local directories - not dfs directories. Utils.createTempDir() creates race with HDFS at shutdown Key: SPARK-2268 URL: https://issues.apache.org/jira/browse/SPARK-2268 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.0.0 Reporter: Marcelo Vanzin Utils.createTempDir() has this code:
{code}
// Add a shutdown hook to delete the temp dir when the JVM exits
Runtime.getRuntime.addShutdownHook(new Thread("delete Spark temp dir " + dir) {
  override def run() {
    // Attempt to delete if some path which is parent of this is not already registered.
    if (!hasRootAsShutdownDeleteDir(dir)) Utils.deleteRecursively(dir)
  }
})
{code}
This creates a race with the shutdown hooks registered by HDFS, since the order of execution is undefined; if the HDFS hooks run first, you'll get exceptions about the file system being closed. Instead, this should use Hadoop's ShutdownHookManager with a proper priority, so that it runs before the HDFS hooks. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-704) ConnectionManager sometimes cannot detect loss of sending connections
[ https://issues.apache.org/jira/browse/SPARK-704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14039742#comment-14039742 ] Mridul Muralidharan commented on SPARK-704: --- If the remote node goes down, SendingConnection would be notified since it is also registered for read events (to handle precisely this case, actually). ReceivingConnection would anyway be notified since it is waiting on reads on that socket. This, of course, assumes that the local node detects remote node failure at the tcp layer. Problems come in when ConnectionManager sometimes cannot detect loss of sending connections - Key: SPARK-704 URL: https://issues.apache.org/jira/browse/SPARK-704 Project: Spark Issue Type: Bug Reporter: Charles Reiss Assignee: Henry Saputra ConnectionManager currently does not detect when SendingConnections disconnect except if it is trying to send through them. As a result, a node failure just after a connection is initiated but before any acknowledgement messages can be sent may result in a hang. ConnectionManager has code intended to detect this case by detecting the failure of a corresponding ReceivingConnection, but this code assumes that the remote host:port of the ReceivingConnection is the same as the ConnectionManagerId, which is almost never true. Additionally, there does not appear to be any reason to assume a corresponding ReceivingConnection will exist. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Comment Edited] (SPARK-704) ConnectionManager sometimes cannot detect loss of sending connections
[ https://issues.apache.org/jira/browse/SPARK-704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14039742#comment-14039742 ] Mridul Muralidharan edited comment on SPARK-704 at 6/21/14 9:10 AM: If the remote node goes down, SendingConnection would be notified since it is also registered for read events (to handle precisely this case, actually). ReceivingConnection would be notified since it is waiting on reads on that socket. This, of course, assumes that the local node detects remote node failure at the tcp layer. Problems come in when this is not detected due to no activity on the socket (at app and socket level - keepalive timeout, etc). Usually this is detected via application-level ping/keepalive messages : not sure if we want to introduce that into spark ... was (Author: mridulm80): If the remote node goes down, SendingConnection would be notified since it is also registered for read events (to handle precisely this case, actually). ReceivingConnection would anyway be notified since it is waiting on reads on that socket. This, of course, assumes that the local node detects remote node failure at the tcp layer. Problems come in when ConnectionManager sometimes cannot detect loss of sending connections - Key: SPARK-704 URL: https://issues.apache.org/jira/browse/SPARK-704 Project: Spark Issue Type: Bug Reporter: Charles Reiss Assignee: Henry Saputra ConnectionManager currently does not detect when SendingConnections disconnect except if it is trying to send through them. As a result, a node failure just after a connection is initiated but before any acknowledgement messages can be sent may result in a hang. ConnectionManager has code intended to detect this case by detecting the failure of a corresponding ReceivingConnection, but this code assumes that the remote host:port of the ReceivingConnection is the same as the ConnectionManagerId, which is almost never true. Additionally, there does not appear to be any reason to assume a corresponding ReceivingConnection will exist. -- This message was sent by Atlassian JIRA (v6.2#6252)
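A hedged illustration of the read-event mechanism described above (a minimal NIO sketch, not the ConnectionManager code): a channel registered for OP_READ observes an orderly remote close because read() returns -1 once the peer's FIN arrives.
{code}
import java.nio.ByteBuffer
import java.nio.channels.{SelectionKey, SocketChannel}

// Returns true if the remote end has closed the connection. Undetected
// failures (dead peer, no FIN, no traffic on the socket) are exactly the
// gap that application-level keepalive messages would cover.
def remoteClosed(key: SelectionKey): Boolean = {
  val channel = key.channel().asInstanceOf[SocketChannel]
  channel.read(ByteBuffer.allocate(1)) == -1
}
{code}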
[jira] [Commented] (SPARK-2223) Building and running tests with maven is extremely slow
[ https://issues.apache.org/jira/browse/SPARK-2223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14039217#comment-14039217 ] Mridul Muralidharan commented on SPARK-2223: [~tgraves] You could try running zinc - it speeds up the maven build quite a bit. I find that I need to shut down and restart it at times though ... but otherwise, it works fine. Building and running tests with maven is extremely slow --- Key: SPARK-2223 URL: https://issues.apache.org/jira/browse/SPARK-2223 Project: Spark Issue Type: Bug Components: Build Affects Versions: 1.0.0 Reporter: Thomas Graves For some reason using maven with Spark is extremely slow. Building and running tests take way longer than for other projects I have used that use maven. We should investigate to see why. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-2089) With YARN, preferredNodeLocalityData isn't honored
[ https://issues.apache.org/jira/browse/SPARK-2089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14039236#comment-14039236 ] Mridul Muralidharan commented on SPARK-2089: [~pwendell] SplitInfo is not from hadoop - it gives locality preference in the context of spark (see org.apache.spark.scheduler.SplitInfo) in a reasonably api-agnostic way. The default support provided for it is hadoop-specific, based on dfs blocks - but I don't think there is anything stopping us from expressing other forms (either already currently or with minor modifications as applicable). We actually use that api very heavily - moving 10s or 100s of TB of data tends to be fairly expensive :-) Since we are still stuck on 0.9 + changes, we have not yet faced this issue, so great to see this being addressed. With YARN, preferredNodeLocalityData isn't honored --- Key: SPARK-2089 URL: https://issues.apache.org/jira/browse/SPARK-2089 Project: Spark Issue Type: Bug Components: YARN Affects Versions: 1.0.0 Reporter: Sandy Ryza Assignee: Sandy Ryza Priority: Critical When running in YARN cluster mode, apps can pass preferred locality data when constructing a Spark context that will dictate where to request executor containers. This is currently broken because of a race condition. The Spark-YARN code runs the user class and waits for it to start up a SparkContext. During its initialization, the SparkContext will create a YarnClusterScheduler, which notifies a monitor in the Spark-YARN code. The Spark-YARN code then immediately fetches the preferredNodeLocationData from the SparkContext and uses it to start requesting containers. But in the SparkContext constructor that takes the preferredNodeLocationData, setting preferredNodeLocationData comes after the rest of the initialization, so, if the Spark-YARN code comes around quickly enough after being notified, the data that's fetched is the empty unset version. This occurred during all of my runs. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-1353) IllegalArgumentException when writing to disk
[ https://issues.apache.org/jira/browse/SPARK-1353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14033625#comment-14033625 ] Mridul Muralidharan commented on SPARK-1353: This is due to a limitation in spark which is being addressed in https://issues.apache.org/jira/browse/SPARK-1476. IllegalArgumentException when writing to disk - Key: SPARK-1353 URL: https://issues.apache.org/jira/browse/SPARK-1353 Project: Spark Issue Type: Bug Components: Block Manager Environment: AWS EMR 3.2.30-49.59.amzn1.x86_64 #1 SMP x86_64 GNU/Linux Spark 1.0.0-SNAPSHOT built for Hadoop 1.0.4 built 2014-03-18 Reporter: Jim Blomo Priority: Minor The Executor may fail when trying to mmap a file bigger than Integer.MAX_VALUE due to the constraints of FileChannel.map (http://docs.oracle.com/javase/7/docs/api/java/nio/channels/FileChannel.html#map(java.nio.channels.FileChannel.MapMode, long, long)). The signature takes longs, but the size value must be less than MAX_VALUE. This manifests with the following backtrace:
{code}
java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
	at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
	at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:98)
	at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:337)
	at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:281)
	at org.apache.spark.storage.BlockManager.get(BlockManager.scala:430)
	at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:38)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:220)
	at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
{code}
-- This message was sent by Atlassian JIRA (v6.2#6252)
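For context, a hedged sketch of the kind of chunked mapping SPARK-1476 moves toward (illustrative only, not the actual patch): FileChannel.map caps each mapping at Integer.MAX_VALUE bytes, so a larger file has to be mapped as a sequence of buffers.
{code}
import java.io.RandomAccessFile
import java.nio.MappedByteBuffer
import java.nio.channels.FileChannel

// Map a file of arbitrary size as a sequence of MappedByteBuffers, since a
// single FileChannel.map call cannot exceed Integer.MAX_VALUE bytes.
def mapInChunks(path: String): Seq[MappedByteBuffer] = {
  val channel = new RandomAccessFile(path, "r").getChannel
  try {
    val chunk = Integer.MAX_VALUE.toLong
    (0L until channel.size() by chunk).map { offset =>
      channel.map(FileChannel.MapMode.READ_ONLY, offset,
                  math.min(chunk, channel.size() - offset))
    }
  } finally channel.close() // mappings remain valid after the channel closes
}
{code}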
[jira] [Commented] (SPARK-2018) Big-Endian (IBM Power7) Spark Serialization issue
[ https://issues.apache.org/jira/browse/SPARK-2018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14027808#comment-14027808 ] Mridul Muralidharan commented on SPARK-2018: Ah ! This is an interesting bug. Default spark uses java serialization ... so this should not be an issue : and yet you are facing it ! (I am assuming you have not customized serialization.) Is it possible for you to dump the data written and read at both ends ? And the env vars and jvm details ? Actually, spark does not do anything fancy for default serialization : so a simple example without spark in the picture could also be tried (write to a file on the master node, and read from the file on the slave node - and see if it works). Big-Endian (IBM Power7) Spark Serialization issue -- Key: SPARK-2018 URL: https://issues.apache.org/jira/browse/SPARK-2018 Project: Spark Issue Type: Bug Affects Versions: 1.0.0 Environment: hardware : IBM Power7 OS:Linux version 2.6.32-358.el6.ppc64 (mockbu...@ppc-017.build.eng.bos.redhat.com) (gcc version 4.4.7 20120313 (Red Hat 4.4.7-3) (GCC) ) #1 SMP Tue Jan 29 11:43:27 EST 2013 JDK: Java(TM) SE Runtime Environment (build pxp6470sr5-20130619_01(SR5)) IBM J9 VM (build 2.6, JRE 1.7.0 Linux ppc64-64 Compressed References 20130617_152572 (JIT enabled, AOT enabled) Hadoop:Hadoop-0.2.3-CDH5.0 Spark:Spark-1.0.0 or Spark-0.9.1 spark-env.sh: export JAVA_HOME=/opt/ibm/java-ppc64-70/ export SPARK_MASTER_IP=9.114.34.69 export SPARK_WORKER_MEMORY=1m export SPARK_CLASSPATH=/home/test1/spark-1.0.0-bin-hadoop2/lib export STANDALONE_SPARK_MASTER_HOST=9.114.34.69 #export SPARK_JAVA_OPTS=' -Xdebug -Xrunjdwp:transport=dt_socket,address=9,server=y,suspend=n ' Reporter: Yanjie Gao We have an application that runs on Spark on a Power7 system, but we are hitting an important serialization issue. The example HdfsWordCount can reproduce the problem. ./bin/run-example org.apache.spark.examples.streaming.HdfsWordCount localdir We used Power7 (Big-Endian arch) and Redhat 6.4. Big-Endian is the main cause, since the example ran successfully in another Power-based Little-Endian setup. Here is the exception stack and log: Spark Executor Command: /opt/ibm/java-ppc64-70//bin/java -cp /home/test1/spark-1.0.0-bin-hadoop2/lib::/home/test1/src/spark-1.0.0-bin-hadoop2/conf:/home/test1/src/spark-1.0.0-bin-hadoop2/lib/spark-assembly-1.0.0-hadoop2.2.0.jar:/home/test1/src/spark-1.0.0-bin-hadoop2/lib/datanucleus-rdbms-3.2.1.jar:/home/test1/src/spark-1.0.0-bin-hadoop2/lib/datanucleus-api-jdo-3.2.1.jar:/home/test1/src/spark-1.0.0-bin-hadoop2/lib/datanucleus-core-3.2.2.jar:/home/test1/src/hadoop-2.3.0-cdh5.0.0/etc/hadoop/:/home/test1/src/hadoop-2.3.0-cdh5.0.0/etc/hadoop/ -XX:MaxPermSize=128m -Xdebug -Xrunjdwp:transport=dt_socket,address=9,server=y,suspend=n -Xms512M -Xmx512M org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://spark@9.186.105.141:60253/user/CoarseGrainedScheduler 2 p7hvs7br16 4 akka.tcp://sparkWorker@p7hvs7br16:59240/user/Worker app-20140604023054- 14/06/04 02:31:20 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform...
using builtin-java classes where applicable 14/06/04 02:31:21 INFO spark.SecurityManager: Changing view acls to: test1,yifeng 14/06/04 02:31:21 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(test1, yifeng) 14/06/04 02:31:22 INFO slf4j.Slf4jLogger: Slf4jLogger started 14/06/04 02:31:22 INFO Remoting: Starting remoting 14/06/04 02:31:22 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkExecutor@p7hvs7br16:39658] 14/06/04 02:31:22 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkExecutor@p7hvs7br16:39658] 14/06/04 02:31:22 INFO executor.CoarseGrainedExecutorBackend: Connecting to driver: akka.tcp://spark@9.186.105.141:60253/user/CoarseGrainedScheduler 14/06/04 02:31:22 INFO worker.WorkerWatcher: Connecting to worker akka.tcp://sparkWorker@p7hvs7br16:59240/user/Worker 14/06/04 02:31:23 INFO worker.WorkerWatcher: Successfully connected to akka.tcp://sparkWorker@p7hvs7br16:59240/user/Worker 14/06/04 02:31:24 INFO executor.CoarseGrainedExecutorBackend: Successfully registered with driver 14/06/04 02:31:24 INFO spark.SecurityManager: Changing view acls to: test1,yifeng 14/06/04 02:31:24 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(test1, yifeng) 14/06/04 02:31:24 INFO slf4j.Slf4jLogger: Slf4jLogger started 14/06/04 02:31:24 INFO Remoting: Starting remoting 14/06/04 02:31:24 INFO Remoting: Remoting started; listening on addresses
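A hedged sketch of the no-Spark experiment suggested in the comment above: serialize an object with plain java serialization on the master node, copy the file across, and deserialize it on the slave node. If this round trip fails between the two machines, the problem is below Spark.
{code}
import java.io._

object SerializationRoundTrip {
  // Run write() on the master node, copy the file, then read() on the slave.
  def write(path: String, value: Serializable): Unit = {
    val out = new ObjectOutputStream(new FileOutputStream(path))
    try out.writeObject(value) finally out.close()
  }

  def read(path: String): AnyRef = {
    val in = new ObjectInputStream(new FileInputStream(path))
    try in.readObject() finally in.close()
  }
}
{code}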
[jira] [Commented] (SPARK-2089) With YARN, preferredNodeLocalityData isn't honored
[ https://issues.apache.org/jira/browse/SPARK-2089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14026397#comment-14026397 ] Mridul Muralidharan commented on SPARK-2089: preferredNodeLocationData used to be passed as a constructor parameter - and so was always available. The rearrangement of SparkContext initialization has introduced this bug. With YARN, preferredNodeLocalityData isn't honored --- Key: SPARK-2089 URL: https://issues.apache.org/jira/browse/SPARK-2089 Project: Spark Issue Type: Bug Components: YARN Affects Versions: 1.0.0 Reporter: Sandy Ryza When running in YARN cluster mode, apps can pass preferred locality data when constructing a Spark context that will dictate where to request executor containers. This is currently broken because of a race condition. The Spark-YARN code runs the user class and waits for it to start up a SparkContext. During its initialization, the SparkContext will create a YarnClusterScheduler, which notifies a monitor in the Spark-YARN code. The Spark-YARN code then immediately fetches the preferredNodeLocationData from the SparkContext and uses it to start requesting containers. But in the SparkContext constructor that takes the preferredNodeLocationData, setting preferredNodeLocationData comes after the rest of the initialization, so, if the Spark-YARN code comes around quickly enough after being notified, the data that's fetched is the empty unset version. This occurred during all of my runs. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-2064) web ui should not remove executors if they are dead
[ https://issues.apache.org/jira/browse/SPARK-2064?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14020789#comment-14020789 ] Mridul Muralidharan commented on SPARK-2064: Depending on how long a job runs, this can cause OOM on the master. In yarn (and mesos ?), an executor on the same node gets a different port if relaunched on failure - and so ends up as a different executor in the list. web ui should not remove executors if they are dead --- Key: SPARK-2064 URL: https://issues.apache.org/jira/browse/SPARK-2064 Project: Spark Issue Type: Sub-task Reporter: Reynold Xin We should always show the list of executors that have ever been connected, and add a status column to mark them as dead if they have been disconnected. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-2064) web ui should not remove executors if they are dead
[ https://issues.apache.org/jira/browse/SPARK-2064?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14020936#comment-14020936 ] Mridul Muralidharan commented on SPARK-2064: It is 100 MB (or more) of memory which could be used elsewhere. In our clusters, for example, the number of workers can be very high while the containers can be quite ephemeral when under load (and so there are a lot of container losses); on the other hand, memory per container is constrained to about 8 gig (lower once we account for overheads, etc). So the amount of working memory in the master is reduced : we are finding that the UI and related codepaths are among the portions occupying a lot of memory in the OOM dumps of the master. web ui should not remove executors if they are dead --- Key: SPARK-2064 URL: https://issues.apache.org/jira/browse/SPARK-2064 Project: Spark Issue Type: Sub-task Reporter: Reynold Xin We should always show the list of executors that have ever been connected, and add a status column to mark them as dead if they have been disconnected. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-2064) web ui should not remove executors if they are dead
[ https://issues.apache.org/jira/browse/SPARK-2064?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14021008#comment-14021008 ] Mridul Muralidharan commented on SPARK-2064: Unfortunately OOM is a very big issue for us since the application master is a single point of failure when running in yarn - particularly when memory is constrained and vigorously enforced by the yarn containers (requiring higher overheads to be specified, reducing usable memory even further). Given this, and given the fair amount of churn already for executor containers, I am hesitant about features which add to the memory footprint of the UI even further. The cumulative impact of the ui is nontrivial, as I mentioned before. This, for example, would require 1-8% of master memory when there is reasonable churn for long running jobs (30 hours) on a reasonable number of executors (200-300). web ui should not remove executors if they are dead --- Key: SPARK-2064 URL: https://issues.apache.org/jira/browse/SPARK-2064 Project: Spark Issue Type: Sub-task Reporter: Reynold Xin We should always show the list of executors that have ever been connected, and add a status column to mark them as dead if they have been disconnected. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-2064) web ui should not remove executors if they are dead
[ https://issues.apache.org/jira/browse/SPARK-2064?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14021011#comment-14021011 ] Mridul Muralidharan commented on SPARK-2064: I am probably missing the intent behind this change. What is the expected use case it is supposed to help with ? web ui should not remove executors if they are dead --- Key: SPARK-2064 URL: https://issues.apache.org/jira/browse/SPARK-2064 Project: Spark Issue Type: Sub-task Reporter: Reynold Xin We should always show the list of executors that have ever been connected, and add a status column to mark them as dead if they have been disconnected. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-2017) web ui stage page becomes unresponsive when the number of tasks is large
[ https://issues.apache.org/jira/browse/SPARK-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14019394#comment-14019394 ] Mridul Muralidharan commented on SPARK-2017: Currently, for our jobs, I run with spark.ui.retainedStages=3 (so that there is some visibility into past stages) : this is to prevent OOMs in the master when the number of tasks per stage is not low (50k, for example, is not very high imo). The stage details UI becomes very sluggish to pretty much unresponsive for our jobs where tasks > 30k ... though that might also be a browser issue (firefox/chrome) ? web ui stage page becomes unresponsive when the number of tasks is large Key: SPARK-2017 URL: https://issues.apache.org/jira/browse/SPARK-2017 Project: Spark Issue Type: Sub-task Reporter: Reynold Xin Labels: starter
{code}
sc.parallelize(1 to 1000000, 1000000).count()
{code}
The above code creates one million tasks to be executed. The stage detail web ui page takes forever to load (if it ever completes). There are again a few different alternatives: 0. Limit the number of tasks we show. 1. Pagination 2. By default only show the aggregate metrics and failed tasks, and hide the successful ones. -- This message was sent by Atlassian JIRA (v6.2#6252)
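For reference, the workaround mentioned in the comment above as it would look in code (the same property can equally be set in spark-defaults):
{code}
import org.apache.spark.SparkConf

// Cap how many completed stages the UI retains, so its bookkeeping
// cannot grow without bound on long-running jobs.
val conf = new SparkConf().set("spark.ui.retainedStages", "3")
{code}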
[jira] [Commented] (SPARK-1956) Enable shuffle consolidation by default
[ https://issues.apache.org/jira/browse/SPARK-1956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14011741#comment-14011741 ] Mridul Muralidharan commented on SPARK-1956: Shuffle consolidation MUST NOT be enabled - whether by default, or intentionally. In 1.0 it is very badly broken - we needed a whole litany of fixes before it was reasonably stable. The current plan is to contribute most of these back in the 1.1 timeframe. Enable shuffle consolidation by default --- Key: SPARK-1956 URL: https://issues.apache.org/jira/browse/SPARK-1956 Project: Spark Issue Type: Improvement Components: Shuffle, Spark Core Affects Versions: 1.0.0 Reporter: Sandy Ryza The only drawbacks are on ext3, and most everyone has ext4 at this point. I think it's better to aim the default at the common case. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-1855) Provide memory-and-local-disk RDD checkpointing
[ https://issues.apache.org/jira/browse/SPARK-1855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14001377#comment-14001377 ] Mridul Muralidharan commented on SPARK-1855: Did not realize that mail replies to JIRA mails did not get mirrored to JIRA ! Replicating my mail here : – cut and paste – We don't have 3x replication in spark :-) And while using a replicated storagelevel decreases the odds of failure, it does not eliminate it (since we are not doing a great job with replication anyway from a fault tolerance point of view). Also, replication does take a nontrivial performance hit. Regards, Mridul Provide memory-and-local-disk RDD checkpointing --- Key: SPARK-1855 URL: https://issues.apache.org/jira/browse/SPARK-1855 Project: Spark Issue Type: New Feature Components: MLlib, Spark Core Affects Versions: 1.0.0 Reporter: Xiangrui Meng Checkpointing is used to cut long lineage while maintaining fault tolerance. The current implementation is HDFS-based. Using the BlockRDD we can create in-memory-and-local-disk (with replication) checkpoints that are not as reliable as the HDFS-based solution but faster. It can help applications that require many iterations. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-1849) Broken UTF-8 encoded data gets character replacements and thus can't be fixed
[ https://issues.apache.org/jira/browse/SPARK-1849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14000397#comment-14000397 ] Mridul Muralidharan commented on SPARK-1849: Looks like textFile is probably the wrong api to use. You cannot recover from badly encoded data ... Better would be to write your own InputFormat which does what you need. Broken UTF-8 encoded data gets character replacements and thus can't be fixed --- Key: SPARK-1849 URL: https://issues.apache.org/jira/browse/SPARK-1849 Project: Spark Issue Type: Bug Reporter: Harry Brundage Fix For: 1.0.0, 0.9.1 Attachments: encoding_test I'm trying to process a file which isn't valid UTF-8 data inside hadoop using Spark via {{sc.textFile()}}. Is this possible, and if not, is this a bug that we should fix? It looks like {{HadoopRDD}} uses {{org.apache.hadoop.io.Text.toString}} on all the data it ever reads, which I believe replaces invalid UTF-8 byte sequences with the UTF-8 replacement character, \uFFFD. Some example code mimicking what {{sc.textFile}} does underneath:
{code}
scala> sc.textFile(path).collect()(0)
res8: String = ?pple

scala> sc.hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text]).map(pair => pair._2.toString).collect()(0).getBytes()
res9: Array[Byte] = Array(-17, -65, -67, 112, 112, 108, 101)

scala> sc.hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text]).map(pair => pair._2.getBytes).collect()(0)
res10: Array[Byte] = Array(-60, 112, 112, 108, 101)
{code}
In the above example, the first two snippets show the string representation and byte representation of the example line of text. The third snippet shows what happens if you call {{getBytes}} on the {{Text}} object which comes back from hadoop land: we get the real bytes in the file out. Now, I think this is a bug, though you may disagree. The text inside my file is perfectly valid iso-8859-1 encoded bytes, which I would like to be able to rescue and re-encode into UTF-8, because I want my application to be smart like that. I think Spark should give me the raw broken string so I can re-encode, but I can't get at the original bytes in order to guess at what the source encoding might be, as they have already been replaced. I'm dealing with data from some CDN access logs which are, to put it nicely, diversely encoded, but I think this is a use case Spark should fully support. So, my suggested fix, on which I'd like some guidance, is to change {{textFile}} to spit out broken strings by not using {{Text}}'s UTF-8 encoding. Further compounding this issue is that my application is actually in PySpark, but we can talk about how bytes fly through to Scala land after this if we agree that this is an issue at all. -- This message was sent by Atlassian JIRA (v6.2#6252)
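A hedged sketch of the workaround implied above, building on the reporter's own {{getBytes}} snippet: skip {{Text.toString}} (which substitutes \uFFFD) and re-decode the raw bytes with the suspected source encoding. Note that {{Text}} reuses its backing array, so only the first {{getLength}} bytes are valid and must be copied immediately.
{code}
import java.nio.charset.StandardCharsets
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.SparkContext

// Re-decode each line as ISO-8859-1 instead of letting Text mangle it as UTF-8.
def latin1Lines(sc: SparkContext, path: String) =
  sc.hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
    .map { case (_, t) =>
      // Copy the valid slice of the reused buffer into a fresh String.
      new String(t.getBytes, 0, t.getLength, StandardCharsets.ISO_8859_1)
    }
{code}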
[jira] [Commented] (SPARK-1813) Add a utility to SparkConf that makes using Kryo really easy
[ https://issues.apache.org/jira/browse/SPARK-1813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13996390#comment-13996390 ] Mridul Muralidharan commented on SPARK-1813: Writing a KryoRegistrator is the only requirement - the rest is done as part of initialization anyway. Registering classes with kryo is non-trivial except for degenerate cases : for example, we have classes which have to use java readObject/writeObject serialization, classes which support kryo serialization, classes which support java's external serialization, generated classes, etc. And we would need a registrator ... of course, it could be argued this is a corner case, though I don't think so. Add a utility to SparkConf that makes using Kryo really easy Key: SPARK-1813 URL: https://issues.apache.org/jira/browse/SPARK-1813 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 1.0.0 Reporter: Sandy Ryza It would be nice to have a method in SparkConf that makes it really easy to use Kryo and register a set of classes without defining your own registrator. Using Kryo currently requires all this:
{code}
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[MyClass1])
    kryo.register(classOf[MyClass2])
  }
}

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrator", "mypackage.MyRegistrator")
val sc = new SparkContext(conf)
{code}
It would be nice if it just required this:
{code}
SparkConf.setKryo(Array(classOf[MyFirstClass], classOf[MySecond]))
{code}
-- This message was sent by Atlassian JIRA (v6.2#6252)
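A hedged sketch of the kind of non-degenerate registrator the comment describes (hypothetical classes, not a real codebase): simple classes get plain registration, while classes that rely on java readObject/writeObject semantics fall back to Kryo's JavaSerializer.
{code}
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.serializers.JavaSerializer
import org.apache.spark.serializer.KryoRegistrator

// Hypothetical classes standing in for the cases described above.
class PlainClass
class NeedsJavaSerialization extends java.io.Serializable

class MixedRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[PlainClass])
    // Fall back to java serialization semantics where kryo's default
    // field-by-field strategy would be incorrect.
    kryo.register(classOf[NeedsJavaSerialization], new JavaSerializer())
  }
}
{code}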
[jira] [Commented] (SPARK-1606) spark-submit needs `--arg` for every application parameter
[ https://issues.apache.org/jira/browse/SPARK-1606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13988756#comment-13988756 ] Mridul Muralidharan commented on SPARK-1606: Crap, got to this too late. We really should not have added this - and I would have -1'ed it. We have had too many issues with trying to parse the user command line - issues which are entirely avoidable by simply being explicit about what the user wants to pass. Trying to pass strings which contain escape characters and/or whitespace, etc is just going to be a nightmare with this change. spark-submit needs `--arg` for every application parameter -- Key: SPARK-1606 URL: https://issues.apache.org/jira/browse/SPARK-1606 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 1.0.0 Reporter: Xiangrui Meng Assignee: Patrick Wendell Priority: Blocker Fix For: 1.0.0 If the application has a few parameters, the spark-submit command looks like the following:
{code}
spark-submit --master yarn-cluster --class main.Class --arg --numPartitions --arg 8 --arg --kryo --arg true
{code}
It is a little bit hard to read and modify. Maybe it is okay to treat all arguments after `main.Class` as application parameters.
{code}
spark-submit --master yarn-cluster --class main.Class --numPartitions 8 --kryo true
{code}
-- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-1706) Allow multiple executors per worker in Standalone mode
[ https://issues.apache.org/jira/browse/SPARK-1706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13988868#comment-13988868 ] Mridul Muralidharan commented on SPARK-1706: Oh my, this was supposed to be a logical addition once the yarn changes were done. The yarn changes were very heavily modelled on standalone mode (hence yarn-standalone !) : and it was supposed to be a two-way street : changes made for yarn support (multi-tenancy, etc) were supposed to have been added back to standalone mode once yarn support stabilized. Did not realize I never got around to it - my apologies ! Allow multiple executors per worker in Standalone mode -- Key: SPARK-1706 URL: https://issues.apache.org/jira/browse/SPARK-1706 Project: Spark Issue Type: Improvement Components: Deploy Reporter: Patrick Wendell Fix For: 1.1.0 Right now if people want to launch multiple executors on each machine they need to start multiple standalone workers. This is not too difficult, but it means you have extra JVM's sitting around. We should just allow users to set a number of cores they want per-executor in standalone mode and then allow packing multiple executors on each node. This would make standalone mode more consistent with YARN in the way you request resources. It's not too big of a change as far as I can see. You'd need to: 1. Introduce a configuration for how many cores you want per executor. 2. Change the scheduling logic in Master.scala to take this into account. 3. Change CoarseGrainedSchedulerBackend to not assume a 1-1 correspondence between hosts and executors. And maybe modify a few other places. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-1576) Passing of JAVA_OPTS to YARN on command line
[ https://issues.apache.org/jira/browse/SPARK-1576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13981313#comment-13981313 ] Mridul Muralidharan commented on SPARK-1576: There is a misunderstanding here - it is to pass SPARK_JAVA_OPTS : not JAVA_OPTS. Directly passing JAVA_OPTS has been removed. Passing of JAVA_OPTS to YARN on command line Key: SPARK-1576 URL: https://issues.apache.org/jira/browse/SPARK-1576 Project: Spark Issue Type: Improvement Affects Versions: 0.9.0, 1.0.0, 0.9.1 Reporter: Nishkam Ravi Fix For: 0.9.0, 1.0.0, 0.9.1 Attachments: SPARK-1576.patch JAVA_OPTS can be passed by using either env variables (i.e., SPARK_JAVA_OPTS) or as config vars (after Patrick's recent change). It would be good to allow the user to pass them on the command line as well, to restrict scope to a single application invocation. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-1586) Fix issues with spark development under windows
[ https://issues.apache.org/jira/browse/SPARK-1586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13981321#comment-13981321 ] Mridul Muralidharan commented on SPARK-1586: Immediate issues are fixed, though there are more hive tests failing due to path-related issues. PR : https://github.com/apache/spark/pull/505 Fix issues with spark development under windows --- Key: SPARK-1586 URL: https://issues.apache.org/jira/browse/SPARK-1586 Project: Spark Issue Type: Bug Reporter: Mridul Muralidharan Assignee: Mridul Muralidharan Fix For: 1.0.0 -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Resolved] (SPARK-1587) Fix thread leak in spark
[ https://issues.apache.org/jira/browse/SPARK-1587?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mridul Muralidharan resolved SPARK-1587. Resolution: Fixed Fixed, https://github.com/apache/spark/pull/504 Fix thread leak in spark Key: SPARK-1587 URL: https://issues.apache.org/jira/browse/SPARK-1587 Project: Spark Issue Type: Bug Reporter: Mridul Muralidharan Assignee: Mridul Muralidharan SparkContext.stop does not cause all threads to exit. When running tests via scalatest (which keeps reusing the same vm), over time, this causes too many threads to be created causing tests to fail due to inability to create more threads. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (BOOKKEEPER-560) Create readme for hedwig-client-jms
[ https://issues.apache.org/jira/browse/BOOKKEEPER-560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mridul Muralidharan updated BOOKKEEPER-560: --- Assignee: (was: Mridul Muralidharan) Create readme for hedwig-client-jms Key: BOOKKEEPER-560 URL: https://issues.apache.org/jira/browse/BOOKKEEPER-560 Project: Bookkeeper Issue Type: Task Components: Documentation Reporter: Ivan Kelly This module needs a readme describing it as an experimental component and what parts of jms are supported and not supported. It would also be good to have a bit more detail on why they can't be supported with hedwig as it is today. A lot of this can be taken verbatim from the package-info.html -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (BOOKKEEPER-648) BasicJMSTest failed
[ https://issues.apache.org/jira/browse/BOOKKEEPER-648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mridul Muralidharan updated BOOKKEEPER-648: --- Assignee: (was: Mridul Muralidharan) BasicJMSTest failed --- Key: BOOKKEEPER-648 URL: https://issues.apache.org/jira/browse/BOOKKEEPER-648 Project: Bookkeeper Issue Type: Bug Components: hedwig-client Reporter: Flavio Junqueira While running tests, I got once a failure for this hedwig-client-jms test: BasicJMSTest. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-1588) SPARK_JAVA_OPTS is not getting propagated
[ https://issues.apache.org/jira/browse/SPARK-1588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13978827#comment-13978827 ] Mridul Muralidharan commented on SPARK-1588: Apparently, SPARK_YARN_USER_ENV is also broken SPARK_JAVA_OPTS is not getting propagated - Key: SPARK-1588 URL: https://issues.apache.org/jira/browse/SPARK-1588 Project: Spark Issue Type: Bug Components: YARN Affects Versions: 1.0.0 Reporter: Mridul Muralidharan Priority: Blocker We could use SPARK_JAVA_OPTS to pass JAVA_OPTS to be used in the master. This is no longer working in current master. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-1524) TaskSetManager'd better not schedule tasks which has no preferred executorId using PROCESS_LOCAL in the first search process
[ https://issues.apache.org/jira/browse/SPARK-1524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13972864#comment-13972864 ] Mridul Muralidharan commented on SPARK-1524: The expectation is to fall back to a previous schedule type in case the higher level is not valid : though this is tricky in the general case. Will need to take a look at it - though given that I am tied up with other things, if someone else wants to take a crack at it, please feel free to do so ! Btw, use of IPs and multiple hostnames for a host is not supported in spark - so that is something that will need to be resolved at the deployment end. TaskSetManager'd better not schedule tasks which has no preferred executorId using PROCESS_LOCAL in the first search process Key: SPARK-1524 URL: https://issues.apache.org/jira/browse/SPARK-1524 Project: Spark Issue Type: Improvement Components: Spark Core Reporter: YanTang Zhai Priority: Minor ShuffleMapTask is constructed with a TaskLocation which has only a host, not a (host, executorID) pair, in DAGScheduler. When TaskSetManager schedules a ShuffleMapTask which has no preferred executorId using a specific execId host and the PROCESS_LOCAL locality level, no tasks match the given locality constraint in the first search process. We also find that the host used by the Scheduler is a hostname while the host used by TaskLocation is an IP in our cluster. The two hosts do not match, which leaves the pendingTasksForHost HashMap empty and makes the task-finding process go against our expectation. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-1476) 2GB limit in spark for blocks
[ https://issues.apache.org/jira/browse/SPARK-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13972978#comment-13972978 ] Mridul Muralidharan commented on SPARK-1476: [~matei] We are having some issue porting the netty shuffle copier code to support 2G since only ByteBuf seems to be exposed. Before I dig into netty more, wanted to know if you or someone else from among spark developers knew how to add support for large buffers in our netty code. Thanks ! 2GB limit in spark for blocks - Key: SPARK-1476 URL: https://issues.apache.org/jira/browse/SPARK-1476 Project: Spark Issue Type: Bug Components: Spark Core Environment: all Reporter: Mridul Muralidharan Assignee: Mridul Muralidharan Priority: Critical Fix For: 1.1.0 The underlying abstraction for blocks in spark is a ByteBuffer : which limits the size of the block to 2GB. This has implication not just for managed blocks in use, but also for shuffle blocks (memory mapped blocks are limited to 2gig, even though the api allows for long), ser-deser via byte array backed outstreams (SPARK-1391), etc. This is a severe limitation for use of spark when used on non trivial datasets. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-1453) Improve the way Spark on Yarn waits for executors before starting
[ https://issues.apache.org/jira/browse/SPARK-1453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13968390#comment-13968390 ] Mridul Muralidharan commented on SPARK-1453: (d) becomes relevant in the case of headless/cron'ed jobs. If the job is user initiated, then I agree, the user would typically kill and restart the job. Improve the way Spark on Yarn waits for executors before starting - Key: SPARK-1453 URL: https://issues.apache.org/jira/browse/SPARK-1453 Project: Spark Issue Type: Improvement Components: YARN Affects Versions: 1.0.0 Reporter: Thomas Graves Assignee: Thomas Graves Currently Spark on Yarn just delays a few seconds between when the spark context is initialized and when it allows the job to start. If you are on a busy hadoop cluster it might take longer to get the number of executors. At the very least we could make this timeout a configurable value. It's currently hardcoded to 3 seconds. Better yet would be to allow the user to give a minimum number of executors it wants to wait for, but that looks much more complex. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Comment Edited] (SPARK-1476) 2GB limit in spark for blocks
[ https://issues.apache.org/jira/browse/SPARK-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13967854#comment-13967854 ] Mridul Muralidharan edited comment on SPARK-1476 at 4/13/14 2:45 PM: - There are multiple issues at play here : a) If a block goes beyond 2G, everything fails - this is the case for shuffle, cached and 'normal' blocks, irrespective of storage level and/or other config. In a lot of cases, particularly when the data generated 'increases' after a map/flatMap/etc, the user has no control over the size increase in the output block for a given input block (think iterations over datasets). b) Increasing the number of partitions is not always an option (for the subset of usecases where it can be done) : 1) It has an impact on the number of intermediate files created while doing a shuffle (ulimit constraints, IO performance issues, etc). 2) It does not help when there is skew anyway. c) Compression codec, serializer used, etc have an impact. d) 2G is an extremely low limit to have on modern hardware : and this is a particularly severe limitation when we have nodes running on 32G to 64G ram and TB's of disk space available for spark. To address specific points raised above : A) [~pwendell] Mapreduce jobs don't fail in case the block size of files increases - it might be inefficient, but it still runs (not that I know of any case where it actually does, but theoretically I guess it can become inefficient). So the analogy does not apply. Also to add, 2G is not really an unlimited increase in block size - and in MR, the output of a map can easily go a couple of orders above 2G : whether it is followed by a reduce or not. B) [~matei] In the specific cases where it was failing, the users were not caching the data but going directly to shuffle. There was no skew from what I see : just the data size per key is high; and there are a lot of keys too, btw (as iterations increase and nnz increases). Note that it was an impl detail that the data was not being cached - it could have been too. Additionally, compression and/or serialization also apply implicitly in this case, since it was impacting shuffle - the 2G limit was observed on both the map and reduce side (in two different jobs). In general, our effort is to make spark a drop-in replacement for most usecases which are currently being done via MR/Pig/etc. Limitations of this sort make it difficult to position spark as a credible alternative. The current approach we are exploring is to remove all direct references to ByteBuffer from spark (except for the ConnectionManager, etc parts); and rely on a BlockData or similar datastructure which encapsulates the data corresponding to a block. By default, a single ByteBuffer should suffice, but in case it does not, the class will automatically take care of splitting across blocks. Similarly, all references to byte array backed streams will need to be replaced with a wrapper stream which multiplexes over byte array streams. The performance impact for all 'normal' usecases should be minimal, while allowing spark to be used in cases where the 2G limit is being hit. The only unknown here is tachyon integration : where the interface is a ByteBuffer - and I am not knowledgeable enough to comment on what the issues there would be. was (Author: mridulm80): There are multiple issues at play here : a) If a block goes beyond 2G, everything fails - this is the case for shuffle, cached and 'normal' blocks, irrespective of storage level and/or other config.
In a lot of cases, particularly when the data generated 'increases' after a map/flatMap/etc, the user has no control over the size increase in the output block for a given input block (think iterations over datasets). b) Increasing the number of partitions is not always an option (for the subset of usecases where it can be done) : 1) It has an impact on the number of intermediate files created while doing a shuffle (ulimit constraints, IO performance issues, etc). 2) It does not help when there is skew anyway. c) Compression codec, serializer used, etc have an impact. d) 2G is an extremely low limit to have on modern hardware : and this is practically a severe limitation when we have nodes running on 32G to 64G ram and TB's of disk space available for spark. To address specific points raised above : A) [~pwendell] Mapreduce jobs don't fail in case the block size of files increases - it might be inefficient, but it still runs (not that I know of any case where it actually does, but theoretically I guess it can). So the analogy does not apply. To add, 2G is not really an unlimited increase in block size - and in MR, the output of a map can easily go a couple of orders above 2G. B) [~matei] In the specific cases where it was failing, the users were not caching the data but going directly to shuffle. There was no skew from what we see : just the data size per key is high;
[jira] [Commented] (SPARK-1476) 2GB limit in spark for blocks
[ https://issues.apache.org/jira/browse/SPARK-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13967419#comment-13967419 ] Mridul Muralidharan commented on SPARK-1476: WIP Proposal: - All references to ByteBuffer will need to be replaced with Seq[ByteBuffer]. This applies to the definition of a block, memory mapped file segments for a shuffle block, etc. - All use of byte array backed output streams will need to be replaced with an aggregating output stream which writes to multiple byte array output streams as and when array limits are hit. 2GB limit in spark for blocks - Key: SPARK-1476 URL: https://issues.apache.org/jira/browse/SPARK-1476 Project: Spark Issue Type: Bug Components: Spark Core Environment: all Reporter: Mridul Muralidharan Priority: Critical The underlying abstraction for blocks in spark is a ByteBuffer : which limits the size of a block to 2GB. This has implications not just for managed blocks in use, but also for shuffle blocks (memory mapped blocks are limited to 2 gig, even though the api allows for long), ser-deser via byte array backed outstreams (SPARK-1391), etc. This is a severe limitation for use of spark on non trivial datasets. -- This message was sent by Atlassian JIRA (v6.2#6252)
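A hedged sketch of the second bullet of the proposal above - a hypothetical aggregating output stream, not the actual patch: spill into a fresh byte-array stream as the per-array limit approaches, and expose the result as a Seq[ByteBuffer].
{code}
import java.io.{ByteArrayOutputStream, OutputStream}
import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer

// Illustrative chunk size; the real limit is Integer.MAX_VALUE per array.
class ChunkedByteArrayOutputStream(chunkSize: Int = 64 * 1024 * 1024) extends OutputStream {
  private val chunks = ArrayBuffer(new ByteArrayOutputStream())

  override def write(b: Int): Unit = {
    // Start a new backing stream once the current one reaches the chunk limit.
    if (chunks.last.size() >= chunkSize) chunks += new ByteArrayOutputStream()
    chunks.last.write(b)
  }

  // A block is then a sequence of buffers rather than a single ByteBuffer.
  def toByteBuffers: Seq[ByteBuffer] = chunks.map(c => ByteBuffer.wrap(c.toByteArray))
}
{code}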
[jira] [Commented] (SPARK-1391) BlockManager cannot transfer blocks larger than 2G in size
[ https://issues.apache.org/jira/browse/SPARK-1391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13966332#comment-13966332 ] Mridul Muralidharan commented on SPARK-1391: Another place where this is relevant is here : java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:789) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:98) at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:413) at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:339) at org.apache.spark.storage.BlockManager.get(BlockManager.scala:506) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:39) at org.apache.spark.rdd.RDD.iterator(RDD.scala:233) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111) at org.apache.spark.scheduler.Task.run(Task.scala:52) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211) at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:43) at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1262) at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:42) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:722) So we might want to change the abstraction from a single ByteBuffer to a sequence of ByteBuffers ... BlockManager cannot transfer blocks larger than 2G in size -- Key: SPARK-1391 URL: https://issues.apache.org/jira/browse/SPARK-1391 Project: Spark Issue Type: Bug Components: Block Manager, Shuffle Affects Versions: 1.0.0 Reporter: Shivaram Venkataraman Assignee: Min Zhou Attachments: SPARK-1391.diff If a task tries to remotely access a cached RDD block, I get an exception when the block size is > 2G. The exception is pasted below. Memory capacities are huge these days (> 60G), and many workflows depend on having large blocks in memory, so it would be good to fix this bug. I don't know if the same thing happens on shuffles if one transfer (from mapper to reducer) is > 2G.
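FileChannel.map takes long offsets but caps each individual mapping at Integer.MAX_VALUE bytes, which is exactly the limit the stack trace above hits. With a sequence-of-buffers abstraction, a large file can be mapped as several MappedByteBuffers instead; a minimal sketch (the helper name and the 1GB chunk size are assumptions for illustration):
{code}
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

public final class ChunkedMmap {
    // Map a file of arbitrary size as a list of mappings, each <= 1GB.
    static List<MappedByteBuffer> mapWholeFile(Path path) throws IOException {
        final long CHUNK = 1L << 30; // 1GB per mapping, below Integer.MAX_VALUE
        List<MappedByteBuffer> buffers = new ArrayList<>();
        try (FileChannel ch = FileChannel.open(path, StandardOpenOption.READ)) {
            long size = ch.size();
            for (long pos = 0; pos < size; pos += CHUNK) {
                long len = Math.min(CHUNK, size - pos);
                // Each individual map() call stays within the int limit.
                buffers.add(ch.map(FileChannel.MapMode.READ_ONLY, pos, len));
            }
        }
        return buffers;
    }
}
{code}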
{noformat} 14/04/02 02:33:10 ERROR storage.BlockManagerWorker: Exception handling buffer message java.lang.ArrayIndexOutOfBoundsException at it.unimi.dsi.fastutil.io.FastByteArrayOutputStream.write(FastByteArrayOutputStream.java:96) at it.unimi.dsi.fastutil.io.FastBufferedOutputStream.dumpBuffer(FastBufferedOutputStream.java:134) at it.unimi.dsi.fastutil.io.FastBufferedOutputStream.write(FastBufferedOutputStream.java:164) at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:38) at org.apache.spark.serializer.SerializationStream$class.writeAll(Serializer.scala:93) at org.apache.spark.serializer.JavaSerializationStream.writeAll(JavaSerializer.scala:26) at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:913) at org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:922) at org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:102) at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:348) at org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:323) at org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:90) at org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:69) at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) at
[jira] [Commented] (SPARK-542) Cache miss when a machine has multiple hostnames
[ https://issues.apache.org/jira/browse/SPARK-542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13967185#comment-13967185 ] Mridul Muralidharan commented on SPARK-542: --- Spark uses only hostnames - not ip's. Even for hostnames, it should ideally pick only the canonical hostname - not the others. This was done by design in 0.8 ... trying to work out whether multiple hostnames/ip's all refer to the same physical host/container is fraught with too many issues. Cache miss when a machine has multiple hostnames -- Key: SPARK-542 URL: https://issues.apache.org/jira/browse/SPARK-542 Project: Spark Issue Type: Bug Reporter: frankvictor HI, I encountered weird pagerank runtimes in the last few days. After debugging the job, I found they were caused by DNS names. The machines of my cluster have multiple hostnames; for example, slave 1 has the names c001 and c001.cm.cluster. When spark adds a cache entry in cacheTracker, it gets c001 and registers the cache under it. But when scheduling a task in SimpleJob, the mesos offer gives spark c001.cm.cluster, so it will never get a preferred location! I think spark should handle the multiple-hostname case (by using the ip instead of the hostname, or some other method). Thanks! -- This message was sent by Atlassian JIRA (v6.2#6252)
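A sketch of the canonical-hostname normalization described above (an illustrative helper, not the actual spark code): resolving every alias through InetAddress yields one consistent key, so cache locations and scheduler offers agree.
{code}
import java.net.InetAddress;
import java.net.UnknownHostException;

public final class HostNormalizer {
    // Resolve any alias (c001, c001.cm.cluster, ...) to one canonical
    // name so that cache locations and scheduler offers use the same key.
    static String canonicalHostName(String host) {
        try {
            return InetAddress.getByName(host).getCanonicalHostName();
        } catch (UnknownHostException e) {
            // Fall back to the name as given if it cannot be resolved.
            return host;
        }
    }

    public static void main(String[] args) {
        System.out.println(canonicalHostName("localhost"));
    }
}
{code}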
[jira] [Commented] (SPARK-1453) Improve the way Spark on Yarn waits for executors before starting
[ https://issues.apache.org/jira/browse/SPARK-1453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13967193#comment-13967193 ] Mridul Muralidharan commented on SPARK-1453: The timeout gets hit only when we don't get the requested executors, right ? So it is more like a max timeout (controlled by the number of times we loop, iirc). The reason for keeping it stupid was simply that we have no guarantees on the number of containers which might be available to spark in a busy cluster : at times, it might not be practically possible to even get a fraction of the requested nodes (either due to a busy cluster or a lack of resources - hence the infinite wait). Ideally, I should have exposed the number of containers allocated - so that at least user code could use it as an spi and decide how to proceed for more complex cases. Missed out on this one. I am not sure which usecases make sense. a) Wait for X seconds or until the requested containers are allocated. b) Wait until a minimum of Y containers is allocated (out of X requested). c) (b) with (a) - that is, min containers and a timeout on that. d) (c) with exit if min containers are not allocated ? (d) is something which I keep running into (if I don't get my required minimum nodes and the job proceeds, I usually end up bringing down those nodes :-( ) Improve the way Spark on Yarn waits for executors before starting - Key: SPARK-1453 URL: https://issues.apache.org/jira/browse/SPARK-1453 Project: Spark Issue Type: Improvement Components: YARN Affects Versions: 1.0.0 Reporter: Thomas Graves Assignee: Thomas Graves Currently Spark on Yarn just delays a few seconds between when the spark context is initialized and when it allows the job to start. If you are on a busy hadoop cluster it might take longer to get the number of executors. At the very least we could make this timeout a configurable value. It's currently hardcoded to 3 seconds. Better yet would be to allow the user to give a minimum number of executors to wait for, but that looks much more complex. -- This message was sent by Atlassian JIRA (v6.2#6252)
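Options (b)-(d) from the comment above boil down to a simple polling loop; a hypothetical sketch (the helper, its parameters, and the poll interval are all made up for illustration):
{code}
import java.util.concurrent.TimeUnit;
import java.util.function.IntSupplier;

public final class ExecutorWait {
    // Hypothetical helper: wait until at least minContainers are allocated
    // or maxWaitMillis elapses; optionally fail hard if the minimum was
    // never reached (option (d) above).
    static boolean waitForExecutors(IntSupplier allocatedCount,
                                    int minContainers,
                                    long maxWaitMillis,
                                    boolean failIfBelowMin) throws InterruptedException {
        long deadline = System.currentTimeMillis() + maxWaitMillis;
        while (System.currentTimeMillis() < deadline) {
            if (allocatedCount.getAsInt() >= minContainers) {
                return true; // minimum reached before the timeout
            }
            TimeUnit.MILLISECONDS.sleep(200); // poll the allocation count
        }
        if (failIfBelowMin) {
            throw new IllegalStateException(
                "Did not get " + minContainers + " containers within " + maxWaitMillis + " ms");
        }
        return false; // proceed anyway with whatever was allocated
    }
}
{code}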
[jira] [Commented] (BOOKKEEPER-648) BasicJMSTest failed
[ https://issues.apache.org/jira/browse/BOOKKEEPER-648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13719324#comment-13719324 ] Mridul Muralidharan commented on BOOKKEEPER-648: On going over the code again, the testcase is extremely simple - it tests the sync and async api. a) Create one publisher and two subscriber sessions for the topic (one via the sync api, another via the async api of jms). b) Send 4 messages through the publisher session. c) Sleep for 10 ms to ensure all messages have been sent - this should not be required, but hedwig sometimes takes a lot of time. d) For the sync test, wait for 100 ms for each message to be received : null is returned if it times out without receiving any message. receive is called as many times as we sent messages - the test expects in-order delivery without loss of messages, as per the hedwig api contract. This is what is failing : we are not receiving all the messages we sent. e) The async session listener does the same as (d) - except via the async listener : this did not get exercised in this specific case due to the validation failure in (d). Looking more, we should probably add a sleep before checking the 'messageCount.getValue() != CHAT_MESSAGES.length' condition - in case the async listener is still running in parallel : though this is not the failure we are observing ... Assuming you noticed the same assertion stacktrace each time it failed, it means no message was received before the timeout in the sync invocation. This can be due to : 1) Hedwig or bookkeeper being inordinately slow for some reason (slow hdd, filled up /tmp, low mem, tlb thrashing, etc ?) : in which case simply bumping up the sleep time and the receive timeout param will circumvent the issue. 2) Some bug somewhere in the chain causing message drops - either at publish time, or while sending to subscribers, or somewhere else ? To get additional debugging info, there are log messages in the jms module : but (particularly for this testcase) the jms module is a thin wrapper delegating to the corresponding hedwig client api - so enabling debug in the hedwig client would be more helpful. Actually, I would validate whether the server actually sent messages to both subscribers and whether they were received by the client - if yes, the rest would be a client side bug (hedwig client or jms). If you could reproduce the issue with debug logging enabled for the root logger, I can definitely help narrow down the issue with those logs ! Unfortunately, there are almost no testcases in the hedwig client : so I am not sure what design or implementation changes happened in the client (or server ?) - since I am not keeping track of bookkeeper/hedwig anymore. BasicJMSTest failed --- Key: BOOKKEEPER-648 URL: https://issues.apache.org/jira/browse/BOOKKEEPER-648 Project: Bookkeeper Issue Type: Bug Components: hedwig-client Reporter: Flavio Junqueira Assignee: Mridul Muralidharan While running tests, I got once a failure for this hedwig-client-jms test: BasicJMSTest. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
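The sync receive in (d) relies on standard JMS timeout semantics - MessageConsumer.receive(timeout) returns null if nothing arrives in time. A minimal sketch of that check (connection and consumer setup elided; the helper name and message are illustrative):
{code}
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;

public final class SyncReceiveCheck {
    // For each message published, wait up to 100 ms for it to arrive.
    // receive(timeout) returns null on timeout, which is exactly the
    // assertion failing in the test described above.
    static void assertAllReceived(MessageConsumer consumer, int expectedCount)
            throws JMSException {
        for (int i = 0; i < expectedCount; i++) {
            Message msg = consumer.receive(100); // null if nothing arrives in time
            if (msg == null) {
                throw new AssertionError("Message " + i + " not received before timeout");
            }
        }
    }
}
{code}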
[jira] [Commented] (BOOKKEEPER-648) BasicJMSTest failed
[ https://issues.apache.org/jira/browse/BOOKKEEPER-648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13719348#comment-13719348 ] Mridul Muralidharan commented on BOOKKEEPER-648: Flavio, I am not sure what the expectation for this bug is - is it an appropriate debug message for this testcase ? As I detailed above, the only message possible is : no message was received before the timeout - unfortunately, I don't think that is going to help us much in debugging it. The reason I did not add a descriptive message for every assertion in the tests is that the corresponding JMS apis document the error conditions in detail (when a null can be returned from receive is spelled out in the JMS api javadocs, for example). To actually debug/fix the issue, we will need to enable debug logging in the server, the client api and the jms module. Subsequently, when the issue is observed, we will need to trace whether the message was actually sent to the server, whether the server dispatched it to both subscribers, whether the client api received it, and whether it was dispatched to jms. The jms module does nothing different from any other api user of hedwig - barring bugs in it of course :-) This is an assertion which is typically not expected to fail unless something broken elsewhere is causing message loss. BasicJMSTest failed --- Key: BOOKKEEPER-648 URL: https://issues.apache.org/jira/browse/BOOKKEEPER-648 Project: Bookkeeper Issue Type: Bug Components: hedwig-client Reporter: Flavio Junqueira Assignee: Mridul Muralidharan While running tests, I got once a failure for this hedwig-client-jms test: BasicJMSTest. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
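Assuming a log4j 1.x backend for the test run (an assumption about the setup, not something stated here), debug logging for the root logger can be enabled programmatically:
{code}
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

// Assumes a log4j 1.x backend; turns on debug output everywhere so the
// publish -> dispatch -> receive chain can be traced end to end.
public final class EnableDebugLogging {
    public static void main(String[] args) {
        Logger.getRootLogger().setLevel(Level.DEBUG);
    }
}
{code}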
[jira] [Commented] (BOOKKEEPER-648) BasicJMSTest failed
[ https://issues.apache.org/jira/browse/BOOKKEEPER-648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13718267#comment-13718267 ] Mridul Muralidharan commented on BOOKKEEPER-648: I was not able to reproduce this with latest svn trunk. {noformat} $ mvn test [INFO] Scanning for projects... [INFO] [INFO] [INFO] Building hedwig-client-jms 4.3.0-SNAPSHOT [INFO] [INFO] [INFO] --- javacc-maven-plugin:2.6:jjtree-javacc (jjtree-javacc) @ hedwig-client-jms --- [INFO] Skipping - all parsers are up to date [INFO] [INFO] --- maven-remote-resources-plugin:1.1:process (default) @ hedwig-client-jms --- [INFO] Setting property: classpath.resource.loader.class = 'org.codehaus.plexus.velocity.ContextClassLoaderResourceLoader'. [INFO] Setting property: velocimacro.messages.on = 'false'. [INFO] Setting property: resource.loader = 'classpath'. [INFO] Setting property: resource.manager.logwhenfound = 'false'. [INFO] [INFO] --- maven-resources-plugin:2.4.3:resources (default-resources) @ hedwig-client-jms --- [INFO] Using 'UTF-8' encoding to copy filtered resources. [INFO] Copying 2 resources [INFO] Copying 3 resources [INFO] [INFO] --- maven-compiler-plugin:3.0:compile (default-compile) @ hedwig-client-jms --- [INFO] Changes detected - recompiling the module! [INFO] Compiling 76 source files to /home/mridulm/work/bookkeeper/trunk/hedwig-client-jms/target/classes [INFO] [INFO] --- maven-resources-plugin:2.4.3:testResources (default-testResources) @ hedwig-client-jms --- [INFO] Using 'UTF-8' encoding to copy filtered resources. [INFO] skip non existing resourceDirectory /home/mridulm/work/bookkeeper/trunk/hedwig-client-jms/src/test/resources [INFO] Copying 3 resources [INFO] [INFO] --- maven-compiler-plugin:3.0:testCompile (default-testCompile) @ hedwig-client-jms --- [INFO] Changes detected - recompiling the module! [INFO] Compiling 104 source files to /home/mridulm/work/bookkeeper/trunk/hedwig-client-jms/target/test-classes [INFO] [INFO] --- maven-surefire-plugin:2.9:test (default-test) @ hedwig-client-jms --- [INFO] Tests are skipped. [INFO] [INFO] --- maven-surefire-plugin:2.9:test (unit-tests) @ hedwig-client-jms --- [INFO] Surefire report directory: /home/mridulm/work/bookkeeper/trunk/hedwig-client-jms/target/surefire-reports --- T E S T S --- --- T E S T S --- Running org.apache.hedwig.jms.BasicJMSTest Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 9.516 sec Running org.apache.hedwig.jms.selector.activemq.SelectorTest Tests run: 19, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.208 sec Running org.apache.hedwig.jms.selector.activemq.SelectorParserTest Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.195 sec Running org.apache.hedwig.jms.selector.BasicSelectorGrammarTest Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.238 sec Results : Tests run: 26, Failures: 0, Errors: 0, Skipped: 0 {noformat} BasicJMSTest failed --- Key: BOOKKEEPER-648 URL: https://issues.apache.org/jira/browse/BOOKKEEPER-648 Project: Bookkeeper Issue Type: Bug Components: hedwig-client Reporter: Flavio Junqueira Assignee: Mridul Muralidharan While running tests, I got once a failure for this hedwig-client-jms test: BasicJMSTest. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (BOOKKEEPER-648) BasicJMSTest failed
[ https://issues.apache.org/jira/browse/BOOKKEEPER-648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13718365#comment-13718365 ] Mridul Muralidharan commented on BOOKKEEPER-648: I tried it 3 times - a) mvn test b) mvn install, both from the jms subdir, and c) mvn install for the entire bookkeeper. Was there any other env difference ? Like hdd space constraints, mem constraints, etc ? The basic test is actually pretty basic - send msg, and receive the same. BasicJMSTest failed --- Key: BOOKKEEPER-648 URL: https://issues.apache.org/jira/browse/BOOKKEEPER-648 Project: Bookkeeper Issue Type: Bug Components: hedwig-client Reporter: Flavio Junqueira Assignee: Mridul Muralidharan While running tests, I got once a failure for this hedwig-client-jms test: BasicJMSTest. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (BOOKKEEPER-648) BasicJMSTest failed
[ https://issues.apache.org/jira/browse/BOOKKEEPER-648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13718369#comment-13718369 ] Mridul Muralidharan commented on BOOKKEEPER-648: I tried it 3 times - a) mvn test b) mvn install, both from the jms subdir, and c) mvn install for the entire bookkeeper. Was there any other env difference when it failed ? Like hdd space constraints, mem constraints, etc ? The basic test is actually pretty basic - send msg, and receive the same; if that fails, everything else should also fail. I did not notice any deprecation warnings, so I am wondering what has changed ! BasicJMSTest failed --- Key: BOOKKEEPER-648 URL: https://issues.apache.org/jira/browse/BOOKKEEPER-648 Project: Bookkeeper Issue Type: Bug Components: hedwig-client Reporter: Flavio Junqueira Assignee: Mridul Muralidharan While running tests, I got once a failure for this hedwig-client-jms test: BasicJMSTest. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (BOOKKEEPER-312) Implementation of JMS provider
[ https://issues.apache.org/jira/browse/BOOKKEEPER-312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13568313#comment-13568313 ] Mridul Muralidharan commented on BOOKKEEPER-312: Oh great ! I saw the failed message from hudson, and was not sure what the status was ... My point was that there is nothing much we can do about the findbugs warnings. Thanks for clarifying Flavio; and thanks to Ivan and Matthieu for the reviews and helpful comments ! Implementation of JMS provider -- Key: BOOKKEEPER-312 URL: https://issues.apache.org/jira/browse/BOOKKEEPER-312 Project: Bookkeeper Issue Type: Sub-task Reporter: Mridul Muralidharan Assignee: Mridul Muralidharan Fix For: 4.3.0 Attachments: BOOKKEEPER-312.diff, hedwig-client-jms.patch.10 The JMS provider implementation conforming to the 1.1 spec. The limitations as of now are : 1) No support for Queue's : Hedwig currently does not have a notion of JMS queue's for us to leverage. 2) No support for noLocal : Hedwig DOES NOT conform to JMS model of connection -(n)- session -(n)- publisher/subscriber. Each session has a hedwig connection. Currently I am simulating noLocal, but this IS fragile and works for the duration of connection - ONLY until the message id is still in a LRUCache. As mentioned before, this is a kludge, and not a good solution. 3) Note that everything is durable in hedwig - so we do not support NON_PERSISTENT delivery mode. 4) Calling unsubscribe on a durable subscription will fail if it was NOT created in the current session. In hedwig, to unsubscribe, we need the subscription id and the topic ... To simulate unsubscribe(), we store the subscriberId to topicName mapping when a create* api is invoked. Hence, if create* was NOT called, then we have no way to infer which topic the subscription-id refers to from hedwig, and so cant unsubscribe. The workaround is - simply create a durable subsriber just as a workaround of this limitation - the topicName will be known to the user/client anyway. 5) Explicit session recovery is not supported. Reconnection of hedwig session (either explicitly or implicitly by underlying client implementation) will automatically trigger redelivery of un-acknowledged messages. 6) Because of the above, setting the JMSRedelivered flag is almost impossible in a consistent way. Currently, we simulate it for redelivery due to provider side events : rollback of txn, exception in message listener (primarily). At best we can simulate it with a kludge - at risk of potentially running out of resources ... this is being investigated : but unlikely to have a clean fix. 7) Hedwig only supports marking all messages until seq-id as received : while JMS indicates ability to acknowledge individual messages. This distinction is currently unsupported. 8) JMS spec requires A connection's delivery of incoming messages can be temporarily stopped using its stop() method. It can be restarted using its start() method. When the connection is stopped, delivery to all the connection’s MessageConsumers is inhibited: synchronous receives block, and messages are not delivered to MessageListeners. We honor this for undelivered messages from server - but if stop is called while there are pending messages yet to be delivered to a listener (or buffered in subscriber for receive), then they will be delivered irrespective of stop(). -- This message is automatically generated by JIRA. 
If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (BOOKKEEPER-312) Implementation of JMS provider
[ https://issues.apache.org/jira/browse/BOOKKEEPER-312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13565194#comment-13565194 ] Mridul Muralidharan commented on BOOKKEEPER-312: A) About @author tags: The author tags are from the activemq testcases. Currently we make only very minimal changes to them, in the hope that we can do future merges from their codebase (unfortunately, this might not be possible anymore, but still ...) $ find . -type f | xargs grep -i author | grep @ ./src/test/java/org/apache/activemq/perf/NetworkedSyncTest.java: * @author tmielke ./src/test/java/org/apache/activemq/perf/NetworkedSyncTest.java: * @author tmielke ./src/test/java/org/apache/activemq/usecases/TransactionRollbackOrderTest.java: * @author Paul Smith ./src/test/java/org/apache/activemq/usecases/DispatchMultipleConsumersTest.java: * @author Rajani Chennamaneni ./src/test/java/org/apache/activemq/usecases/DiscriminatingConsumerLoadTest.java: * @author jlyons ./src/test/java/org/apache/activemq/usecases/DiscriminatingConsumerLoadTest.java: * @author jlyons ./src/test/java/org/apache/activemq/usecases/TransactionTest.java: * @author pragmasoft ./src/test/java/org/apache/activemq/usecases/SubscribeClosePublishThenConsumeTest.java: * @author Paul Smith B) About tabs in code: Again, this is coming from the activemq codebase (the grep param below is a literal tab, typed via ctrl+v, and shown here as $'\t'): $ find . -type f | grep -v \.class | xargs grep $'\t' | grep -v activemq Binary file ./target/hedwig-client-jms-4.3.0-SNAPSHOT.jar matches C) trailing spaces There are quite a lot of cases of trailing spaces - including in generated code. Is it mandatory to remove these ? It will be very time-consuming to do so manually; and I don't have the time to try to script it. If there is something already present, I can of course run it and resubmit the patch. D) Files with lines longer than 120 characters. There are a bunch of files with lines longer than 120 characters : mostly in comments and testcases. I will try to remove them if it is a priority - unfortunately, I won't be able to get to these for a while due to other commitments. $ for i in `find src -type f `; do if [ -n `awk 'length($0) > 120' $i` ]; then echo $i; fi; done | grep -v activemq | grep java$ src/test/java/org/apache/hedwig/jms/selector/BasicSelectorGrammarTest.java src/test/java/org/apache/hedwig/jms/BasicJMSTest.java src/test/java/org/apache/hedwig/JmsTestBase.java src/main/java/org/apache/hedwig/jms/selector/ValueComparisonFunction.java src/main/java/org/apache/hedwig/jms/SessionImpl.java src/main/java/org/apache/hedwig/jms/ConnectionImpl.java src/main/java/org/apache/hedwig/jms/spi/HedwigMessagingSessionFacade.java src/main/java/org/apache/hedwig/jms/spi/MessageProducerImpl.java src/main/java/org/apache/hedwig/jms/spi/TopicSubscriberImpl.java src/main/java/org/apache/hedwig/jms/message/header/JmsHeader.java Implementation of JMS provider -- Key: BOOKKEEPER-312 URL: https://issues.apache.org/jira/browse/BOOKKEEPER-312 Project: Bookkeeper Issue Type: Sub-task Reporter: Mridul Muralidharan Assignee: Mridul Muralidharan Fix For: 4.3.0 Attachments: hedwig-client-jms.patch, hedwig-client-jms.patch.1, hedwig-client-jms.patch.10, hedwig-client-jms.patch.10, hedwig-client-jms.patch.2, hedwig-client-jms.patch.3, hedwig-client-jms.patch.4, hedwig-client-jms.patch.5, hedwig-client-jms.patch.9, hedwig-client-jms.patch.9 The JMS provider implementation conforming to the 1.1 spec.
The limitations as of now are : 1) No support for Queue's : Hedwig currently does not have a notion of JMS queue's for us to leverage. 2) No support for noLocal : Hedwig DOES NOT conform to JMS model of connection -(n)- session -(n)- publisher/subscriber. Each session has a hedwig connection. Currently I am simulating noLocal, but this IS fragile and works for the duration of connection - ONLY until the message id is still in a LRUCache. As mentioned before, this is a kludge, and not a good solution. 3) Note that everything is durable in hedwig - so we do not support NON_PERSISTENT delivery mode. 4) Calling unsubscribe on a durable subscription will fail if it was NOT created in the current session. In hedwig, to unsubscribe, we need the subscription id and the topic ... To simulate unsubscribe(), we store the subscriberId to topicName mapping when a create* api is invoked. Hence, if create* was NOT called, then we have no way to infer which topic the subscription-id refers to from hedwig, and so cant unsubscribe. The workaround is - simply create a durable subsriber just as a workaround of this limitation - the topicName will be known to the user/client anyway. 5) Explicit session recovery is not supported. Reconnection of hedwig
[jira] [Commented] (BOOKKEEPER-312) Implementation of JMS provider
[ https://issues.apache.org/jira/browse/BOOKKEEPER-312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13565644#comment-13565644 ] Mridul Muralidharan commented on BOOKKEEPER-312: Hi Ivan, a) The github repo I have is horribly out of date (and has other changes in it) : as of now, the only thing I have is the patch which finished the review. b) It is not a direct import of the activemq test code : it has gone through quite a bit of change to allow use with hedwig (most of it script generated). The only reason we import the activemq code is to let us test the corner cases in the jms spec. Though it is not mandatory for functionality to include it (worst case, we can drop it - just like we dropped the JORAM testcases due to an incompatible license), imo, considering the significant value they bring, we should include it if possible. c) Regarding licenses : that is weird, my silly scripts did not catch them ! I will add the license and re-submit : thanks for the pointer, that was a bad oversight; my apologies Implementation of JMS provider -- Key: BOOKKEEPER-312 URL: https://issues.apache.org/jira/browse/BOOKKEEPER-312 Project: Bookkeeper Issue Type: Sub-task Reporter: Mridul Muralidharan Assignee: Mridul Muralidharan Fix For: 4.3.0 Attachments: hedwig-client-jms.patch, hedwig-client-jms.patch.1, hedwig-client-jms.patch.10, hedwig-client-jms.patch.10, hedwig-client-jms.patch.2, hedwig-client-jms.patch.3, hedwig-client-jms.patch.4, hedwig-client-jms.patch.5, hedwig-client-jms.patch.9, hedwig-client-jms.patch.9 The JMS provider implementation conforming to the 1.1 spec. The limitations as of now are : 1) No support for Queue's : Hedwig currently does not have a notion of JMS queue's for us to leverage. 2) No support for noLocal : Hedwig DOES NOT conform to JMS model of connection -(n)- session -(n)- publisher/subscriber. Each session has a hedwig connection. Currently I am simulating noLocal, but this IS fragile and works for the duration of connection - ONLY until the message id is still in a LRUCache. As mentioned before, this is a kludge, and not a good solution. 3) Note that everything is durable in hedwig - so we do not support NON_PERSISTENT delivery mode. 4) Calling unsubscribe on a durable subscription will fail if it was NOT created in the current session. In hedwig, to unsubscribe, we need the subscription id and the topic ... To simulate unsubscribe(), we store the subscriberId to topicName mapping when a create* api is invoked. Hence, if create* was NOT called, then we have no way to infer which topic the subscription-id refers to from hedwig, and so cant unsubscribe. The workaround is - simply create a durable subsriber just as a workaround of this limitation - the topicName will be known to the user/client anyway. 5) Explicit session recovery is not supported. Reconnection of hedwig session (either explicitly or implicitly by underlying client implementation) will automatically trigger redelivery of un-acknowledged messages. 6) Because of the above, setting the JMSRedelivered flag is almost impossible in a consistent way. Currently, we simulate it for redelivery due to provider side events : rollback of txn, exception in message listener (primarily). At best we can simulate it with a kludge - at risk of potentially running out of resources ... this is being investigated : but unlikely to have a clean fix. 7) Hedwig only supports marking all messages until seq-id as received : while JMS indicates ability to acknowledge individual messages.
This distinction is currently unsupported. 8) JMS spec requires A connection's delivery of incoming messages can be temporarily stopped using its stop() method. It can be restarted using its start() method. When the connection is stopped, delivery to all the connection’s MessageConsumers is inhibited: synchronous receives block, and messages are not delivered to MessageListeners. We honor this for undelivered messages from server - but if stop is called while there are pending messages yet to be delivered to a listener (or buffered in subscriber for receive), then they will be delivered irrespective of stop(). -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Updated] (BOOKKEEPER-312) Implementation of JMS provider
[ https://issues.apache.org/jira/browse/BOOKKEEPER-312?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mridul Muralidharan updated BOOKKEEPER-312: --- Attachment: (was: hedwig-client-jms.patch) Implementation of JMS provider -- Key: BOOKKEEPER-312 URL: https://issues.apache.org/jira/browse/BOOKKEEPER-312 Project: Bookkeeper Issue Type: Sub-task Reporter: Mridul Muralidharan Assignee: Mridul Muralidharan Fix For: 4.3.0 Attachments: hedwig-client-jms.patch.4, hedwig-client-jms.patch.5, hedwig-client-jms.patch.9, hedwig-client-jms.patch.9 The JMS provider implementation conforming to the 1.1 spec. The limitations as of now are : 1) No support for Queue's : Hedwig currently does not have a notion of JMS queue's for us to leverage. 2) No support for noLocal : Hedwig DOES NOT conform to JMS model of connection -(n)- session -(n)- publisher/subscriber. Each session has a hedwig connection. Currently I am simulating noLocal, but this IS fragile and works for the duration of connection - ONLY until the message id is still in a LRUCache. As mentioned before, this is a kludge, and not a good solution. 3) Note that everything is durable in hedwig - so we do not support NON_PERSISTENT delivery mode. 4) Calling unsubscribe on a durable subscription will fail if it was NOT created in the current session. In hedwig, to unsubscribe, we need the subscription id and the topic ... To simulate unsubscribe(), we store the subscriberId to topicName mapping when a create* api is invoked. Hence, if create* was NOT called, then we have no way to infer which topic the subscription-id refers to from hedwig, and so cant unsubscribe. The workaround is - simply create a durable subsriber just as a workaround of this limitation - the topicName will be known to the user/client anyway. 5) Explicit session recovery is not supported. Reconnection of hedwig session (either explicitly or implicitly by underlying client implementation) will automatically trigger redelivery of un-acknowledged messages. 6) Because of the above, setting the JMSRedelivered flag is almost impossible in a consistent way. Currently, we simulate it for redelivery due to provider side events : rollback of txn, exception in message listener (primarily). At best we can simulate it with a kludge - at risk of potentially running out of resources ... this is being investigated : but unlikely to have a clean fix. 7) Hedwig only supports marking all messages until seq-id as received : while JMS indicates ability to acknowledge individual messages. This distinction is currently unsupported. 8) JMS spec requires A connection's delivery of incoming messages can be temporarily stopped using its stop() method. It can be restarted using its start() method. When the connection is stopped, delivery to all the connection’s MessageConsumers is inhibited: synchronous receives block, and messages are not delivered to MessageListeners. We honor this for undelivered messages from server - but if stop is called while there are pending messages yet to be delivered to a listener (or buffered in subscriber for receive), then they will be delivered irrespective of stop(). -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Updated] (BOOKKEEPER-312) Implementation of JMS provider
[ https://issues.apache.org/jira/browse/BOOKKEEPER-312?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mridul Muralidharan updated BOOKKEEPER-312: --- Attachment: (was: hedwig-client-jms.patch.10) Implementation of JMS provider -- Key: BOOKKEEPER-312 URL: https://issues.apache.org/jira/browse/BOOKKEEPER-312 Project: Bookkeeper Issue Type: Sub-task Reporter: Mridul Muralidharan Assignee: Mridul Muralidharan Fix For: 4.3.0 Attachments: hedwig-client-jms.patch.4, hedwig-client-jms.patch.5, hedwig-client-jms.patch.9, hedwig-client-jms.patch.9 The JMS provider implementation conforming to the 1.1 spec. The limitations as of now are : 1) No support for Queue's : Hedwig currently does not have a notion of JMS queue's for us to leverage. 2) No support for noLocal : Hedwig DOES NOT conform to JMS model of connection -(n)- session -(n)- publisher/subscriber. Each session has a hedwig connection. Currently I am simulating noLocal, but this IS fragile and works for the duration of connection - ONLY until the message id is still in a LRUCache. As mentioned before, this is a kludge, and not a good solution. 3) Note that everything is durable in hedwig - so we do not support NON_PERSISTENT delivery mode. 4) Calling unsubscribe on a durable subscription will fail if it was NOT created in the current session. In hedwig, to unsubscribe, we need the subscription id and the topic ... To simulate unsubscribe(), we store the subscriberId to topicName mapping when a create* api is invoked. Hence, if create* was NOT called, then we have no way to infer which topic the subscription-id refers to from hedwig, and so cant unsubscribe. The workaround is - simply create a durable subsriber just as a workaround of this limitation - the topicName will be known to the user/client anyway. 5) Explicit session recovery is not supported. Reconnection of hedwig session (either explicitly or implicitly by underlying client implementation) will automatically trigger redelivery of un-acknowledged messages. 6) Because of the above, setting the JMSRedelivered flag is almost impossible in a consistent way. Currently, we simulate it for redelivery due to provider side events : rollback of txn, exception in message listener (primarily). At best we can simulate it with a kludge - at risk of potentially running out of resources ... this is being investigated : but unlikely to have a clean fix. 7) Hedwig only supports marking all messages until seq-id as received : while JMS indicates ability to acknowledge individual messages. This distinction is currently unsupported. 8) JMS spec requires A connection's delivery of incoming messages can be temporarily stopped using its stop() method. It can be restarted using its start() method. When the connection is stopped, delivery to all the connection’s MessageConsumers is inhibited: synchronous receives block, and messages are not delivered to MessageListeners. We honor this for undelivered messages from server - but if stop is called while there are pending messages yet to be delivered to a listener (or buffered in subscriber for receive), then they will be delivered irrespective of stop(). -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Updated] (BOOKKEEPER-312) Implementation of JMS provider
[ https://issues.apache.org/jira/browse/BOOKKEEPER-312?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mridul Muralidharan updated BOOKKEEPER-312: --- Attachment: (was: hedwig-client-jms.patch.10) Implementation of JMS provider -- Key: BOOKKEEPER-312 URL: https://issues.apache.org/jira/browse/BOOKKEEPER-312 Project: Bookkeeper Issue Type: Sub-task Reporter: Mridul Muralidharan Assignee: Mridul Muralidharan Fix For: 4.3.0 Attachments: hedwig-client-jms.patch.4, hedwig-client-jms.patch.5, hedwig-client-jms.patch.9, hedwig-client-jms.patch.9 The JMS provider implementation conforming to the 1.1 spec. The limitations as of now are : 1) No support for Queue's : Hedwig currently does not have a notion of JMS queue's for us to leverage. 2) No support for noLocal : Hedwig DOES NOT conform to JMS model of connection -(n)- session -(n)- publisher/subscriber. Each session has a hedwig connection. Currently I am simulating noLocal, but this IS fragile and works for the duration of connection - ONLY until the message id is still in a LRUCache. As mentioned before, this is a kludge, and not a good solution. 3) Note that everything is durable in hedwig - so we do not support NON_PERSISTENT delivery mode. 4) Calling unsubscribe on a durable subscription will fail if it was NOT created in the current session. In hedwig, to unsubscribe, we need the subscription id and the topic ... To simulate unsubscribe(), we store the subscriberId to topicName mapping when a create* api is invoked. Hence, if create* was NOT called, then we have no way to infer which topic the subscription-id refers to from hedwig, and so cant unsubscribe. The workaround is - simply create a durable subsriber just as a workaround of this limitation - the topicName will be known to the user/client anyway. 5) Explicit session recovery is not supported. Reconnection of hedwig session (either explicitly or implicitly by underlying client implementation) will automatically trigger redelivery of un-acknowledged messages. 6) Because of the above, setting the JMSRedelivered flag is almost impossible in a consistent way. Currently, we simulate it for redelivery due to provider side events : rollback of txn, exception in message listener (primarily). At best we can simulate it with a kludge - at risk of potentially running out of resources ... this is being investigated : but unlikely to have a clean fix. 7) Hedwig only supports marking all messages until seq-id as received : while JMS indicates ability to acknowledge individual messages. This distinction is currently unsupported. 8) JMS spec requires A connection's delivery of incoming messages can be temporarily stopped using its stop() method. It can be restarted using its start() method. When the connection is stopped, delivery to all the connection’s MessageConsumers is inhibited: synchronous receives block, and messages are not delivered to MessageListeners. We honor this for undelivered messages from server - but if stop is called while there are pending messages yet to be delivered to a listener (or buffered in subscriber for receive), then they will be delivered irrespective of stop(). -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Updated] (BOOKKEEPER-312) Implementation of JMS provider
[ https://issues.apache.org/jira/browse/BOOKKEEPER-312?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mridul Muralidharan updated BOOKKEEPER-312: --- Attachment: (was: hedwig-client-jms.patch.2) Implementation of JMS provider -- Key: BOOKKEEPER-312 URL: https://issues.apache.org/jira/browse/BOOKKEEPER-312 Project: Bookkeeper Issue Type: Sub-task Reporter: Mridul Muralidharan Assignee: Mridul Muralidharan Fix For: 4.3.0 Attachments: hedwig-client-jms.patch.4, hedwig-client-jms.patch.5, hedwig-client-jms.patch.9, hedwig-client-jms.patch.9 The JMS provider implementation conforming to the 1.1 spec. The limitations as of now are : 1) No support for Queue's : Hedwig currently does not have a notion of JMS queue's for us to leverage. 2) No support for noLocal : Hedwig DOES NOT conform to JMS model of connection -(n)- session -(n)- publisher/subscriber. Each session has a hedwig connection. Currently I am simulating noLocal, but this IS fragile and works for the duration of connection - ONLY until the message id is still in a LRUCache. As mentioned before, this is a kludge, and not a good solution. 3) Note that everything is durable in hedwig - so we do not support NON_PERSISTENT delivery mode. 4) Calling unsubscribe on a durable subscription will fail if it was NOT created in the current session. In hedwig, to unsubscribe, we need the subscription id and the topic ... To simulate unsubscribe(), we store the subscriberId to topicName mapping when a create* api is invoked. Hence, if create* was NOT called, then we have no way to infer which topic the subscription-id refers to from hedwig, and so cant unsubscribe. The workaround is - simply create a durable subsriber just as a workaround of this limitation - the topicName will be known to the user/client anyway. 5) Explicit session recovery is not supported. Reconnection of hedwig session (either explicitly or implicitly by underlying client implementation) will automatically trigger redelivery of un-acknowledged messages. 6) Because of the above, setting the JMSRedelivered flag is almost impossible in a consistent way. Currently, we simulate it for redelivery due to provider side events : rollback of txn, exception in message listener (primarily). At best we can simulate it with a kludge - at risk of potentially running out of resources ... this is being investigated : but unlikely to have a clean fix. 7) Hedwig only supports marking all messages until seq-id as received : while JMS indicates ability to acknowledge individual messages. This distinction is currently unsupported. 8) JMS spec requires A connection's delivery of incoming messages can be temporarily stopped using its stop() method. It can be restarted using its start() method. When the connection is stopped, delivery to all the connection’s MessageConsumers is inhibited: synchronous receives block, and messages are not delivered to MessageListeners. We honor this for undelivered messages from server - but if stop is called while there are pending messages yet to be delivered to a listener (or buffered in subscriber for receive), then they will be delivered irrespective of stop(). -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira