Re: Low throughput and effect of GC in SparkSql GROUP BY

2015-05-21 Thread Pramod Biligiri
I hadn't turned on codegen. I enabled it and ran it again, and it is running
4-5 times faster now! :)
Since my log statements are no longer appearing, I presume the code path is
quite different from the earlier hashmap-related code in Aggregates.scala?
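[For readers who want to reproduce this, a minimal sketch of how codegen can be
toggled in the Spark 1.3/1.4 timeframe; the relevant flag in those releases is
spark.sql.codegen, and the table/column names below are illustrative placeholders,
not the exact AmpLab schema:]

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("codegen-groupby"))
val sqlContext = new SQLContext(sc)

// Switch the aggregation onto the generated-code path.
sqlContext.setConf("spark.sql.codegen", "true")

// An AmpLab-benchmark-style GROUP BY aggregation; assumes a table named
// "uservisits" has already been registered.
val result = sqlContext.sql(
  "SELECT sourceIP, SUM(adRevenue) FROM uservisits GROUP BY sourceIP")
result.show()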

Pramod

On Wed, May 20, 2015 at 9:18 PM, Reynold Xin r...@databricks.com wrote:

 Does this turn codegen on? I think the performance is fairly different
 when codegen is turned on.

 For 1.5, we are investigating having codegen on by default, so users get
 much better performance out of the box.


 On Wed, May 20, 2015 at 5:24 PM, Pramod Biligiri pramodbilig...@gmail.com
  wrote:

 Hi,
 Somewhat similar to Daniel Mescheder's mail yesterday on SparkSql, I have
 a data point regarding the performance of Group By, indicating there's
 excessive GC and it's impacting the throughput. I want to know if the new
 memory manager for aggregations (
 https://github.com/apache/spark/pull/5725/) is going to address this
 kind of issue.

 I only have a small amount of data on each node (~360MB) with a large
 heap size (18 Gig). I still see 2-3 minor collections happening whenever I
 do a Select Sum() with a group by(). I have tried with different sizes for
 Young Generation without much effect, though not with different GC
 algorithms (Hm..I ought to try reducing the rdd storage fraction perhaps).
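[As an aside, a hedged sketch of the Spark 1.3-era configuration keys one would
touch for these GC experiments; the concrete values are illustrative, not
recommendations:]

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("groupby-gc-experiment")
  .set("spark.executor.memory", "18g")
  // Try a different collector and young-generation size; JVM flags go through extraJavaOptions.
  .set("spark.executor.extraJavaOptions",
       "-XX:+UseG1GC -XX:NewSize=4g -XX:+PrintGCDetails")
  // Reduce the RDD storage fraction, as suggested above.
  .set("spark.storage.memoryFraction", "0.3")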

 I have made a chart of my results [1] by adding timing code to
 Aggregates.scala. The query is actually Query 2 from Berkeley's AmpLab
 benchmark, running over 10 million records. The chart is from one of the 4
 worker nodes in the cluster.

 I am trying to square this with a claim on the Project Tungsten blog post
 [2]: "When profiling Spark user applications, we’ve found that a large
 fraction of the CPU time is spent waiting for data to be fetched from main
 memory."

 Am I correct in assuming that SparkSql is yet to reach that level of
 efficiency, at least in aggregation operations?

 Thanks.

 [1] -
 https://docs.google.com/spreadsheets/d/1HSqYfic3n5s9i4Wsi1Qg0FKN_AWz2vV7_6RRMrtzplQ/edit#gid=481134174
 [2]
 https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html

 Pramod





Re: Low throughput and effect of GC in SparkSql GROUP BY

2015-05-21 Thread Reynold Xin
Yup it is a different path. It runs GeneratedAggregate.

On Wed, May 20, 2015 at 11:43 PM, Pramod Biligiri pramodbilig...@gmail.com
wrote:

 I hadn't turned on codegen. I enabled it and ran it again, and it is running
 4-5 times faster now! :)
 Since my log statements are no longer appearing, I presume the code path is
 quite different from the earlier hashmap-related code in Aggregates.scala?

 Pramod

 On Wed, May 20, 2015 at 9:18 PM, Reynold Xin r...@databricks.com wrote:

 Does this turn codegen on? I think the performance is fairly different
 when codegen is turned on.

 For 1.5, we are investigating having codegen on by default, so users get
 much better performance out of the box.


 On Wed, May 20, 2015 at 5:24 PM, Pramod Biligiri 
 pramodbilig...@gmail.com wrote:

 Hi,
 Somewhat similar to Daniel Mescheder's mail yesterday on SparkSql, I
 have a data point regarding the performance of Group By, indicating there's
 excessive GC and it's impacting the throughput. I want to know if the new
 memory manager for aggregations (
 https://github.com/apache/spark/pull/5725/) is going to address this
 kind of issue.

 I only have a small amount of data on each node (~360MB) with a large
 heap size (18 Gig). I still see 2-3 minor collections happening whenever I
 do a Select Sum() with a group by(). I have tried with different sizes for
 Young Generation without much effect, though not with different GC
 algorithms (Hm..I ought to try reducing the rdd storage fraction perhaps).

 I have made a chart of my results [1] by adding timing code to
 Aggregates.scala. The query is actually Query 2 from Berkeley's AmpLab
 benchmark, running over 10 million records. The chart is from one of the 4
 worker nodes in the cluster.

 I am trying to square this with a claim on the Project Tungsten blog
 post [2]: When profiling Spark user applications, we’ve found that a
 large fraction of the CPU time is spent waiting for data to be fetched from
 main memory. 

 Am I correct in assuming that SparkSql is yet to reach that level of
 efficiency, at least in aggregation operations?

 Thanks.

 [1] -
 https://docs.google.com/spreadsheets/d/1HSqYfic3n5s9i4Wsi1Qg0FKN_AWz2vV7_6RRMrtzplQ/edit#gid=481134174
 [2]
 https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html

 Pramod






Re: Spark Streaming - Design considerations/Knobs

2015-05-21 Thread Hemant Bhanawat
Honestly, given the length of my email, I didn't expect a reply. :-) Thanks
for reading and replying. However, I have a follow-up question:

I don't think I understand block replication completely. Are the blocks
replicated immediately after they are received by the receiver? Or are they
kept only on the receiver node and moved only on shuffle? Does the
replication have something to do with locality.wait?

Thanks,
Hemant

On Thu, May 21, 2015 at 2:21 AM, Tathagata Das t...@databricks.com wrote:

 Correcting the ones that are incorrect or incomplete. BUT this is a good
 list of things to remember about Spark Streaming.


 On Wed, May 20, 2015 at 3:40 AM, Hemant Bhanawat hemant9...@gmail.com
 wrote:

 Hi,

 I have compiled a list (from online sources) of knobs/design
 considerations that need to be taken care of by applications running on
 spark streaming. Is my understanding correct?  Any other important design
 consideration that I should take care of?


- A DStream is associated with a single receiver. For attaining read
  parallelism, multiple receivers, i.e. multiple DStreams, need to be created.
- A receiver is run within an executor. It occupies one core. Ensure
  that there are enough cores for processing after receiver slots are booked,
  i.e. spark.cores.max should take the receiver slots into account.
- The receivers are allocated to executors in a round-robin fashion.
- When data is received from a stream source, the receiver creates blocks
  of data. A new block of data is generated every blockInterval
  milliseconds. N blocks of data are created during the batchInterval, where
  N = batchInterval/blockInterval.
- These blocks are distributed by the BlockManager of the current
  executor to the block managers of other executors. After that, the Network
  Input Tracker running on the driver is informed about the block locations
  for further processing.
- An RDD is created on the driver for the blocks created during the
  batchInterval. The blocks generated during the batchInterval are partitions
  of the RDD. Each partition is a task in Spark. blockInterval ==
  batchInterval would mean that a single partition is created and probably
  it is processed locally.

 The map tasks on the blocks are processed in the executors (one that
 received the block, and another where the block was replicated) that have
 the blocks, irrespective of block interval, unless non-local scheduling
 kicks in (as you observed next).


- Having a bigger blockInterval means bigger blocks. A high value of
  spark.locality.wait increases the chance of processing a block on the
  local node. A balance needs to be found between these two parameters to
  ensure that the bigger blocks are processed locally.
- Instead of relying on batchInterval and blockInterval, you can
  define the number of partitions by calling dstream.repartition(n). This
  reshuffles the data in the RDD randomly to create n partitions.

 Yes, for greater parallelism, though it comes at the cost of a shuffle.
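[A minimal sketch of that multi-receiver / union / repartition pattern; the host,
port, intervals, and partition count are placeholders:]

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("multi-receiver-sketch")
val ssc = new StreamingContext(conf, Seconds(10))            // batchInterval = 10s

// Several receivers => several DStreams => more read parallelism.
val streams = (1 to 3).map(_ => ssc.socketTextStream("stream-host", 9999))
val unioned = ssc.union(streams)              // one set of jobs per batch instead of one per DStream
val repartitioned = unioned.repartition(24)   // decouple the partition count from blockInterval

repartitioned.foreachRDD { rdd => println(rdd.count()) }
ssc.start()
ssc.awaitTermination()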


- An RDD's processing is scheduled by the driver's JobScheduler as a job.
  At a given point of time only one job is active. So, if one job is
  executing, the other jobs are queued.

- If you have two dstreams there will be two RDDs formed and there
  will be two jobs created, which will be scheduled one after the other.

- To avoid this, you can union two dstreams. This will ensure that a
  single unionRDD is formed for the two RDDs of the dstreams. This unionRDD
  is then considered as a single job. However, the partitioning of the RDDs
  is not impacted.

 To further clarify, the jobs depend on the number of output operations
 (print, foreachRDD, saveAsXFiles) and the number of RDD actions in those
 output operations.

 dstream1.union(dstream2).foreachRDD { rdd => rdd.count() }  // one Spark job per batch

 dstream1.union(dstream2).foreachRDD { rdd => { rdd.count(); rdd.count() } }  // TWO Spark jobs per batch

 dstream1.foreachRDD { rdd => rdd.count }; dstream2.foreachRDD { rdd => rdd.count }  // TWO Spark jobs per batch






- If the batch processing time is more than the batchInterval then
  obviously the receiver's memory will start filling up and will end up
  throwing exceptions (most probably BlockNotFoundException). Currently
  there is no way to pause the receiver.

 You can limit the rate of the receiver using the SparkConf setting
 spark.streaming.receiver.maxRate.
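[A hedged sketch of the streaming-related knobs mentioned in this thread; the
values are illustrative, and the accepted formats for time values differ slightly
across Spark versions:]

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("streaming-knobs")
  .set("spark.streaming.blockInterval", "200")        // block size within a batch, in ms
  .set("spark.streaming.receiver.maxRate", "10000")   // max records/sec per receiver
  .set("spark.locality.wait", "1000")                 // wait before non-local scheduling, in ms
  .set("spark.cores.max", "8")                        // remember: each receiver occupies a core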


- To be fully fault tolerant, Spark Streaming needs to enable
  checkpointing. Checkpointing increases the batch processing time.

 Incomplete. There are two types of checkpointing - data and metadata.
 Only data checkpointing, needed by only some operations, increases batch
 processing time. Read -
 http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing
 Furthermore, with checkpointing you can recover computation, but you 
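[For reference, a minimal sketch of enabling checkpointing with driver recovery;
the checkpoint directory is a hypothetical path:]

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///user/example/streaming-checkpoints"  // placeholder path

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("checkpointed-stream")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint(checkpointDir)  // metadata checkpointing; data checkpointing kicks in for stateful ops
  // ... define DStreams, stateful operations, and output operations here ...
  ssc
}

// On driver restart, rebuild the context from the checkpoint if one exists.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()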

Re: Resource usage of a spark application

2015-05-21 Thread Peter Prettenhofer
Thanks Akhil, Ryan!

@Akhil: YARN can only tell me how many vcores my app has been granted but
not actual cpu usage, right? Pulling mem/cpu usage from the OS means I need
to map JVM executor processes to the context they belong to, right?

@Ryan: what a great blog post -- this is super relevant for me to analyze
the state of the cluster as a whole. However, it seems to me that those
metrics are mostly reported globally and not per spark application.

2015-05-19 21:43 GMT+02:00 Ryan Williams ryan.blake.willi...@gmail.com:

 Hi Peter, a few months ago I was using MetricsSystem to export to Graphite
 and then view in Grafana; relevant scripts and some instructions are here
 https://github.com/hammerlab/grafana-spark-dashboards/ if you want to
 take a look.
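[For anyone else setting this up, a sketch of the Graphite sink configuration that
Spark's MetricsSystem reads; the host and prefix are placeholders, and
conf/metrics.properties.template in the Spark distribution lists the full set of
options:]

# metrics.properties, distributed with --files and referenced via spark.metrics.conf
*.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink
*.sink.graphite.host=graphite.example.com
*.sink.graphite.port=2003
*.sink.graphite.period=10
*.sink.graphite.unit=seconds
*.sink.graphite.prefix=spark

# Optionally also export JVM metrics (memory, GC) per instance.
driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource
executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource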


 On Sun, May 17, 2015 at 8:48 AM Peter Prettenhofer 
 peter.prettenho...@gmail.com wrote:

 Hi all,

 I'm looking for a way to measure the current memory / cpu usage of a
 spark application to provide users feedback on how many resources are actually
 being used.
 It seems that the metric system provides this information to some extent.
 It logs metrics on the application level (nr of cores granted) and on the JVM
 level (memory usage).
 Is this the recommended way to gather this kind of information? If so,
 how do I best map a spark application to the corresponding JVM processes?

 If not, should i rather request this information from the resource
 manager (e.g. Mesos/YARN)?

 thanks,
  Peter

 --
 Peter Prettenhofer




-- 
Peter Prettenhofer


Re: Spark Streaming with Tachyon : Data Loss on Receiver Failure due to WAL error

2015-05-21 Thread Tathagata Das
Looks like somehow the file size reported by the FSInputDStream of
Tachyon's FileSystem interface is returning zero.

On Mon, May 11, 2015 at 4:38 AM, Dibyendu Bhattacharya 
dibyendu.bhattach...@gmail.com wrote:

 Just to follow up this thread further .

 I was doing some fault-tolerance testing of Spark Streaming with Tachyon as
 the OFF_HEAP block store. As I said in an earlier email, I was able to solve
 the BlockNotFound exception when I used Hierarchical Storage in Tachyon,
 which is good.

 I continued doing some testing around storing the Spark Streaming WAL and
 checkpoint files in Tachyon as well. Here are a few findings:


 When I store the Spark Streaming checkpoint location in Tachyon, the
 throughput is much higher. I tested the Driver and Receiver failure cases,
 and Spark Streaming is able to recover without any data loss on Driver
 failure.

 *But on Receiver failure, Spark Streaming loses data*, as I see an
 exception while reading the WAL file from the Tachyon receivedData location
 for the same Receiver id which just failed.

 If I change the checkpoint location back to HDFS, Spark Streaming can
 recover from both Driver and Receiver failure.
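[For context, a hedged sketch of the configuration being exercised here; the
Tachyon URI is taken from the log below, everything else is illustrative:]

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("tachyon-wal-test")
  // Write received blocks to the write ahead log for receiver fault tolerance.
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(10))
// The WAL and checkpoint files live under the checkpoint directory; pointing it at
// Tachyon instead of HDFS is what triggers the failure described above.
ssc.checkpoint("tachyon-ft://10.252.5.113:19998/tachyon/checkpoint")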

 Here are the log details from when the Spark Streaming receiver failed. I raised a
 JIRA for the same issue: https://issues.apache.org/jira/browse/SPARK-7525



 INFO : org.apache.spark.scheduler.DAGScheduler - *Executor lost: 2 (epoch
 1)*
 INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Trying to
 remove executor 2 from BlockManagerMaster.
 INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Removing
 block manager BlockManagerId(2, 10.252.5.54, 45789)
 INFO : org.apache.spark.storage.BlockManagerMaster - Removed 2
 successfully in removeExecutor
 INFO : org.apache.spark.streaming.scheduler.ReceiverTracker - *Registered
 receiver for stream 2 from 10.252.5.62*:47255
 WARN : org.apache.spark.scheduler.TaskSetManager - Lost task 2.1 in stage
 103.0 (TID 421, 10.252.5.62): org.apache.spark.SparkException: *Could not
 read data from write ahead log record
 FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919
 http://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919)*
 at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
 $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144)
 at
 org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
 at
 org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
 at scala.Option.getOrElse(Option.scala:120)
 at
 org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:168)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
 at org.apache.spark.scheduler.Task.run(Task.scala:70)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:744)
 Caused by: java.lang.IllegalArgumentException:* Seek position is past
 EOF: 645603894, fileSize = 0*
 at tachyon.hadoop.HdfsFileInputStream.seek(HdfsFileInputStream.java:239)
 at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:37)
 at
 org.apache.spark.streaming.util.FileBasedWriteAheadLogRandomReader.read(FileBasedWriteAheadLogRandomReader.scala:37)
 at
 org.apache.spark.streaming.util.FileBasedWriteAheadLog.read(FileBasedWriteAheadLog.scala:104)
 at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
 $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:141)
 ... 15 more

 INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 2.2 in
 stage 103.0 (TID 422, 10.252.5.61, ANY, 1909 bytes)
 INFO : org.apache.spark.scheduler.TaskSetManager - Lost task 2.2 in stage
 103.0 (TID 422) on executor 10.252.5.61: org.apache.spark.SparkException
 (Could not read data from write ahead log record
 FileBasedWriteAheadLogSegment(tachyon-ft://
 10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919))
 [duplicate 1]
 INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 2.3 in
 stage 103.0 (TID 423, 10.252.5.62, ANY, 1909 bytes)
 INFO : org.apache.spark.deploy.client.AppClient$ClientActor - 

Re: Adding/Using More Resolution Types on JIRA

2015-05-21 Thread Santiago Mola
Some examples to illustrate my point. A couple of issues from the oldest
open issues
in the SQL component:

[SQL] spark-sql exits while encountered an error
https://issues.apache.org/jira/browse/SPARK-4572
This is an incomplete report that nobody can take action on. It can be
resolved as Incomplete, instead of Inactive. In fact, there is no need to
wait that long to close it as Incomplete.

Using a field in a WHERE clause that is not in the schema does not throw an
exception
https://issues.apache.org/jira/browse/SPARK-5305
This is also Incomplete: it is not currently reproducible and it is
missing version info.

Rather than needing these new statuses, it seems that there is a need for
more people assessing and triaging new issues.

Best,
-- 

Santiago M. Mola


http://www.stratio.com/
Vía de las dos Castillas, 33, Ática 4, 3ª Planta
28224 Pozuelo de Alarcón, Madrid
Tel: +34 91 828 6473 // www.stratio.com // *@stratiobd
https://twitter.com/StratioBD*


Re: Tungsten's Vectorized Execution

2015-05-21 Thread Davies Liu
We have not started to prototype the vectorized one yet; it will be
evaluated in 1.5 and may be targeted for 1.6.

We're glad to hear feedback/suggestions/comments from your side!

On Thu, May 21, 2015 at 9:37 AM, Yijie Shen henry.yijies...@gmail.com wrote:
 Hi all,

 I’ve seen the Blog of Project Tungsten here, it sounds awesome to me!

 I’ve also noticed there is a plan to change the code generation from
 record-at-a-time evaluation to a vectorized one, which interests me most.

 What’s the status of vectorized evaluation?  Is this an inner effort of
 Databricks or welcome to be involved?

 Since I’ve done similar stuffs on Spark SQL, I would like to get involved if
 that’s possible.


 Yours,

 Yijie

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: Adding/Using More Resolution Types on JIRA

2015-05-21 Thread Sean Owen
On Thu, May 21, 2015 at 9:06 PM, Santiago Mola sm...@stratio.com wrote:
 Inactive - A feature or bug that has had no activity from users or
 developers in a long time

 Why is this needed? Every JIRA listing can be sorted by activity. That gets
 the inactive ones out of your view quickly. I do not see any reason why an
 issue should be closed because of this. If it's inactive, maybe it's because
 it falls into one of the other categories (out of scope, later, won't fix).

I don't think sorting helps or that browsing is the issue. What if
you're searching for Open Critical issues concerning Pyspark? If the
list is full of issues that are actually out of scope, later, won't
fix, then that's a problem.

 That is a much more specific case than Inactivity, and a lot of large scale
 open source projects use specific resolutions for this.

Yes, that's CannotReproduce.

I think the walking-dead JIRAs we have in mind are some combination
of: a JIRA opened without a lot of detail, that might or might not be
a problem, nobody else seemed to have the problem and/or nobody cared
to investigate, much has changed since anyway so might be obsolete.
WontFix, CannotReproduce, NotAProblem are all possibly reasonable
resolutions. If this is just about semantics, I also don't feel a
strong need for a new state.


 On a more general note: what is the problem with open issues / pull requests?
 I see a tendency in the Spark project to do unusual things with issues / PRs
 just to keep the numbers low. For example, closing PRs after a couple of
 weeks of inactivity just to shrink the queue, or closing active issues just
 for the sake of it.

 Honestly, this looks a lot like trying to game metrics. But maybe there is
 something that I am missing.

Game for whose benefit? Nobody is being evaluated on this stuff. This
is being proposed for real reasons, not for fun.

A bunch of JIRA cruft is a symptom, not a cause. Something is wrong
somewhere if people file JIRAs and they go nowhere. Everyone's time is
wasted and with no conclusion, there's no feedback or learning
anywhere. So it keeps happening. Is it bad JIRAs? scope issues? lack
of follow up from developer or contributor? all of the above?

I actually think it's mostly bad JIRAs: too large, too invasive, not
that useful, hacky fixes to a facet of a problem, incomplete
description, duplicate, etc.

I think it's more useful to actually close these to communicate back
clearly what is not going to be accepted. Things can be reopened if
needed. Silently ignoring them forever as an Open JIRA seems less
constructive.


 Maybe what is actually needed is to improve the lifecycle of an issue while
 it is alive, instead of trying to kill it earlier. Some examples of this that
 are used on other projects are the "incomplete" status, to signal that there
 is more info required from the reporter in order to take further action, and
 "confirmed", to acknowledge that a bug is confirmed to be present and needs
 action by a developer.

Yes, best to try to make the process better. That's why I started with
things like a more comprehensive
https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark
to make better contributions in the first place. By the time dead
JIRAs are closed, something's already gone wrong and time has been
wasted. But we still need that culture of not letting stuff sit
around.

I don't mind the idea of an Unresolved InformationNeeded status,
yeah. I don't actually think that would solve a problem though. The
dead JIRAs are ones that never got any follow up, or, that got a lot
of follow-up from the contributor but aren't going to be merged.

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: Adding/Using More Resolution Types on JIRA

2015-05-21 Thread Santiago Mola
2015-05-12 9:50 GMT+02:00 Patrick Wendell pwend...@gmail.com:


 Inactive - A feature or bug that has had no activity from users or
 developers in a long time


Why is this needed? Every JIRA listing can be sorted by activity. That gets
the inactive ones out of your view quickly. I do not see any reason why an
issue should be closed because of this. If it's inactive, maybe it's because
it falls into one of the other categories (out of scope, later, won't fix).

I can only think of one case where closing an inactive issue makes sense:

*  A bug was reported a long time ago. Nobody was able to reproduce it (after
   trying actively) and the reporter is no longer around to provide more
   info.

That is a much more specific case than inactivity, and a lot of large-scale
open source projects use specific resolutions for this.

On a more general note: what is the problem with open issues / pull requests?
I see a tendency in the Spark project to do unusual things with issues / PRs
just to keep the numbers low. For example, closing PRs after a couple of
weeks of inactivity just to shrink the queue, or closing active issues just
for the sake of it.

Honestly, this looks a lot like trying to game metrics. But maybe there is
something that I am missing.

Maybe what is actually needed is to improve the lifecycle of an issue while
it is alive, instead of trying to kill it earlier. Some examples of this that
are used on other projects are the "incomplete" status, to signal that there
is more info required from the reporter in order to take further action, and
"confirmed", to acknowledge that a bug is confirmed to be present and needs
action by a developer.

Best,
-- 

Santiago M. Mola


http://www.stratio.com/
Vía de las dos Castillas, 33, Ática 4, 3ª Planta
28224 Pozuelo de Alarcón, Madrid
Tel: +34 91 828 6473 // www.stratio.com // *@stratiobd
https://twitter.com/StratioBD*


Re: Adding/Using More Resolution Types on JIRA

2015-05-21 Thread Sean Owen
On Thu, May 21, 2015 at 10:03 PM, Santiago Mola sm...@stratio.com wrote:
 Sure. That is why I was talking about the Inactive resolution specifically.
 The combination of Priority + other statuses is enough to solve these issues.
 A minor/trivial issue that is incomplete is probably not going to hurt
 someone looking for critical open issues much.

If you mean you intend to consider them resolved, then yeah we agree a
lot. The names of the resolved states don't matter nearly so much to
me. For instance, in your examples, I could call those CannotReproduce
or Incomplete. I would not want to leave them Open in that state
indefinitely.

 On a side-note, I would like to contribute some time on improving this. When
 identifying this kind of issue, should I ask in the issue itself to resolve 
 it in a
 specific way?

I think reviewing JIRAs actually contributes to a better overall
process, so I'd just dive in. Anything to advance a JIRA / PR to a
resolution is very helpful. Ask for more info, investigate a problem
to confirm it or fail to reproduce, propose a fix, identify
duplicates, flag JIRAs for closing, review changes, say you think the
change is good, etc. -- among the most helpful things anyone can do
right now IMHO.

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: Contribute code to MLlib

2015-05-21 Thread Trevor Grant
Thank you Ram and Joseph.

I am also hoping to contribute to MLlib once my Scala gets up to snuff; this
is the guidance I needed for how to proceed when ready.

Best wishes,
Trevor



On Wed, May 20, 2015 at 1:55 PM, Joseph Bradley jos...@databricks.com
wrote:

 Hi Trevor,

 I may be repeating what Ram said, but to 2nd it, a few points:

 We do want MLlib to become an extensive and rich ML library; as you said,
 scikit-learn is a great example.  To make that happen, we of course need to
 include important algorithms.  Important is hazy, but roughly means being
 useful to a large number of users, improving a large number of use cases
 (above what is currently available), and being well-established and tested.

 Others and I may not be familiar with Tarek's algorithm (since it is so
 new), so it will be important to discuss details on JIRA to establish the
 cases in which the algorithm improves over current PCA.  That may require
 discussion, community testing, etc.  If we establish that it is a clear
 improvement in a large domain, then it could be valuable to have in MLlib
 proper.  It's always going to be hard to tell where to draw the line, so
 less common algorithms will require more testing before we commit to
 including them in MLlib.

 I like the Spark package suggestion since it would allow users to immediately
 start using the code while the discussion on JIRA happens.  (Plus, if
 package users find it useful, they can report that on the JIRA.)

 Joseph

 On Wed, May 20, 2015 at 10:01 AM, Ram Sriharsha sriharsha@gmail.com
 wrote:

 Hi Trevor

 I'm attaching the MLLib contribution guideline here:

 https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark#ContributingtoSpark-MLlib-specificContributionGuidelines

 It speaks to widely known and accepted algorithms but not to whether an
 algorithm has to be better than another in every scenario etc

 I think the guideline explains what a good contribution to the core
 library should look like better than I initially attempted to !

 Sent from my iPhone

 On May 20, 2015, at 9:31 AM, Ram Sriharsha sriharsha@gmail.com
 wrote:

 Hi Trevor

 Good point, I didn't mean that some algorithm has to be clearly better
 than another in every scenario to be included in MLLib. However, even if
 someone is willing to be the maintainer of a piece of code, it does not
 make sense to accept every possible algorithm into the core library.

 That said, the specific algorithms should be discussed in the JIRA: as
 you point out, there is no clear way to decide what algorithm to include
 and what not to, and usually mature algorithms that serve a wide variety of
 scenarios are easier to argue about but nothing prevents anyone from
 opening a ticket to discuss any specific machine learning algorithm.

 My suggestion was simply that for purposes of making experimental or
 newer algorithms available to Spark users, it doesn't necessarily have to
 be in the core library. Spark packages are good enough in this respect.

 Isn't it better for newer algorithms to take this route and prove
 themselves before we bring them into the core library? Especially given the
 barrier to using spark packages is very low.

 Ram



 On Wed, May 20, 2015 at 9:05 AM, Trevor Grant trevor.d.gr...@gmail.com
 wrote:

 Hey Ram,

 I'm not speaking to Tarek's package specifically but to the spirit of
 MLlib.  There are a number of methods/algorithms for PCA, and I'm not sure by
 what criterion the current one is considered 'standard'.

 It is rare to find ANY machine learning algorithm that is 'clearly better'
 than any other.  They are all tools; they have their place and time.  I
 agree that it makes sense to field new algorithms as packages and then
 integrate them into MLlib once they are 'proven' (in terms of
 stability/performance/whether anyone cares).  That being said, if MLlib takes
 the stance that 'what we have is good enough unless something is *clearly*
 better', then it will never grow into a suite with the depth and richness
 of sklearn. From a practitioner's standpoint, it's nice to have everything
 I could ever want ready in an 'off-the-shelf' form.

 'A large number of use cases better than existing' shouldn't be a
 criterion when selecting what to include in MLlib.  The important question
 should be, 'Are you willing to take on responsibility for maintaining this,
 because you may be the only person on earth who understands the mechanics
 AND how to code it?'.  Obviously we don't want any random junk algorithm
 included.  But trying to say 'this way of doing PCA is better than that
 way in a large class of cases' is like trying to say 'geometry is more
 important than calculus in a large class of cases'; maybe it's true, but
 geometry won't help you if you are in a case where you need calculus.

 This all relies on the assumption that MLlib is destined to be a rich
 data science/machine learning package.  It may be that the goal is to make
 the project as lightweight and parsimonious as possible; if so 

Re: Resource usage of a spark application

2015-05-21 Thread Akhil Das
Yes, Peter, that's correct: you need to identify the processes, and with that
you can pull the actual usage metrics.

Thanks
Best Regards

On Thu, May 21, 2015 at 2:52 PM, Peter Prettenhofer 
peter.prettenho...@gmail.com wrote:

 Thanks Akhil, Ryan!

 @Akhil: YARN can only tell me how much vcores my app has been granted but
 not actual cpu usage, right? Pulling mem/cpu usage from the OS means i need
 to map JVM executor processes to the context they belong to, right?

 @Ryan: what a great blog post -- this is super relevant for me to analyze
 the state of the cluster as a whole. However, it seems to me that those
 metrics are mostly reported globally and not per spark application.

 2015-05-19 21:43 GMT+02:00 Ryan Williams ryan.blake.willi...@gmail.com:

 Hi Peter, a few months ago I was using MetricsSystem to export to
 Graphite and then view in Grafana; relevant scripts and some
 instructions are here
 https://github.com/hammerlab/grafana-spark-dashboards/ if you want to
 take a look.


 On Sun, May 17, 2015 at 8:48 AM Peter Prettenhofer 
 peter.prettenho...@gmail.com wrote:

 Hi all,

 I'm looking for a way to measure the current memory / cpu usage of a
 spark application to provide users feedback how much resources are actually
 being used.
 It seems that the metric system provides this information to some
 extent. It logs metrics on the application level (nr of cores granted) and on
 the JVM level (memory usage).
 Is this the recommended way to gather this kind of information? If so,
 how do i best map a spark application to the corresponding JVM processes?

 If not, should i rather request this information from the resource
 manager (e.g. Mesos/YARN)?

 thanks,
  Peter

 --
 Peter Prettenhofer




 --
 Peter Prettenhofer



Re:Re: Low throughput and effect of GC in SparkSql GROUP BY

2015-05-21 Thread zhangxiongfei
Hi Pramod


 Is your data compressed? I encountered a similar problem; however, after turning
codegen on, the GC time was still very long. The size of the input data for my map
task is about a 100M lzo file.
My query is: select ip, count(*) as c from stage_bitauto_adclick_d group by ip
sort by c limit 100


Thanks
Zhang Xiongfei




At 2015-05-21 12:18:35, Reynold Xin r...@databricks.com wrote:

Does this turn codegen on? I think the performance is fairly different when 
codegen is turned on.


For 1.5, we are investigating having codegen on by default, so users get much 
better performance out of the box.




On Wed, May 20, 2015 at 5:24 PM, Pramod Biligiri pramodbilig...@gmail.com 
wrote:

Hi,
Somewhat similar to Daniel Mescheder's mail yesterday on SparkSql, I have a 
data point regarding the performance of Group By, indicating there's excessive 
GC and it's impacting the throughput. I want to know if the new memory manager 
for aggregations (https://github.com/apache/spark/pull/5725/) is going to 
address this kind of issue.


I only have a small amount of data on each node (~360MB) with a large heap size 
(18 Gig). I still see 2-3 minor collections happening whenever I do a Select 
Sum() with a group by(). I have tried with different sizes for Young Generation 
without much effect, though not with different GC algorithms (Hm..I ought to 
try reducing the rdd storage fraction perhaps).


I have made a chart of my results [1] by adding timing code to 
Aggregates.scala. The query is actually Query 2 from Berkeley's AmpLab 
benchmark, running over 10 million records. The chart is from one of the 4 
worker nodes in the cluster.


I am trying to square this with a claim on the Project Tungsten blog post [2]: 
When profiling Spark user applications, we’ve found that a large fraction of 
the CPU time is spent waiting for data to be fetched from main memory. 


Am I correct in assuming that SparkSql is yet to reach that level of 
efficiency, at least in aggregation operations?



Thanks.


[1] - 
https://docs.google.com/spreadsheets/d/1HSqYfic3n5s9i4Wsi1Qg0FKN_AWz2vV7_6RRMrtzplQ/edit#gid=481134174
[2] 
https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html


Pramod



Re: Resource usage of a spark application

2015-05-21 Thread Ryan Williams
On Thu, May 21, 2015 at 5:22 AM Peter Prettenhofer 
peter.prettenho...@gmail.com wrote:

 Thanks Akhil, Ryan!

 @Akhil: YARN can only tell me how much vcores my app has been granted but
 not actual cpu usage, right? Pulling mem/cpu usage from the OS means i need
 to map JVM executor processes to the context they belong to, right?

 @Ryan: what a great blog post -- this is super relevant for me to analyze
 the state of the cluster as a whole. However, it seems to me that those
 metrics are mostly reported globally and not per spark application.


Thanks! You can definitely analyze metrics per-application in several ways:

   - If you're running Spark on YARN, use the app URL param
   https://github.com/hammerlab/grafana-spark-dashboards#appyarn-app-id
   to specify a YARN application ID, which will set the Spark application ID
   as well as parse job start/end times.
   - Set the prefix URL param
   https://github.com/hammerlab/grafana-spark-dashboards#prefixmetric-prefix
   to your Spark app's ID, and all metrics will be namespaced to that app ID.
  - You actually have to do one of these two, otherwise it doesn't know
  what app's metrics to look for; it is set up specifically to view per-app
  metrics.
   - There is a dropdown in the upper-left of the page (sorry, don't have a
   screenshot right now) that will let you select from all app IDs that
   graphite has seen metrics from.

Let me know, here or in issues on the repo, if you have any issues with
that or that doesn't make sense!



 2015-05-19 21:43 GMT+02:00 Ryan Williams ryan.blake.willi...@gmail.com:

 Hi Peter, a few months ago I was using MetricsSystem to export to
 Graphite and then view in Grafana; relevant scripts and some
 instructions are here
 https://github.com/hammerlab/grafana-spark-dashboards/ if you want to
 take a look.


 On Sun, May 17, 2015 at 8:48 AM Peter Prettenhofer 
 peter.prettenho...@gmail.com wrote:

 Hi all,

 I'm looking for a way to measure the current memory / cpu usage of a
 spark application to provide users feedback how much resources are actually
 being used.
 It seems that the metric system provides this information to some
 extent. It logs metrics on the application level (nr of cores granted) and on
 the JVM level (memory usage).
 Is this the recommended way to gather this kind of information? If so,
 how do i best map a spark application to the corresponding JVM processes?

 If not, should i rather request this information from the resource
 manager (e.g. Mesos/YARN)?

 thanks,
  Peter

 --
 Peter Prettenhofer




 --
 Peter Prettenhofer



Why use lib_managed for the Sbt build?

2015-05-21 Thread Iulian Dragoș
I’m trying to understand why sbt is configured to pull all libs under
lib_managed.

   - it seems like unnecessary duplication (I will have those libraries
   under ~/.m2, via maven, anyway)
   - every time I call make-distribution I lose lib_managed (via mvn clean
   install) and have to wait for all the jars to download again the next
   time I use sbt
   - Eclipse does not handle relative paths very well (source attachments
   from lib_managed don’t always work)

So, what is the advantage of putting all dependencies in there, instead of
using the default `~/.ivy2`?
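[For context, the sbt mechanism that produces a lib_managed directory in the first
place is the retrieveManaged setting; this is a sketch of the general sbt key, not
a quote of Spark's actual build definition:]

// In the sbt build definition: copy every resolved dependency under lib_managed/
// in addition to keeping it in the usual ivy cache.
retrieveManaged := true

[So the copies under lib_managed are a deliberate retrieve step layered on top of
the ivy cache rather than a replacement for it.]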

cheers,
iulian
-- 

--
Iulian Dragos

--
Reactive Apps on the JVM
www.typesafe.com


Re: Change for submitting to yarn in 1.3.1

2015-05-21 Thread Kevin Markey

  
  


This is an excellent discussion.  As mentioned in an earlier email, we
agree with a number of Chester's suggestions, but we have yet other
concerns.  I've researched this further in the past several days, and I've
queried my team.  This email attempts to capture those other concerns.

Making yarn.Client private has prevented us from moving from Spark 1.0.x to
Spark 1.2 or 1.3 despite many alluring new features. The SparkLauncher,
which provides “support for programmatically running Spark jobs”
(SPARK-3733 and SPARK-4924), will not work in our environment or for our
use case -- which requires programmatically initiating and monitoring Spark
jobs on Yarn in cluster mode from a cloud-based application server.

It is not just that the Yarn ApplicationId is no longer directly or
indirectly available. More critically, it violates constraints imposed by
any application server and additional constraints imposed by security,
process, and dynamic resource allocation requirements in our cloud services
environment.

In Spark 1.0 and 1.1, with yarn.Client public, our applications' job
scheduler marshalls configuration and environmental resources necessary for
any Spark job, including cluster-, data- or job-specific parameters, and
makes the appropriate calls to initialize and run yarn.Client, which
together with the other classes in the spark-yarn module requests the Yarn
resource manager to start and monitor a job (see Figure 1) on the cluster.
(Our job scheduler is not a Yarn replacement; it leverages Yarn to
coordinate a variety of different Spark analytic and data enrichment jobs.)

More recent Spark versions make yarn.Client private and thus remove that
capability, but the SparkLauncher, scheduled for Spark 1.4, replaces this
simple programmatic solution with one considerably more complicated. Based
on our understanding, in this scenario, our job scheduler marshalls
configuration and environmental resources for the SparkLauncher much as it
did for yarn.Client. It then calls launch() to initialize a new Linux
process to execute the spark-submit shell script with the specified
configuration and environment, which in turn starts a new JVM (with the
Spark assembly jar in its class path) that executes launcher.Main. This
ultimately calls yarn.Client (see Figure 2). This is more than an
arm's-length transaction. There are three legs: job scheduler
SparkLauncher.launch() call → spark-submit bash execution → launcher.Main
call to yarn.Client → Yarn resource manager allocation and execution of job
driver and executors.

Not only is this scenario unnecessarily complicated, it will simply not
work. The “programmatic” call to SparkLauncher.launch() starts a new JVM,
which is not allowed in any application server, which must own all its
JVMs. Perhaps spark-submit and the launcher.Main JVM process could be
hosted outside the application server, but only in violation of security
and multiple-tenant cloud architectural constraints.

We appreciate that yarn.Client was perhaps never intended to be public.
Configuring it is not for the faint-of-heart, and some of its methods
should indeed be private. We wonder whether there is another option.

In researching and discussing these issues with Cloudera and others, we've
been told that only one mechanism is supported for starting Spark jobs: the
spark-submit scripts. We also have gathered (perhaps mistakenly) from
discussions reaching back 20 months that Spark's intention is to have a
unified job submission interface for all supported platforms. Unfortunately
this doesn't recognize the asymmetries among those platforms. Submitting a
local Spark job or a job to a Spark master in cluster mode may indeed
require initializing a separate process in order to pass configuration
parameters via the environment and command line. But Spark's yarn.Client in
cluster mode already has an arm's-length relationship with the Yarn
resource manager. Configuration may be passed from the job scheduling
application to yarn.Client as Strings or property map variables and method
parameters.

Our request is for a public yarn.Client or some reasonable facsimile.

Thanks.
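[For concreteness, a hedged sketch of what the three-legged flow looks like from
the caller's side with the Spark 1.4 launcher API; the paths and class names are
placeholders:]

import org.apache.spark.launcher.SparkLauncher

// SparkLauncher.launch() forks a spark-submit child process -- exactly the step
// that is problematic inside an application server, as described above.
val process = new SparkLauncher()
  .setSparkHome("/opt/spark")                       // placeholder paths
  .setAppResource("/path/to/analytics-job.jar")
  .setMainClass("com.example.AnalyticsJob")
  .setMaster("yarn-cluster")
  .setConf("spark.executor.memory", "4g")
  .launch()

val exitCode = process.waitFor()                    // note: no YARN ApplicationId is exposed here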






On 05/13/2015 08:22 PM, Patrick Wendell wrote:


  Hey Chester,

Thanks for sending this. It's very helpful to have this list.

The reason we made the Client API private was that it was never
intended to be used by 

Re: Change for submitting to yarn in 1.3.1

2015-05-21 Thread Marcelo Vanzin
Hi Kevin,

I read through your e-mail and I see two main things you're talking about.

- You want a public YARN Client class and don't really care about
anything else.

In your message you already mention why that's not a good idea. It's much
better to have a standardized submission API. As you noticed by working
with the previous Client API, it's not for the faint of heart; SparkSubmit
hides a lot of the complexity, and does so in a way that is transparent to
the caller. Whether you're submitting a Scala, Python, or R app against
standalone, yarn, mesos, or local, the interface is the same.

You may argue that for your particular use case you don't care about
anything other than Scala apps on YARN cluster mode, but Spark does need to
care about more than that.

I still think that once we have a way to expose more information about the
application being launched (more specifically, the app id), then doing
anything else you may want to do that is specific to YARN is up to you and
pretty easy to do. But I strongly believe that having different ways to
launch apps in Spark is not good design.


- You have some restriction that your app servers cannot fork processes

Honestly, I didn't really understand what that is about. Why can't you fork
processes? Is it a restriction regarding what you can deploy on the server
(e.g. you cannot have a full Spark installation, everything needs to be
contained in a jar that is deployed in the app server)?

I really don't believe this is about the inability to fork a process, so it
must be something else.

The unfortunate reality is that Spark is really not friendly to multiple
things being launched from the same JVM. Doing that is prone to apps
running all over each other and overwriting configs and other things, which
would lead to many, many tears. Once the limitations around that are fixed,
then we can study adding a way to launch multiple Spark apps from the same
JVM, but right now that's just asking for (hard to debug) trouble.

It might be possible to add support for launching subprocesses without
having to invoke the shell scripts; that would have limitations (e.g. no
spark-env.sh support). In fact I did something like that in the first
implementation of the launcher library, but was asked to go through the
shell scripts during code review. (I even had a different method that
launched in the same VM, but that one suffered from all the problems
described in the paragraph above.)


On Thu, May 21, 2015 at 5:21 PM, Kevin Markey kevin.mar...@oracle.com
wrote:

  This is an excellent discussion.  As mentioned in an earlier email, we
 agree with a number of Chester's suggestions, but we have yet other
 concerns.  I've researched this further in the past several days, and I've
 queried my team.  This email attempts to capture those other concerns.

 Making *yarn.Client* private has prevented us from moving from Spark
 1.0.x to Spark 1.2 or 1.3 despite many alluring new features. The
 SparkLauncher, which provides “support for programmatically running Spark
 jobs” (SPARK-3733 and SPARK-4924) will not work in our environment or for
 our use case -- which requires programmatically initiating and monitoring
 Spark jobs on Yarn in cluster mode *from a cloud-based application server*.


 It is not just that the Yarn *ApplicationId* is no longer directly or
 indirectly available. More critically, it violates constraints imposed by
 any application server and additional constraints imposed by security,
 process, and dynamic resource allocation requirements in our cloud services
 environment.

 In Spark 1.0 and 1.1, with *yarn.Client* *public*, our applications' *job
 scheduler* marshalls configuration and environmental resources necessary
 for any Spark job, including cluster-, data- or job-specific parameters,
 makes the appropriate calls to initialize and run *yarn.Client*, which
 together with the other classes in the spark-yarn module requests the Yarn
 resource manager to start and monitor a job (see Figure 1) on the cluster.
 (Our job scheduler is not Yarn replacement; it leverages Yarn to coordinate
 a variety of different Spark analytic and data enrichment jobs.)

 More recent Spark versions make *yarn.Client* *private* and thus remove
 that capability, but the *SparkLauncher*, scheduled for Spark 1.4,
 replaces this simple programmatic solution with one considerably more
 complicated. Based on our understanding, in this scenario, our job
 scheduler marshalls configuration and environmental resources for the 
 *SparkLauncher
 *much as it did for *yarn.Client*. It then calls *launch() *to initialize
 a new Linux process to execute the *spark-submit *shell script with the
 specified configuration and environment, which in turn starts a new JVM
 (with the Spark assembly jar in its class path) that executes *launcher.Main.
 *This ultimately calls *yarn.Client* (see Figure 2). This is more than an
 arm's-length transaction. There are three legs: job scheduler 
 *SparkLauncher.launch()

Re: Re: Low throughput and effect of GC in SparkSql GROUP BY

2015-05-21 Thread Pramod Biligiri
Hi Zhang,
No, my data is not compressed. I'm trying to minimize the load on the CPU.
The GC time reduced for me after enabling codegen.

Pramod

On Thu, May 21, 2015 at 3:43 AM, zhangxiongfei zhangxiongfei0...@163.com
wrote:

 Hi Pramod

  Is your data compressed? I encountered a similar problem; however, after
 turning codegen on, the GC time was still very long. The size of the input data
 for my map task is about a 100M lzo file.
 My query is: select ip, count(*) as c from stage_bitauto_adclick_d group
 by ip sort by c limit 100

 Thanks
 Zhang Xiongfei



 At 2015-05-21 12:18:35, Reynold Xin r...@databricks.com wrote:

 Does this turn codegen on? I think the performance is fairly different
 when codegen is turned on.

 For 1.5, we are investigating having codegen on by default, so users get
 much better performance out of the box.


 On Wed, May 20, 2015 at 5:24 PM, Pramod Biligiri pramodbilig...@gmail.com
  wrote:

 Hi,
 Somewhat similar to Daniel Mescheder's mail yesterday on SparkSql, I have
 a data point regarding the performance of Group By, indicating there's
 excessive GC and it's impacting the throughput. I want to know if the new
 memory manager for aggregations (
 https://github.com/apache/spark/pull/5725/) is going to address this
 kind of issue.

 I only have a small amount of data on each node (~360MB) with a large
 heap size (18 Gig). I still see 2-3 minor collections happening whenever I
 do a Select Sum() with a group by(). I have tried with different sizes for
 Young Generation without much effect, though not with different GC
 algorithms (Hm..I ought to try reducing the rdd storage fraction perhaps).

 I have made a chart of my results [1] by adding timing code to
 Aggregates.scala. The query is actually Query 2 from Berkeley's AmpLab
 benchmark, running over 10 million records. The chart is from one of the 4
 worker nodes in the cluster.

 I am trying to square this with a claim on the Project Tungsten blog post
 [2]: When profiling Spark user applications, we’ve found that a large
 fraction of the CPU time is spent waiting for data to be fetched from main
 memory. 

 Am I correct in assuming that SparkSql is yet to reach that level of
 efficiency, at least in aggregation operations?

 Thanks.

 [1] -
 https://docs.google.com/spreadsheets/d/1HSqYfic3n5s9i4Wsi1Qg0FKN_AWz2vV7_6RRMrtzplQ/edit#gid=481134174
 [2]
 https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html

 Pramod







Re: Change for submitting to yarn in 1.3.1

2015-05-21 Thread Nathan Kronenfeld
 In researching and discussing these issues with Cloudera and others, we've
 been told that only one mechanism is supported for starting Spark jobs: the
 *spark-submit* scripts.


Is this new? We've been submitting jobs directly from a programmatically
created Spark context (instead of through spark-submit) from the beginning
(from 0.7.x to 1.2) - to a local cluster.

In moving to 1.3 on Yarn cluster recently, we've had no end of problems
trying to switch this over (though I think we're almost there).

Why would one want to eliminate this possibility?

 -Nathan


Testing spark applications

2015-05-21 Thread Nathan Kronenfeld

 see discussions about Spark not really liking multiple contexts in the
 same JVM


Speaking of this - is there a standard way of writing unit tests that
require a SparkContext?

We've ended up copying out the code of SharedSparkContext to our own
testing hierarchy, but it occurs to me someone would have published a test
jar by now if that was the best way.

  -Nathan
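[For reference, a minimal sketch of such a trait, assuming ScalaTest; this
approximates what Spark's internal SharedSparkContext does and is not a published
test artifact:]

import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfterAll, Suite}

trait LocalSparkContext extends BeforeAndAfterAll { self: Suite =>
  @transient var sc: SparkContext = _

  override def beforeAll(): Unit = {
    super.beforeAll()
    // One local context per suite, created before any test runs.
    sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName(suiteName))
  }

  override def afterAll(): Unit = {
    try {
      if (sc != null) sc.stop()   // stop the context so the next suite can create one
      sc = null
    } finally {
      super.afterAll()
    }
  }
}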


Re: Change for submitting to yarn in 1.3.1

2015-05-21 Thread Marcelo Vanzin
Hi Nathan,

On Thu, May 21, 2015 at 7:30 PM, Nathan Kronenfeld 
nkronenfeld@uncharted.software wrote:



 In researching and discussing these issues with Cloudera and others,
 we've been told that only one mechanism is supported for starting Spark
 jobs: the *spark-submit* scripts.


 Is this new? We've been submitting jobs directly from a programatically
 created spark context (instead of through spark-submit) from the beginning
 (from 0.7.x to 1.2) - to a local cluster.

 In moving to 1.3 on Yarn cluster recently, we've had no end of problems
 trying to switch this over (though I think we're almost there).


Instantiating SparkContext directly works. Well, sorta: it has limitations.
For example, see discussions about Spark not really liking multiple
contexts in the same JVM. It also does not work in cluster deploy mode.

-- 
Marcelo


Re: Change for submitting to yarn in 1.3.1

2015-05-21 Thread Nathan Kronenfeld
 Thanks, Marcelo


 Instantiating SparkContext directly works. Well, sorta: it has
 limitations. For example, see discussions about Spark not really liking
 multiple contexts in the same JVM. It also does not work in cluster
 deploy mode.

 That's fine - when one is doing something out of standard, one expects a
bit of Caveat Emptor.


Re: Change for submitting to yarn in 1.3.1

2015-05-21 Thread Koert Kuipers
we also launch jobs programmatically, both in standalone mode and
yarn-client mode. in standalone mode it always worked; in yarn-client mode
we ran into some issues and were forced to use spark-submit, but i still
have on my todo list to move back to a normal java launch without
spark-submit at some point.
for me spark is a library that i use to do distributed computations within
my app, and ideally a library should not tell me how to launch my app. i
mean, if multiple libraries that i use all had their own launch script i
would get stuck very quickly: "hadoop jar" vs "spark-submit" vs "kiji launch"
vs "hbase jar"... bad idea, i think!

however i do understand the practical reasons why spark-submit came about...
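[A minimal sketch of that library-style launch, under the caveats discussed in
this thread: yarn-client mode needs the Hadoop/YARN configuration visible to the
JVM, cluster deploy mode is not available this way, and the jar path is a
placeholder.]

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("embedded-spark-app")
  .setMaster("yarn-client")                       // or "local[*]", "spark://host:7077"
  .setJars(Seq("/path/to/app-assembly.jar"))      // hypothetical application jar

val sc = new SparkContext(conf)
try {
  println(sc.parallelize(1 to 1000).sum())        // any distributed computation
} finally {
  sc.stop()
}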


On Thu, May 21, 2015 at 10:30 PM, Nathan Kronenfeld 
nkronenfeld@uncharted.software wrote:



 In researching and discussing these issues with Cloudera and others,
 we've been told that only one mechanism is supported for starting Spark
 jobs: the *spark-submit* scripts.


 Is this new? We've been submitting jobs directly from a programatically
 created spark context (instead of through spark-submit) from the beginning
 (from 0.7.x to 1.2) - to a local cluster.

 In moving to 1.3 on Yarn cluster recently, we've had no end of problems
 trying to switch this over (though I think we're almost there).

 Why would one want to eliminate this possibility?

  -Nathan




Re: Spark Streaming with Tachyon : Data Loss on Receiver Failure due to WAL error

2015-05-21 Thread Dibyendu Bhattacharya
Hi Tathagata,

Thanks for looking into this. Investigating further, I found that the issue
is that Tachyon does not support file append. The streaming receiver, which
writes to the WAL, fails and is restarted, but it is not able to append to
the same WAL file after the restart.

I raised this with the Tachyon user group, and Haoyuan said that Tachyon
file append will be ready within about three months. I will revisit this
issue again then.

Regards,
Dibyendu


On Fri, May 22, 2015 at 12:24 AM, Tathagata Das t...@databricks.com wrote:

 Looks like somehow the file size reported by the FSInputDStream of
 Tachyon's FileSystem interface is returning zero.

 On Mon, May 11, 2015 at 4:38 AM, Dibyendu Bhattacharya 
 dibyendu.bhattach...@gmail.com wrote:

 Just to follow up this thread further .

 I was doing some fault-tolerance testing of Spark Streaming with Tachyon
 as the OFF_HEAP block store. As I said in an earlier email, I was able to
 solve the BlockNotFound exception when I used Hierarchical Storage in
 Tachyon, which is good.

 I continued doing some testing around storing the Spark Streaming WAL and
 checkpoint files in Tachyon as well. Here are a few findings:


 When I store the Spark Streaming checkpoint location in Tachyon, the
 throughput is much higher. I tested the Driver and Receiver failure cases,
 and Spark Streaming is able to recover without any data loss on Driver
 failure.

 *But on Receiver failure, Spark Streaming loses data*, as I see an
 exception while reading the WAL file from the Tachyon receivedData location
 for the same Receiver id which just failed.

 If I change the checkpoint location back to HDFS, Spark Streaming can
 recover from both Driver and Receiver failure.

 Here are the log details from when the Spark Streaming receiver failed. I
 raised a JIRA for the same issue:
 https://issues.apache.org/jira/browse/SPARK-7525



 INFO : org.apache.spark.scheduler.DAGScheduler - *Executor lost: 2
 (epoch 1)*
 INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Trying to
 remove executor 2 from BlockManagerMaster.
 INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Removing
 block manager BlockManagerId(2, 10.252.5.54, 45789)
 INFO : org.apache.spark.storage.BlockManagerMaster - Removed 2
 successfully in removeExecutor
 INFO : org.apache.spark.streaming.scheduler.ReceiverTracker - *Registered
 receiver for stream 2 from 10.252.5.62*:47255
 WARN : org.apache.spark.scheduler.TaskSetManager - Lost task 2.1 in stage
 103.0 (TID 421, 10.252.5.62): org.apache.spark.SparkException: *Could
 not read data from write ahead log record
 FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919
 http://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919)*
 at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
 $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144)
 at
 org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
 at
 org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
 at scala.Option.getOrElse(Option.scala:120)
 at
 org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:168)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
 at org.apache.spark.scheduler.Task.run(Task.scala:70)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:744)
 Caused by: java.lang.IllegalArgumentException:* Seek position is past
 EOF: 645603894, fileSize = 0*
 at tachyon.hadoop.HdfsFileInputStream.seek(HdfsFileInputStream.java:239)
 at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:37)
 at
 org.apache.spark.streaming.util.FileBasedWriteAheadLogRandomReader.read(FileBasedWriteAheadLogRandomReader.scala:37)
 at
 org.apache.spark.streaming.util.FileBasedWriteAheadLog.read(FileBasedWriteAheadLog.scala:104)
 at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
 $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:141)
 ... 15 more

 INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 2.2 in
 stage 103.0 (TID 422, 10.252.5.61, ANY, 1909 bytes)
 INFO : 

Re: Testing spark applications

2015-05-21 Thread Reynold Xin
It is just 15 lines of code to copy, isn't it?

On Thu, May 21, 2015 at 7:46 PM, Nathan Kronenfeld 
nkronenfeld@uncharted.software wrote:

 see discussions about Spark not really liking multiple contexts in the
 same JVM


 Speaking of this - is there a standard way of writing unit tests that
 require a SparkContext?

 We've ended up copying out the code of SharedSparkContext to our own
 testing hierarchy, but it occurs to me someone would have published a test
 jar by now if that was the best way.

   -Nathan




Tungsten's Vectorized Execution

2015-05-21 Thread Yijie Shen
Hi all,

I’ve seen the blog post on Project Tungsten; it sounds awesome to me!

I’ve also noticed there is a plan to change the code generation from
record-at-a-time evaluation to a vectorized one, which interests me most.

What’s the status of vectorized evaluation? Is this an internal effort at
Databricks, or is outside involvement welcome?

Since I’ve done similar work on Spark SQL, I would like to get involved if
that’s possible.
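[As a purely conceptual illustration of the record-at-a-time vs. vectorized
distinction being discussed -- this is not Spark's operator code:]

// Record-at-a-time: one function call (standing in for interpreting an
// expression tree) per row.
def evalOneRecord(row: Int): Int = row + 1

def sumRecordAtATime(rows: Array[Int]): Long = {
  var acc = 0L
  var i = 0
  while (i < rows.length) { acc += evalOneRecord(rows(i)); i += 1 }
  acc
}

// Vectorized: the expression is applied over a whole batch in one tight loop,
// which is friendlier to the CPU cache and to JIT optimization.
def sumVectorized(batch: Array[Int]): Long = {
  var acc = 0L
  var i = 0
  while (i < batch.length) { acc += batch(i) + 1; i += 1 }
  acc
}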



Yours,

Yijie

Customizing Akka configuration for Spark

2015-05-21 Thread Akshat Aranya
Hi,

Is there some way to customize the Akka configuration for Spark?
Specifically, I want to experiment with custom serialization for messages
that are sent between the driver and executors in standalone mode.

Thanks,
Akshat