Re: Low throughput and effect of GC in SparkSql GROUP BY
I hadn't turned on codegen. I enabled it and ran it again, and it is running 4-5 times faster now! :) Since my log statements no longer appear, I presume the code path is quite different from the earlier hashmap-related code in Aggregates.scala?

Pramod

On Wed, May 20, 2015 at 9:18 PM, Reynold Xin r...@databricks.com wrote:

Does this turn codegen on? I think the performance is fairly different when codegen is turned on. For 1.5, we are investigating having codegen on by default, so users get much better performance out of the box.

On Wed, May 20, 2015 at 5:24 PM, Pramod Biligiri pramodbilig...@gmail.com wrote:

Hi, Somewhat similar to Daniel Mescheder's mail yesterday on SparkSql, I have a data point regarding the performance of GROUP BY, indicating there is excessive GC and it is impacting throughput. I want to know if the new memory manager for aggregations (https://github.com/apache/spark/pull/5725/) is going to address this kind of issue.

I only have a small amount of data on each node (~360 MB) with a large heap size (18 GB). I still see 2-3 minor collections happening whenever I do a Select Sum() with a Group By(). I have tried different sizes for the Young Generation without much effect, though not different GC algorithms (Hmm, I ought to try reducing the RDD storage fraction perhaps). I have made a chart of my results [1] by adding timing code to Aggregates.scala. The query is Query 2 from Berkeley's AmpLab benchmark, running over 10 million records. The chart is from one of the 4 worker nodes in the cluster.

I am trying to square this with a claim on the Project Tungsten blog post [2]: "When profiling Spark user applications, we've found that a large fraction of the CPU time is spent waiting for data to be fetched from main memory." Am I correct in assuming that SparkSql is yet to reach that level of efficiency, at least in aggregation operations?

Thanks.
[1] https://docs.google.com/spreadsheets/d/1HSqYfic3n5s9i4Wsi1Qg0FKN_AWz2vV7_6RRMrtzplQ/edit#gid=481134174
[2] https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html

Pramod
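For readers following along: "codegen" here refers to the spark.sql.codegen flag, which is off by default in Spark 1.3/1.4. A minimal sketch of the setup being discussed, with placeholder data standing in for the AmpLab uservisits table:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    val sc = new SparkContext(new SparkConf().setAppName("groupby-bench"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Off by default in Spark 1.3/1.4; compiles expression evaluation to bytecode.
    sqlContext.setConf("spark.sql.codegen", "true")

    // Placeholder data; the real benchmark runs over 10 million records.
    case class Visit(sourceIP: String, adRevenue: Double)
    sc.parallelize(Seq(Visit("10.0.0.1", 0.5), Visit("10.0.0.2", 1.5)))
      .toDF().registerTempTable("uservisits")

    // A Sum + Group By query of the kind benchmarked above.
    sqlContext.sql(
      "SELECT sourceIP, SUM(adRevenue) FROM uservisits GROUP BY sourceIP").show()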
Re: Low throughput and effect of GC in SparkSql GROUP BY
Yup, it is a different path. It runs GeneratedAggregate.

On Wed, May 20, 2015 at 11:43 PM, Pramod Biligiri pramodbilig...@gmail.com wrote:

I hadn't turned on codegen. I enabled it and ran it again, and it is running 4-5 times faster now! :) ...
Re: Spark Streaming - Design considerations/Knobs
Honestly, given the length of my email, I didn't expect a reply. :-) Thanks for reading and replying. However, I have a follow-up question: I don't think I understand block replication completely. Are the blocks replicated immediately after they are received by the receiver? Or are they kept only on the receiver node and moved only on shuffle? Does the replication have something to do with locality.wait?

Thanks,
Hemant

On Thu, May 21, 2015 at 2:21 AM, Tathagata Das t...@databricks.com wrote:

Correcting the ones that are incorrect or incomplete. BUT this is a good list of things to remember about Spark Streaming. (Corrections below marked [TD].)

On Wed, May 20, 2015 at 3:40 AM, Hemant Bhanawat hemant9...@gmail.com wrote:

Hi, I have compiled a list (from online sources) of knobs/design considerations that need to be taken care of by applications running on Spark Streaming. Is my understanding correct? Are there any other important design considerations I should take care of?

- A DStream is associated with a single receiver. To attain read parallelism, multiple receivers, i.e. multiple DStreams, need to be created.

- A receiver runs within an executor and occupies one core. Ensure that there are enough cores for processing after the receiver slots are booked, i.e. spark.cores.max should take the receiver slots into account.

- Receivers are allocated to executors in a round-robin fashion.

- When data is received from a stream source, the receiver creates blocks of data. A new block of data is generated every blockInterval milliseconds; N blocks are created during the batchInterval, where N = batchInterval/blockInterval.

- These blocks are distributed by the BlockManager of the current executor to the block managers of other executors. After that, the Network Input Tracker running on the driver is informed about the block locations for further processing.

- An RDD is created on the driver for the blocks created during the batchInterval. The blocks generated during the batchInterval are partitions of the RDD, and each partition is a task in Spark. blockInterval == batchInterval would mean that a single partition is created and probably it is processed locally. [TD] The map tasks on the blocks are processed in the executors that have the blocks (the one that received a block, and the one where it was replicated), irrespective of block interval, unless non-local scheduling kicks in.

- Having a bigger blockInterval means bigger blocks. A high value of spark.locality.wait increases the chance of processing a block on the local node. A balance needs to be found between these two parameters to ensure that the bigger blocks are processed locally.

- Instead of relying on batchInterval and blockInterval, you can define the number of partitions by calling dstream.repartition(n). This reshuffles the data in the RDD randomly to create n partitions. [TD] Yes, for greater parallelism, though it comes at the cost of a shuffle.

- An RDD's processing is scheduled by the driver's JobScheduler as a job. At a given point of time only one job is active, so if one job is executing, the other jobs are queued.

- If you have two DStreams, there will be two RDDs formed and two jobs created, which will be scheduled one after another.

- To avoid this, you can union the two DStreams. This ensures that a single UnionRDD is formed from the two RDDs of the DStreams; that UnionRDD is then considered a single job. However, the partitioning of the RDDs is not impacted.
[TD] To further clarify, the number of jobs depends on the number of output operations (print, foreachRDD, saveAsXFiles) and the number of RDD actions in those output operations:

dstream1.union(dstream2).foreachRDD { rdd => rdd.count() } // ONE Spark job per batch
dstream1.union(dstream2).foreachRDD { rdd => { rdd.count(); rdd.count() } } // TWO Spark jobs per batch
dstream1.foreachRDD { rdd => rdd.count() }; dstream2.foreachRDD { rdd => rdd.count() } // TWO Spark jobs per batch

- If the batch processing time is more than the batchInterval, then obviously the receiver's memory will start filling up and it will end up throwing exceptions (most probably BlockNotFoundException). Currently there is no way to pause the receiver. You can limit the rate of a receiver using the SparkConf setting spark.streaming.receiver.maxRate.

- For being fully fault tolerant, Spark Streaming needs checkpointing enabled. Checkpointing increases the batch processing time. [TD] Incomplete. There are two types of checkpointing: data and metadata. Only data checkpointing, needed by only some operations, increases batch processing time. Read http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing. Furthermore, with checkpointing you can recover computation, but you
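For concreteness, a minimal sketch tying several of the knobs above together (hosts, rates, and intervals are illustrative placeholders only):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf()
      .setAppName("streaming-knobs")
      .setMaster("local[4]") // need >= number of receivers + 1 cores
      // cut a block every 200 ms => batchInterval/blockInterval partitions per batch
      .set("spark.streaming.blockInterval", "200")
      // cap each receiver so a slow batch cannot overrun receiver memory
      .set("spark.streaming.receiver.maxRate", "10000")

    val ssc = new StreamingContext(conf, Seconds(2)) // 2 s batches => ~10 blocks/batch
    ssc.checkpoint("hdfs:///tmp/streaming-checkpoint") // placeholder path

    // Union two receiver streams so each batch yields one job per RDD action.
    val d1 = ssc.socketTextStream("host1", 9999) // placeholder sources
    val d2 = ssc.socketTextStream("host2", 9999)
    d1.union(d2).foreachRDD { rdd => println(rdd.count()) } // one Spark job per batch

    ssc.start()
    ssc.awaitTermination()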
Re: Resource usage of a spark application
Thanks Akhil, Ryan!

@Akhil: YARN can only tell me how many vcores my app has been granted, but not actual CPU usage, right? Pulling mem/cpu usage from the OS means I need to map JVM executor processes to the context they belong to, right?

@Ryan: What a great blog post -- this is super relevant for me to analyze the state of the cluster as a whole. However, it seems to me that those metrics are mostly reported globally and not per Spark application.

2015-05-19 21:43 GMT+02:00 Ryan Williams ryan.blake.willi...@gmail.com:

Hi Peter, a few months ago I was using MetricsSystem to export to Graphite and then view in Grafana; relevant scripts and some instructions are here https://github.com/hammerlab/grafana-spark-dashboards/ if you want to take a look.

On Sun, May 17, 2015 at 8:48 AM Peter Prettenhofer peter.prettenho...@gmail.com wrote:

Hi all, I'm looking for a way to measure the current memory / CPU usage of a Spark application, to give users feedback on how many resources are actually being used. It seems that the metrics system provides this information to some extent: it logs metrics at the application level (nr of cores granted) and at the JVM level (memory usage). Is this the recommended way to gather this kind of information? If so, how do I best map a Spark application to the corresponding JVM processes? If not, should I rather request this information from the resource manager (e.g. Mesos/YARN)?

thanks,
Peter

--
Peter Prettenhofer
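For anyone wanting to reproduce the Graphite setup Ryan describes, the MetricsSystem side is driven by a metrics properties file; a minimal sketch based on the sink/source options documented for Spark 1.x (host, port, and values are placeholders):

    # conf/metrics.properties -- point spark.metrics.conf at this file
    *.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink
    *.sink.graphite.host=graphite.example.com
    *.sink.graphite.port=2003
    *.sink.graphite.period=10
    *.sink.graphite.unit=seconds
    # per-JVM memory metrics for drivers and executors
    *.source.jvm.class=org.apache.spark.metrics.source.JvmSource

As far as I know, the exported metric names are prefixed with the application ID (and executor ID), which is also one answer to the question of mapping JVM processes back to a Spark application.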
Re: Spark Streaming with Tachyon : Data Loss on Receiver Failure due to WAL error
Looks like the file size reported by the FSInputDStream of Tachyon's FileSystem interface is somehow returning zero.

On Mon, May 11, 2015 at 4:38 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote:

Just to follow up this thread further: I was doing some fault-tolerance testing of Spark Streaming with Tachyon as the OFF_HEAP block store. As I said in an earlier email, I was able to solve the BlockNotFound exception when I used Hierarchical Storage in Tachyon, which is good. I continued doing some testing around storing the Spark Streaming WAL and checkpoint files in Tachyon as well. Here are a few findings.

When I store the Spark Streaming checkpoint location in Tachyon, the throughput is much higher. I tested the driver and receiver failure cases, and Spark Streaming is able to recover without any data loss on driver failure. *But on receiver failure, Spark Streaming loses data*, as I see an exception while reading the WAL file from the Tachyon receivedData location for the same receiver id that just failed. If I change the checkpoint location back to HDFS, Spark Streaming can recover from both driver and receiver failure.

Here are the log details from when the Spark Streaming receiver failed. I raised a JIRA for the same issue: https://issues.apache.org/jira/browse/SPARK-7525

INFO : org.apache.spark.scheduler.DAGScheduler - *Executor lost: 2 (epoch 1)*
INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Trying to remove executor 2 from BlockManagerMaster.
INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Removing block manager BlockManagerId(2, 10.252.5.54, 45789)
INFO : org.apache.spark.storage.BlockManagerMaster - Removed 2 successfully in removeExecutor
INFO : org.apache.spark.streaming.scheduler.ReceiverTracker - *Registered receiver for stream 2 from 10.252.5.62*:47255
WARN : org.apache.spark.scheduler.TaskSetManager - Lost task 2.1 in stage 103.0 (TID 421, 10.252.5.62): org.apache.spark.SparkException: *Could not read data from write ahead log record FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919)*
at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org$apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144)
at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:168)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.IllegalArgumentException: *Seek position is past EOF: 645603894, fileSize = 0*
at tachyon.hadoop.HdfsFileInputStream.seek(HdfsFileInputStream.java:239)
at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:37)
at org.apache.spark.streaming.util.FileBasedWriteAheadLogRandomReader.read(FileBasedWriteAheadLogRandomReader.scala:37)
at org.apache.spark.streaming.util.FileBasedWriteAheadLog.read(FileBasedWriteAheadLog.scala:104)
at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org$apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:141)
... 15 more
INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 2.2 in stage 103.0 (TID 422, 10.252.5.61, ANY, 1909 bytes)
INFO : org.apache.spark.scheduler.TaskSetManager - Lost task 2.2 in stage 103.0 (TID 422) on executor 10.252.5.61: org.apache.spark.SparkException (Could not read data from write ahead log record FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919)) [duplicate 1]
INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 2.3 in stage 103.0 (TID 423, 10.252.5.62, ANY, 1909 bytes)
INFO : org.apache.spark.deploy.client.AppClient$ClientActor -
Re: Adding/Using More Resolution Types on JIRA
Some examples to illustrate my point. A couple of issues from the oldest open issues in the SQL component:

"[SQL] spark-sql exits while encountered an error"
https://issues.apache.org/jira/browse/SPARK-4572
This is an incomplete report that nobody can take action on. It can be resolved as Incomplete instead of Inactive. In fact, there is no need to wait that long to close it as Incomplete.

"Using a field in a WHERE clause that is not in the schema does not throw an exception"
https://issues.apache.org/jira/browse/SPARK-5305
This is also Incomplete: it is not currently reproducible and it is missing version info.

Rather than a need for these new statuses, it seems there is a need for more people assessing and triaging new issues.

Best,
--
Santiago M. Mola
http://www.stratio.com/
Vía de las dos Castillas, 33, Ática 4, 3ª Planta
28224 Pozuelo de Alarcón, Madrid
Tel: +34 91 828 6473 // www.stratio.com // @stratiobd https://twitter.com/StratioBD
Re: Tungsten's Vectorized Execution
We have not started to prototype the vectorized version yet; it will be evaluated in 1.5 and may be targeted for 1.6. We'd be glad to hear feedback/suggestions/comments from your side!

On Thu, May 21, 2015 at 9:37 AM, Yijie Shen henry.yijies...@gmail.com wrote:

Hi all, I've seen the blog post on Project Tungsten, and it sounds awesome to me! I've also noticed there is a plan to change code generation from record-at-a-time evaluation to a vectorized one, which interests me most. ...
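For context, the record-at-a-time vs. vectorized distinction under discussion boils down to roughly the following (a conceptual Scala sketch only, not Spark internals):

    // Record-at-a-time: one iterator call per record.
    def sumOneAtATime(rows: Iterator[Long]): Long = {
      var acc = 0L
      while (rows.hasNext) acc += rows.next()
      acc
    }

    // Vectorized: a tight primitive loop over a column batch, amortizing
    // per-record overhead and keeping the data cache-friendly.
    def sumVectorized(batches: Iterator[Array[Long]]): Long = {
      var acc = 0L
      while (batches.hasNext) {
        val batch = batches.next()
        var i = 0
        while (i < batch.length) { acc += batch(i); i += 1 }
      }
      acc
    }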
Re: Adding/Using More Resolution Types on JIRA
On Thu, May 21, 2015 at 9:06 PM, Santiago Mola sm...@stratio.com wrote:

> > Inactive - A feature or bug that has had no activity from users or developers in a long time
>
> Why is this needed? Every JIRA listing can be sorted by activity. That gets the inactive ones out of your view quickly. I do not see any reason why an issue should be closed because of this. If it's inactive, maybe it's because it falls into some of the other categories (out of scope, later, won't fix).

I don't think sorting helps, or that browsing is the issue. What if you're searching for open Critical issues concerning PySpark? If the list is full of issues that are actually out of scope, later, or won't fix, then that's a problem.

> That is a much more specific case than Inactivity, and a lot of large-scale open source projects use specific resolutions for this.

Yes, that's CannotReproduce. I think the walking-dead JIRAs we have in mind are some combination of: a JIRA opened without a lot of detail, that might or might not be a problem; nobody else seemed to have the problem and/or nobody cared to investigate; much has changed since anyway, so it might be obsolete. WontFix, CannotReproduce, and NotAProblem are all possibly reasonable resolutions. If this is just about semantics, I also don't feel a strong need for a new state.

> On a more general note: what is the problem with open issues / pull requests? I see a tendency in the Spark project to do unusual things with issues / PRs just to keep the numbers low. For example, closing PRs after a couple of weeks of inactivity just to shrink the queue, or closing active issues just for the sake of it. Honestly, this looks a lot like trying to game metrics. But maybe there is something that I am missing.

Game for whose benefit? Nobody is being evaluated on this stuff. This is being proposed for real reasons, not for fun. A bunch of JIRA cruft is a symptom, not a cause. Something is wrong somewhere if people file JIRAs and they go nowhere: everyone's time is wasted, and with no conclusion there's no feedback or learning anywhere, so it keeps happening. Is it bad JIRAs? Scope issues? Lack of follow-up from developer or contributor? All of the above? I actually think it's mostly bad JIRAs: too large, too invasive, not that useful, hacky fixes to a facet of a problem, incomplete descriptions, duplicates, etc. I think it's more useful to actually close these to communicate back clearly what is not going to be accepted. Things can be reopened if needed. Silently ignoring them forever as Open JIRAs seems less constructive.

> Maybe what is actually needed is to improve the lifecycle of an issue while it is alive, instead of trying to kill it earlier. Some examples used on other projects are an "incomplete" status, to signal that more info is required from the reporter before further action, and "confirmed", to acknowledge that a bug is confirmed to be present and needs action by a developer.

Yes, best to try to make the process better. That's why I started with things like a more comprehensive https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark to encourage better contributions in the first place. By the time dead JIRAs are closed, something has already gone wrong and time has been wasted. But we still need that culture of not letting stuff sit around. I don't mind the idea of an Unresolved InformationNeeded status, yeah. I don't actually think that would solve a problem though.
The dead JIRAs are ones that never got any follow-up, or that got a lot of follow-up from the contributor but aren't going to be merged.
Re: Adding/Using More Resolution Types on JIRA
2015-05-12 9:50 GMT+02:00 Patrick Wendell pwend...@gmail.com:

> Inactive - A feature or bug that has had no activity from users or developers in a long time

Why is this needed? Every JIRA listing can be sorted by activity. That gets the inactive ones out of your view quickly. I do not see any reason why an issue should be closed because of this. If it's inactive, maybe it's because it falls into some of the other categories (out of scope, later, won't fix).

I can only think of one case where closing an inactive issue makes sense: a bug was reported a long time ago, nobody was able to reproduce it (after actively trying), and the reporter is no longer around to provide more info. That is a much more specific case than inactivity, and a lot of large-scale open source projects use specific resolutions for it.

On a more general note: what is the problem with open issues / pull requests? I see a tendency in the Spark project to do unusual things with issues / PRs just to keep the numbers low. For example, closing PRs after a couple of weeks of inactivity just to shrink the queue, or closing active issues just for the sake of it. Honestly, this looks a lot like trying to game metrics. But maybe there is something that I am missing.

Maybe what is actually needed is to improve the lifecycle of an issue while it is alive, instead of trying to kill it earlier. Some examples used on other projects are an "incomplete" status, to signal that more info is required from the reporter before further action, and "confirmed", to acknowledge that a bug is confirmed to be present and needs action by a developer.

Best,
--
Santiago M. Mola
http://www.stratio.com/
Vía de las dos Castillas, 33, Ática 4, 3ª Planta
28224 Pozuelo de Alarcón, Madrid
Tel: +34 91 828 6473 // www.stratio.com // @stratiobd https://twitter.com/StratioBD
Re: Adding/Using More Resolution Types on JIRA
On Thu, May 21, 2015 at 10:03 PM, Santiago Mola sm...@stratio.com wrote:

> Sure. That is why I was talking about the Inactive resolution specifically. The combination of Priority + other statuses is enough to solve these issues. A minor/trivial issue that is incomplete is probably not going to hurt someone looking for critical open issues much.

If you mean you intend to consider them resolved, then yeah, we agree a lot. The names of the resolved states don't matter nearly so much to me. For instance, in your examples, I could call those CannotReproduce or Incomplete. I would not want to leave them Open in that state indefinitely.

> On a side note, I would like to contribute some time to improving this. When identifying this kind of issue, should I ask in the issue itself to resolve it in a specific way?

I think reviewing JIRAs actually contributes to a better overall process, so I'd just dive in. Anything that advances a JIRA / PR toward a resolution is very helpful: ask for more info, investigate a problem to confirm it or fail to reproduce it, propose a fix, identify duplicates, flag JIRAs for closing, review changes, say you think a change is good, etc. -- among the most helpful things anyone can do right now, IMHO.
Re: Contribute code to MLlib
Thank you, Ram and Joseph. I am also hoping to contribute to MLlib once my Scala is up to snuff; this is the guidance I needed for how to proceed when ready.

Best wishes,
Trevor

On Wed, May 20, 2015 at 1:55 PM, Joseph Bradley jos...@databricks.com wrote:

Hi Trevor, I may be repeating what Ram said, but to second it, a few points: We do want MLlib to become an extensive and rich ML library; as you said, scikit-learn is a great example. To make that happen, we of course need to include important algorithms. "Important" is hazy, but roughly means being useful to a large number of users, improving a large number of use cases (above what is currently available), and being well-established and tested. Others and I may not be familiar with Tarek's algorithm (since it is so new), so it will be important to discuss details on JIRA to establish the cases in which the algorithm improves over current PCA. That may require discussion, community testing, etc. If we establish that it is a clear improvement in a large domain, then it could be valuable to have in MLlib proper. It's always going to be hard to tell where to draw the line, so less common algorithms will require more testing before we commit to including them in MLlib. I like the Spark package suggestion, since it would let users start using the code immediately while the discussion on JIRA happens. (Plus, if package users find it useful, they can report that on the JIRA.)

Joseph

On Wed, May 20, 2015 at 10:01 AM, Ram Sriharsha sriharsha@gmail.com wrote:

Hi Trevor, I'm attaching the MLlib contribution guideline here: https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark#ContributingtoSpark-MLlib-specificContributionGuidelines It speaks to widely known and accepted algorithms, but not to whether an algorithm has to be better than another in every scenario, etc. I think the guideline explains what a good contribution to the core library should look like better than I initially attempted to!

Sent from my iPhone

On May 20, 2015, at 9:31 AM, Ram Sriharsha sriharsha@gmail.com wrote:

Hi Trevor, Good point; I didn't mean that an algorithm has to be clearly better than another in every scenario to be included in MLlib. However, even if someone is willing to be the maintainer of a piece of code, it does not make sense to accept every possible algorithm into the core library. That said, the specific algorithms should be discussed in the JIRA: as you point out, there is no clear way to decide which algorithms to include and which not to, and usually mature algorithms that serve a wide variety of scenarios are easier to argue about, but nothing prevents anyone from opening a ticket to discuss any specific machine learning algorithm. My suggestion was simply that for the purpose of making experimental or newer algorithms available to Spark users, they don't necessarily have to be in the core library. Spark packages are good enough in this respect. Isn't it better for newer algorithms to take this route and prove themselves before we bring them into the core library? Especially given that the barrier to using Spark packages is very low.

Ram

On Wed, May 20, 2015 at 9:05 AM, Trevor Grant trevor.d.gr...@gmail.com wrote:

Hey Ram, I'm not speaking to Tarek's package specifically but to the spirit of MLlib. There are a number of methods/algorithms for PCA; I'm not sure by what criterion the current one is considered 'standard'. It is rare to find ANY machine learning algorithm that is 'clearly better' than any other. They are all tools; they have their place and time.
I agree that it makes sense to field new algorithms as packages and then integrate them into MLlib once they are 'proven' (in terms of stability/performance/whether anyone cares). That being said, if MLlib takes the stance that 'what we have is good enough unless something is *clearly* better', then it will never grow into a suite with the depth and richness of sklearn. From a practitioner's standpoint, it's nice to have everything I could ever want ready in an 'off-the-shelf' form.

'Better than existing for a large number of use cases' shouldn't be a criterion when selecting what to include in MLlib. The important question should be: 'Are you willing to take on responsibility for maintaining this, because you may be the only person on earth who understands both the mechanics AND how to code it?'. Obviously we don't want any random junk algorithm included. But trying to say 'this way of doing PCA is better than that way in a large class of cases' is like trying to say 'geometry is more important than calculus in a large class of cases'. Maybe it's true -- but geometry won't help you if you are in a case where you need calculus.

This all relies on the assumption that MLlib is destined to be a rich data science/machine learning package. It may be that the goal is to make the project as lightweight and parsimonious as possible, if so
Re: Resource usage of a spark application
Yes, Peter, that's correct: you need to identify the processes, and with that you can pull the actual usage metrics.

Thanks
Best Regards

On Thu, May 21, 2015 at 2:52 PM, Peter Prettenhofer peter.prettenho...@gmail.com wrote:

Thanks Akhil, Ryan! @Akhil: YARN can only tell me how many vcores my app has been granted, but not actual CPU usage, right? Pulling mem/cpu usage from the OS means I need to map JVM executor processes to the context they belong to, right? ...
Re: Re: Low throughput and effect of GC in SparkSql GROUP BY
Hi Pramod,

Is your data compressed? I encountered a similar problem; however, after turning codegen on, the GC time was still very long. The input data for my map task is about a 100 MB lzo file. My query is: select ip, count(*) as c from stage_bitauto_adclick_d group by ip sort by c limit 100

Thanks,
Zhang Xiongfei

At 2015-05-21 12:18:35, Reynold Xin r...@databricks.com wrote:

Does this turn codegen on? I think the performance is fairly different when codegen is turned on. ...
Re: Resource usage of a spark application
On Thu, May 21, 2015 at 5:22 AM Peter Prettenhofer peter.prettenho...@gmail.com wrote:

> @Ryan: what a great blog post -- this is super relevant for me to analyze the state of the cluster as a whole. However, it seems to me that those metrics are mostly reported globally and not per spark application.

Thanks! You can definitely analyze metrics per application, in several ways:

- If you're running Spark on YARN, use the app URL param https://github.com/hammerlab/grafana-spark-dashboards#appyarn-app-id to specify a YARN application ID, which will set the Spark application ID as well as parse job start/end times.

- Set the prefix URL param https://github.com/hammerlab/grafana-spark-dashboards#prefixmetric-prefix to your Spark app's ID, and all metrics will be namespaced to that app ID.

- You actually have to do one of these two, otherwise it doesn't know which app's metrics to look for; it is set up specifically to view per-app metrics.

- There is a dropdown in the upper left of the page (sorry, I don't have a screenshot right now) that will let you select from all app IDs that Graphite has seen metrics from.

Let me know, here or in issues on the repo, if you have any issues with that or it doesn't make sense!
Why use lib_managed for the Sbt build?
I'm trying to understand why sbt is configured to pull all libs under lib_managed.

- It seems like unnecessary duplication (I will have those libraries under ~/.m2, via Maven, anyway)
- Every time I call make-distribution I lose lib_managed (via mvn clean install) and have to wait to download all the jars again the next time I use sbt
- Eclipse does not handle relative paths very well (source attachments from lib_managed don't always work)

So, what is the advantage of putting all dependencies there, instead of using the default ~/.ivy2?

cheers,
iulian

--
Iulian Dragos
Reactive Apps on the JVM
www.typesafe.com
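For reference, lib_managed comes from sbt's retrieveManaged setting; a minimal build.sbt illustration (whether Spark's build enables it exactly this way is an assumption to check in project/SparkBuild.scala):

    // build.sbt -- minimal illustration only
    retrieveManaged := true // copy resolved jars under lib_managed/ in addition to the ivy cache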
Re: Change for submitting to yarn in 1.3.1
This is an excellent discussion. As mentioned in an earlier email, we agree with a number of Chester's suggestions, but we have yet other concerns. I've researched this further in the past several days, and I've queried my team. This email attempts to capture those other concerns.

Making yarn.Client private has prevented us from moving from Spark 1.0.x to Spark 1.2 or 1.3 despite many alluring new features. The SparkLauncher, which provides "support for programmatically running Spark jobs" (SPARK-3733 and SPARK-4924), will not work in our environment or for our use case -- which requires programmatically initiating and monitoring Spark jobs on Yarn in cluster mode from a cloud-based application server. It is not just that the Yarn ApplicationId is no longer directly or indirectly available. More critically, it violates constraints imposed by any application server and additional constraints imposed by security, process, and dynamic resource allocation requirements in our cloud services environment.

In Spark 1.0 and 1.1, with yarn.Client public, our applications' job scheduler marshalls the configuration and environmental resources necessary for any Spark job, including cluster-, data- or job-specific parameters, and makes the appropriate calls to initialize and run yarn.Client, which together with the other classes in the spark-yarn module requests the Yarn resource manager to start and monitor a job (see Figure 1) on the cluster. (Our job scheduler is not a Yarn replacement; it leverages Yarn to coordinate a variety of different Spark analytic and data enrichment jobs.)

More recent Spark versions make yarn.Client private and thus remove that capability, but the SparkLauncher, scheduled for Spark 1.4, replaces this simple programmatic solution with one considerably more complicated. Based on our understanding, in this scenario our job scheduler marshalls configuration and environmental resources for the SparkLauncher much as it did for yarn.Client. It then calls launch() to initialize a new Linux process that executes the spark-submit shell script with the specified configuration and environment, which in turn starts a new JVM (with the Spark assembly jar in its class path) that executes launcher.Main. This ultimately calls yarn.Client (see Figure 2).

This is more than an arm's-length transaction. There are three legs: job scheduler SparkLauncher.launch() call → spark-submit bash execution → launcher.Main call to yarn.Client → Yarn resource manager allocation and execution of job driver and executors. Not only is this scenario unnecessarily complicated, it will simply not work. The "programmatic" call to SparkLauncher.launch() starts a new JVM, which is not allowed in any application server, which must own all its JVMs. Perhaps spark-submit and the launcher.Main JVM process could be hosted outside the application server, but only in violation of security and multiple-tenant cloud architectural constraints.

We appreciate that yarn.Client was perhaps never intended to be public. Configuring it is not for the faint of heart, and some of its methods should indeed be private. We wonder whether there is another option. In researching and discussing these issues with Cloudera and others, we've been told that only one mechanism is supported for starting Spark jobs: the spark-submit scripts. We also have gathered (perhaps mistakenly) from discussions reaching back 20 months that Spark's intention is to have a unified job submission interface for all supported platforms.

Unfortunately, this doesn't recognize the asymmetries among those platforms. Submitting a local Spark job or a job to a Spark master in cluster mode may indeed require initializing a separate process in order to pass configuration parameters via the environment and command line. But Spark's yarn.Client in cluster mode already has an arm's-length relationship with the Yarn resource manager. Configuration may be passed from the job scheduling application to yarn.Client as Strings or property map variables and method parameters.

Our request is for a public yarn.Client or some reasonable facsimile. Thanks.

On 05/13/2015 08:22 PM, Patrick Wendell wrote:

Hey Chester, Thanks for sending this. It's very helpful to have this list. The reason we made the Client API private was that it was never intended to be used by
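For concreteness, the three-legged flow described above starts from the Spark 1.4 launcher API, roughly as follows (a sketch; paths and class names are placeholders):

    import org.apache.spark.launcher.SparkLauncher

    // Leg 1: the job scheduler configures and launches.
    val process = new SparkLauncher()
      .setAppResource("/path/to/analytics-job.jar") // placeholder
      .setMainClass("com.example.AnalyticsJob")     // placeholder
      .setMaster("yarn-cluster")
      .setConf("spark.executor.memory", "4g")
      .launch() // Leg 2: forks a new JVM running spark-submit -> launcher.Main

    // Leg 3 happens out of reach: launcher.Main invokes yarn.Client, which talks
    // to the YARN ResourceManager. Note the caller gets only a java.lang.Process
    // back -- no YARN ApplicationId.
    val exitCode = process.waitFor()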
Re: Change for submitting to yarn in 1.3.1
Hi Kevin,

I read through your e-mail and I see two main things you're talking about.

- You want a public YARN Client class and don't really care about anything else.

In your message you already mention why that's not a good idea: it's much better to have a standardized submission API. As you noticed by working with the previous Client API, it's not for the faint of heart; SparkSubmit hides a lot of the complexity, and does so in a way that is transparent to the caller. Whether you're submitting a Scala, Python, or R app against standalone, yarn, mesos, or local, the interface is the same. You may argue that for your particular use case you don't care about anything other than Scala apps in YARN cluster mode, but Spark does need to care about more than that. I still think that once we have a way to expose more information about the application being launched (more specifically, the app id), then doing anything else you may want that is specific to YARN is up to you and pretty easy to do. But I strongly believe that having different ways to launch apps in Spark is not good design.

- You have some restriction that your app servers cannot fork processes.

Honestly, I didn't really understand what that is about. Why can't you fork processes? Is it a restriction on what you can deploy on the server (e.g. you cannot have a full Spark installation; everything needs to be contained in a jar that is deployed in the app server)? I really don't believe this is about the inability to fork a process, so it must be something else.

The unfortunate reality is that Spark is really not friendly to multiple things being launched from the same JVM. Doing that is prone to apps running all over each other and overwriting configs and other things, which would lead to many, many tears. Once the limitations around that are fixed, then we can study adding a way to launch multiple Spark apps from the same JVM, but right now that's just asking for (hard to debug) trouble.

It might be possible to add support for launching subprocesses without having to invoke the shell scripts; that would have limitations (e.g. no spark-env.sh support). In fact I did something like that in the first implementation of the launcher library, but was asked to go through the shell scripts during code review. (I even had a different method that launched in the same VM, but that one suffered from all the problems described in the paragraph above.)

On Thu, May 21, 2015 at 5:21 PM, Kevin Markey kevin.mar...@oracle.com wrote:

This is an excellent discussion. As mentioned in an earlier email, we agree with a number of Chester's suggestions, but we have yet other concerns. ...
Re: Re: Low throughput and effect of GC in SparkSql GROUP BY
Hi Zhang,

No, my data is not compressed; I'm trying to minimize the load on the CPU. The GC time reduced for me after codegen.

Pramod

On Thu, May 21, 2015 at 3:43 AM, zhangxiongfei zhangxiongfei0...@163.com wrote:

Hi Pramod, Is your data compressed? I encountered a similar problem; however, after turning codegen on, the GC time was still very long. ...
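For anyone reproducing the GC experiments described in this thread, they map onto executor JVM options and the storage fraction; an illustrative sketch (values are placeholders, not recommendations):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      // resize the young generation and log collections, as tried above
      .set("spark.executor.extraJavaOptions",
           "-XX:NewSize=2g -XX:MaxNewSize=2g -XX:+PrintGCDetails")
      // "reducing the rdd storage fraction" (default 0.6 in Spark 1.x)
      .set("spark.storage.memoryFraction", "0.3")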
Re: Change for submitting to yarn in 1.3.1
> In researching and discussing these issues with Cloudera and others, we've been told that only one mechanism is supported for starting Spark jobs: the *spark-submit* scripts.

Is this new? We've been submitting jobs directly from a programmatically created Spark context (instead of through spark-submit) from the beginning (from 0.7.x to 1.2) -- to a local cluster. In moving to 1.3 on a YARN cluster recently, we've had no end of problems trying to switch this over (though I think we're almost there). Why would one want to eliminate this possibility?

-Nathan
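For clarity, the "programmatically created spark context" style under discussion looks roughly like this (a sketch; the master URL and jar path are placeholders):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("programmatic-submit")
      .setMaster("yarn-client")          // or "spark://host:7077"; placeholder
      .setJars(Seq("/path/to/app.jar"))  // placeholder: ship the app jar to executors

    val sc = new SparkContext(conf)
    try {
      println(sc.parallelize(1 to 1000).filter(_ % 2 == 0).count())
    } finally {
      sc.stop()
    }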
Testing spark applications
> see discussions about Spark not really liking multiple contexts in the same JVM

Speaking of this -- is there a standard way of writing unit tests that require a SparkContext? We've ended up copying the code of SharedSparkContext into our own testing hierarchy, but it occurs to me someone would have published a test jar by now if that were the best way.

-Nathan
Re: Change for submitting to yarn in 1.3.1
Hi Nathan,

On Thu, May 21, 2015 at 7:30 PM, Nathan Kronenfeld nkronenfeld@uncharted.software wrote:

> Is this new? We've been submitting jobs directly from a programmatically created spark context (instead of through spark-submit) from the beginning (from 0.7.x to 1.2) -- to a local cluster. ...

Instantiating SparkContext directly works. Well, sorta: it has limitations. For example, see the discussions about Spark not really liking multiple contexts in the same JVM. It also does not work in cluster deploy mode.

-- Marcelo
Re: Change for submitting to yarn in 1.3.1
Thanks, Marcelo

> Instantiating SparkContext directly works. Well, sorta: it has limitations. For example, see discussions about Spark not really liking multiple contexts in the same JVM. It also does not work in cluster deploy mode.

That's fine -- when one is doing something non-standard, one expects a bit of caveat emptor.
Re: Change for submitting to yarn in 1.3.1
We also launch jobs programmatically, both in standalone mode and yarn-client mode. In standalone mode it always worked; in yarn-client mode we ran into some issues and were forced to use spark-submit, but I still have on my todo list to move back to a normal Java launch without spark-submit at some point.

For me, Spark is a library that I use to do distributed computations within my app, and ideally a library should not tell me how to launch my app. I mean, if multiple libraries that I use all had their own launch scripts, I would get stuck very quickly: hadoop jar vs spark-submit vs kiji launch vs hbase jar... a bad idea, I think! However, I do understand the practical reasons why spark-submit came about.

On Thu, May 21, 2015 at 10:30 PM, Nathan Kronenfeld nkronenfeld@uncharted.software wrote:

Is this new? We've been submitting jobs directly from a programmatically created spark context (instead of through spark-submit) from the beginning (from 0.7.x to 1.2) -- to a local cluster. ...
Re: Spark Streaming with Tachyon : Data Loss on Receiver Failure due to WAL error
Hi Tathagata,

Thanks for looking into this. Investigating further, I found that the issue is that Tachyon does not support file append. The streaming receiver writes to the WAL; when it fails and is restarted, it is not able to append to the same WAL file after restart. I raised this with the Tachyon user group, and Haoyuan said that Tachyon file append should be ready within about 3 months. I will revisit this issue then.

Regards,
Dibyendu

On Fri, May 22, 2015 at 12:24 AM, Tathagata Das t...@databricks.com wrote:

Looks like the file size reported by the FSInputDStream of Tachyon's FileSystem interface is somehow returning zero. ...
Re: Testing spark applications
It is just 15 lines of code to copy, isn't it?

On Thu, May 21, 2015 at 7:46 PM, Nathan Kronenfeld nkronenfeld@uncharted.software wrote:

> Speaking of this -- is there a standard way of writing unit tests that require a SparkContext? We've ended up copying the code of SharedSparkContext into our own testing hierarchy, but it occurs to me someone would have published a test jar by now if that was the best way.
>
> -Nathan
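For reference, a minimal sketch of such a shared-context trait (roughly the pattern in Spark's own test sources; ScalaTest assumed):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.scalatest.{BeforeAndAfterAll, Suite}

    trait SharedSparkContext extends BeforeAndAfterAll { self: Suite =>
      @transient private var _sc: SparkContext = _
      def sc: SparkContext = _sc

      override def beforeAll() {
        // One local context shared by all tests in the suite.
        _sc = new SparkContext(
          new SparkConf().setMaster("local[2]").setAppName(suiteName))
        super.beforeAll()
      }

      override def afterAll() {
        // Stop the context so the next suite can create its own.
        if (_sc != null) { _sc.stop(); _sc = null }
        super.afterAll()
      }
    }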
Tungsten's Vectorized Execution
Hi all,

I've seen the blog post on Project Tungsten, and it sounds awesome to me! I've also noticed there is a plan to change code generation from record-at-a-time evaluation to a vectorized one, which interests me most.

What's the status of vectorized evaluation? Is this an internal effort of Databricks, or is it open for involvement? Since I've done similar work on Spark SQL, I would like to get involved if that's possible.

Yours,
Yijie
Customizing Akka configuration for Spark
Hi,

Is there some way to customize the Akka configuration for Spark? Specifically, I want to experiment with custom serialization for messages that are sent between the driver and executors in standalone mode.

Thanks,
Akshat
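For background: in plain Akka, the serialization experiment would be configured roughly as below (class names are hypothetical). Note that Spark 1.x builds its ActorSystem internally (in AkkaUtils) from spark.* settings, so whether and how such settings can be injected into Spark is exactly the open question here.

    import akka.actor.ActorSystem
    import com.typesafe.config.ConfigFactory

    // "com.example.MyMessageSerializer" would extend akka.serialization.Serializer;
    // both class names are hypothetical.
    val akkaConf = ConfigFactory.parseString("""
      akka.actor.serializers.custom = "com.example.MyMessageSerializer"
      akka.actor.serialization-bindings { "com.example.MyMessage" = custom }
    """).withFallback(ConfigFactory.load()) // keep Akka's reference defaults

    val system = ActorSystem("experiment", akkaConf)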