Re: Spark UI Source Code

2018-05-08 Thread Anshi Shrivastava
Hi Marcelo, Dev,

Thanks for your response.
I have used SparkListeners to fetch the metrics (the public REST API uses
the same data), but to monitor these metrics over time I have to persist them
(using Spark's KVStore library). Is there a way to fetch data from this
KVStore (which uses LevelDB for storage) and filter it by timestamp?
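
For reference, a rough sketch of a client-side workaround: poll the public
REST API and filter on timestamps outside Spark. The endpoint layout and
field names below are assumptions and may vary by Spark version:

    import requests
    from datetime import datetime, timedelta

    base = "http://localhost:4040/api/v1"   # assumed driver UI address
    app_id = requests.get(base + "/applications").json()[0]["id"]
    stages = requests.get("%s/applications/%s/stages" % (base, app_id)).json()

    cutoff = datetime.utcnow() - timedelta(minutes=10)

    def completed_after(stage, when):
        # Timestamps look like "2018-05-08T12:34:56.789GMT" (assumption).
        ts = stage.get("completionTime")
        return ts and datetime.strptime(ts[:19], "%Y-%m-%dT%H:%M:%S") >= when

    for s in stages:
        if completed_after(s, cutoff):
            print(s["stageId"], s["status"], s.get("executorRunTime"))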

Thanks,
Anshi

On Mon, May 7, 2018 at 9:51 PM, Marcelo Vanzin [via Apache Spark User List]
 wrote:

> On Mon, May 7, 2018 at 1:44 AM, Anshi Shrivastava
> <[hidden email] >
> wrote:
> > I've found a KVStore wrapper which stores all the metrics in a LevelDb
> > store. This KVStore wrapper is available as a spark-dependency but we
> cannot
> > access the metrics directly from spark since they are all private.
>
> I'm not sure what it is you're trying to do exactly, but there's a
> public REST API that exposes all the data Spark keeps about
> applications. There's also a programmatic status tracker
> (SparkContext.statusTracker) that's easier to use from within the
> running Spark app, but has a lot less info.
>
> > Can we use this store to store our own metrics?
>
> No.
>
> > Also can we retrieve these metrics based on timestamp?
>
> Only if the REST API has that feature, don't remember off the top of my
> head.
>
>
> --
> Marcelo
>



Re: Documenting the various DataFrame/SQL join types

2018-05-08 Thread Nicholas Chammas
OK great, I’m happy to take this on.

Does it make sense to approach this by adding an example for each join type
here (and perhaps also in the matching areas for Scala, Java, and R), and then
referencing the examples from the SQL Programming Guide using include_example
tags?

e.g.:


{% include_example write_sorting_and_bucketing python/sql/datasource.py %}
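
If I understand the mechanism correctly, that tag pulls in a region of the
example file delimited by special comment markers, roughly like this sketch
(the label and snippet here are illustrative, not a proposal):

    # examples/src/main/python/sql/datasource.py (illustrative)
    df = spark.read.json("examples/src/main/resources/people.json")
    # $example on:write_sorting_and_bucketing$
    df.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
    # $example off:write_sorting_and_bucketing$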

And would this let me implement simple tests for the examples? It’s not
clear to me whether the comment blocks in that example file are used for
testing somehow.
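
Either way, the kind of example I have in mind is a short doctest-style
snippet with made-up data, e.g. for LEFT ANTI (which returns the rows from
the left side that have no match on the right):

    >>> customers = spark.createDataFrame(
    ...     [(1, "alice"), (2, "bob"), (3, "carol")], ["id", "name"])
    >>> orders = spark.createDataFrame([(1,), (3,)], ["customer_id"])
    >>> customers.join(orders, customers.id == orders.customer_id, "left_anti").show()
    +---+----+
    | id|name|
    +---+----+
    |  2| bob|
    +---+----+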

Just looking for some high level guidance.

Nick
​

On Tue, May 8, 2018 at 11:42 AM Reynold Xin  wrote:

> Would be great to document. Probably best with examples.
>
> On Tue, May 8, 2018 at 6:13 AM Nicholas Chammas <
> nicholas.cham...@gmail.com> wrote:
>
>> The documentation for DataFrame.join()
>> 
>> lists all the join types we support:
>>
>>- inner
>>- cross
>>- outer
>>- full
>>- full_outer
>>- left
>>- left_outer
>>- right
>>- right_outer
>>- left_semi
>>- left_anti
>>
>> Some of these join types are also listed on the SQL Programming Guide
>> 
>> .
>>
>> Is it obvious to everyone what all these different join types are? For
>> example, I had never heard of a LEFT ANTI join until stumbling on it in the
>> PySpark docs. It’s quite handy! But I had to experiment with it a bit just
>> to understand what it does.
>>
>> I think it would be a good service to our users if we either documented
>> these join types ourselves clearly, or provided a link to an external
>> resource that documented them sufficiently. I’m happy to file a JIRA about
>> this and do the work itself. It would be great if the documentation could
>> be expressed as a series of simple doc tests, but brief prose describing
>> how each join works would still be valuable.
>>
>> Does this seem worthwhile to folks here? And does anyone want to offer
>> guidance on how best to provide this kind of documentation so that it’s
>> easy to find by users, regardless of the language they’re using?
>>
>> Nick
>> ​
>>
>


Re: Identifying specific persisted DataFrames via getPersistentRDDs()

2018-05-08 Thread Nicholas Chammas
That’s correct. I probably would have done better to title this thread
something like “How to effectively track and release persisted DataFrames”.

I jumped the gun in my initial email by referencing getPersistentRDDs() as
a potential solution, but in theory the desired API is something like
spark.unpersistAllExcept([list
of DataFrames or RDDs]). This seems awkward, but I suspect the underlying
use case is common.

An alternative or complementary approach, perhaps, would be to allow
persistence (and perhaps even checkpointing) to be explicitly scoped. I think in some
circles this is called “Scope-based Resource Management” or “Resource
acquisition is initialization” (RAII). It would make it a lot easier to
track and release DataFrames or RDDs when they are no longer needed in
cache.
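
In the meantime, the manual-tracking alternative looks roughly like the
sketch below; it is purely application-level bookkeeping, and the helper
names are made up:

    # Keep a registry of persisted DataFrames so everything outside a
    # keep-list can be released in one call.
    _persisted = []

    def persist_tracked(df):
        df.persist()
        _persisted.append(df)
        return df

    def unpersist_all_except(keep):
        keep_ids = {id(df) for df in keep}
        for df in list(_persisted):
            if id(df) not in keep_ids:
                df.unpersist()
                _persisted.remove(df)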

Nick

On Tue, May 8, 2018 at 1:32 PM, Mark Hamstra wrote:

If I am understanding you correctly, you're just saying that the problem is
> that you know what you want to keep, not what you want to throw away, and
> that there is no unpersist DataFrames call based on that what-to-keep
> information.
>
> On Tue, May 8, 2018 at 6:00 AM, Nicholas Chammas <
> nicholas.cham...@gmail.com> wrote:
>
>> I certainly can, but the problem I’m facing is that of how best to track
>> all the DataFrames I no longer want to persist.
>>
>> I create and persist various DataFrames throughout my pipeline. Spark is
>> already tracking all this for me, and exposing some of that tracking
>> information via getPersistentRDDs(). So when I arrive at a point in my
>> program where I know, “I only need this DataFrame going forward”, I want to
>> be able to tell Spark “Please unpersist everything except this one
>> DataFrame”. If I cannot leverage the information about persisted DataFrames
>> that Spark is already tracking, then the alternative is for me to carefully
>> track and unpersist DataFrames when I no longer need them.
>>
>> I suppose the problem is similar at a high level to garbage collection.
>> Tracking and freeing DataFrames manually is analogous to malloc and free;
>> and full automation would be Spark automatically unpersisting DataFrames
>> when they were no longer referenced or needed. I’m looking for an
>> in-between solution that lets me leverage some of the persistence tracking
>> in Spark so I don’t have to do it all myself.
>>
>> Does this make more sense now, from a use case perspective as well as
>> from a desired API perspective?
>> ​
>>
>> On Thu, May 3, 2018 at 10:26 PM Reynold Xin  wrote:
>>
>>> Why do you need the underlying RDDs? Can't you just unpersist the
>>> dataframes that you don't need?
>>>
>>>
>>> On Mon, Apr 30, 2018 at 8:17 PM Nicholas Chammas <
>>> nicholas.cham...@gmail.com> wrote:
>>>
 This seems to be an underexposed part of the API. My use case is this:
 I want to unpersist all DataFrames except a specific few. I want to do this
 because I know at a specific point in my pipeline that I have a handful of
 DataFrames that I need, and everything else is no longer needed.

 The problem is that there doesn’t appear to be a way to identify
 specific DataFrames (or rather, their underlying RDDs) via
 getPersistentRDDs(), which is the only way I’m aware of to ask Spark
 for all currently persisted RDDs:

 >>> a = spark.range(10).persist()
 >>> a.rdd.id()
 8
 >>> list(spark.sparkContext._jsc.getPersistentRDDs().items())
 [(3, JavaObject id=o36)]

 As you can see, the id of the persisted RDD, 8, doesn’t match the id
 returned by getPersistentRDDs(), 3. So I can’t go through the RDDs
 returned by getPersistentRDDs() and know which ones I want to keep.

 id() itself appears to be an undocumented method of the RDD API, and
 in PySpark getPersistentRDDs() is buried behind the Java sub-objects
 , so I know I’m
 reaching here. But is there a way to do what I want in PySpark without
 manually tracking everything I’ve persisted myself?

 And more broadly speaking, do we want to add additional APIs, or
 formalize currently undocumented APIs like id(), to make this use case
 possible?

 Nick
 ​

>>>
> ​


Re: eager execution and debuggability

2018-05-08 Thread Reynold Xin
Yes, that would be great if possible, but it’s non-trivial (it might be
impossible to do in general; we already have stack traces that point to line
numbers when an error occurs in UDFs, but clearly that’s not sufficient).
Also, in environments like a REPL it’s still more useful to show the error as
soon as it occurs, rather than showing it potentially 30 lines later.

On Tue, May 8, 2018 at 7:22 PM Nicholas Chammas 
wrote:

> This may be technically impractical, but it would be fantastic if we could
> make it easier to debug Spark programs without needing to rely on eager
> execution. Sprinkling .count() and .checkpoint() at various points in my
> code is still a debugging technique I use, but it always makes me wish
> Spark could point more directly to the offending transformation when
> something goes wrong.
>
> Is it somehow possible to have each individual operator (is that the
> correct term?) in a DAG include metadata pointing back to the line of code
> that generated the operator? That way when an action triggers an error, the
> failing operation can point to the relevant line of code — even if it’s a
> transformation — and not just the action on the tail end that triggered the
> error.
>
> I don’t know how feasible this is, but addressing it would directly solve
> the issue of linking failures to the responsible transformation, as opposed
> to leaving the user to break up a chain of transformations with several
> debug actions. And this would benefit new and experienced users alike.
>
> Nick
>
> On Tue, May 8, 2018 at 7:09 PM, Ryan Blue rb...@netflix.com.invalid wrote:
>
> I've opened SPARK-24215 to track this.
>>
>> On Tue, May 8, 2018 at 3:58 PM, Reynold Xin  wrote:
>>
>>> Yup. Sounds great. This is something simple Spark can do and provide
>>> huge value to the end users.
>>>
>>>
>>> On Tue, May 8, 2018 at 3:53 PM Ryan Blue  wrote:
>>>
 Would be great if it is something more turn-key.

 We can easily add the __repr__ and _repr_html_ methods and behavior to
 PySpark classes. We could also add a configuration property to determine
 whether the dataset evaluation is eager or not. That would make it turn-key
 for anyone running PySpark in Jupyter.

 For JVM languages, we could also add a dependency on jvm-repr and do
 the same thing.

 rb
 ​

 On Tue, May 8, 2018 at 3:47 PM, Reynold Xin 
 wrote:

> s/underestimated/overestimated/
>
> On Tue, May 8, 2018 at 3:44 PM Reynold Xin 
> wrote:
>
>> Marco,
>>
>> There is understanding how Spark works, and there is finding bugs
>> early in their own program. One can perfectly understand how Spark works
>> and still find it valuable to get feedback asap, and that's why we built
>> eager analysis in the first place.
>>
>> Also I'm afraid you've significantly underestimated the level of
>> technical sophistication of users. In many cases they struggle to get
>> anything to work, and performance optimization of their programs is
>> secondary to getting things working. As John Ousterhout says, "the 
>> greatest
>> performance improvement of all is when a system goes from not-working to
>> working".
>>
>> I really like Ryan's approach. Would be great if it is something more
>> turn-key.
>>
>>
>>
>>
>>
>>
>> On Tue, May 8, 2018 at 2:35 PM Marco Gaido 
>> wrote:
>>
>>> I am not sure how this is useful. For students, it is important to
>>> understand how Spark works. This can be critical in many decision they 
>>> have
>>> to take (whether and what to cache for instance) in order to have
>>> performant Spark application. Creating a eager execution probably can 
>>> help
>>> them having something running more easily, but let them also using Spark
>>> knowing less about how it works, thus they are likely to write worse
>>> application and to have more problems in debugging any kind of problem
>>> which may later (in production) occur (therefore affecting their 
>>> experience
>>> with the tool).
>>>
>>> Moreover, as Ryan also mentioned, there are tools/ways to force the
>>> execution, helping in the debugging phase. So they can achieve without a
>>> big effort the same result, but with a big difference: they are aware of
>>> what is really happening, which may help them later.
>>>
>>> Thanks,
>>> Marco
>>>
>>> 2018-05-08 21:37 GMT+02:00 Ryan Blue :
>>>
 At Netflix, we use Jupyter notebooks and consoles for interactive
 sessions. For anyone interested, this mode of interaction is really 
 easy to
 add in Jupyter and PySpark. You would just define a different
 *repr_html* or *repr* 

Re: eager execution and debuggability

2018-05-08 Thread Nicholas Chammas
This may be technically impractical, but it would be fantastic if we could
make it easier to debug Spark programs without needing to rely on eager
execution. Sprinkling .count() and .checkpoint() at various points in my
code is still a debugging technique I use, but it always makes me wish
Spark could point more directly to the offending transformation when
something goes wrong.
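
For concreteness, the technique looks something like this toy sketch (the
data and columns are made up):

    from pyspark.sql import functions as F

    df = spark.range(10)
    step1 = df.withColumn("x", F.col("id") * 2)
    step1.count()   # debug action: an error in the step above surfaces here
    step2 = step1.filter(F.col("x") > 5)
    step2.count()   # likewise, an error in this filter surfaces here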

Is it somehow possible to have each individual operator (is that the
correct term?) in a DAG include metadata pointing back to the line of code
that generated the operator? That way when an action triggers an error, the
failing operation can point to the relevant line of code — even if it’s a
transformation — and not just the action on the tail end that triggered the
error.

I don’t know how feasible this is, but addressing it would directly solve
the issue of linking failures to the responsible transformation, as opposed
to leaving the user to break up a chain of transformations with several
debug actions. And this would benefit new and experienced users alike.

Nick

On Tue, May 8, 2018 at 7:09 PM, Ryan Blue rb...@netflix.com.invalid wrote:

I've opened SPARK-24215 to track this.
>
> On Tue, May 8, 2018 at 3:58 PM, Reynold Xin  wrote:
>
>> Yup. Sounds great. This is something simple Spark can do and provide huge
>> value to the end users.
>>
>>
>> On Tue, May 8, 2018 at 3:53 PM Ryan Blue  wrote:
>>
>>> Would be great if it is something more turn-key.
>>>
>>> We can easily add the __repr__ and _repr_html_ methods and behavior to
>>> PySpark classes. We could also add a configuration property to determine
>>> whether the dataset evaluation is eager or not. That would make it turn-key
>>> for anyone running PySpark in Jupyter.
>>>
>>> For JVM languages, we could also add a dependency on jvm-repr and do the
>>> same thing.
>>>
>>> rb
>>> ​
>>>
>>> On Tue, May 8, 2018 at 3:47 PM, Reynold Xin  wrote:
>>>
 s/underestimated/overestimated/

 On Tue, May 8, 2018 at 3:44 PM Reynold Xin  wrote:

> Marco,
>
> There is understanding how Spark works, and there is finding bugs
> early in their own program. One can perfectly understand how Spark works
> and still find it valuable to get feedback asap, and that's why we built
> eager analysis in the first place.
>
> Also I'm afraid you've significantly underestimated the level of
> technical sophistication of users. In many cases they struggle to get
> anything to work, and performance optimization of their programs is
> secondary to getting things working. As John Ousterhout says, "the 
> greatest
> performance improvement of all is when a system goes from not-working to
> working".
>
> I really like Ryan's approach. Would be great if it is something more
> turn-key.
>
>
>
>
>
>
> On Tue, May 8, 2018 at 2:35 PM Marco Gaido 
> wrote:
>
>> I am not sure how this is useful. For students, it is important to
>> understand how Spark works. This can be critical in many decision they 
>> have
>> to take (whether and what to cache for instance) in order to have
>> performant Spark application. Creating a eager execution probably can 
>> help
>> them having something running more easily, but let them also using Spark
>> knowing less about how it works, thus they are likely to write worse
>> application and to have more problems in debugging any kind of problem
>> which may later (in production) occur (therefore affecting their 
>> experience
>> with the tool).
>>
>> Moreover, as Ryan also mentioned, there are tools/ways to force the
>> execution, helping in the debugging phase. So they can achieve without a
>> big effort the same result, but with a big difference: they are aware of
>> what is really happening, which may help them later.
>>
>> Thanks,
>> Marco
>>
>> 2018-05-08 21:37 GMT+02:00 Ryan Blue :
>>
>>> At Netflix, we use Jupyter notebooks and consoles for interactive
>>> sessions. For anyone interested, this mode of interaction is really 
>>> easy to
>>> add in Jupyter and PySpark. You would just define a different
>>> *repr_html* or *repr* method for Dataset that runs a take(10) or
>>> take(100) and formats the result.
>>>
>>> That way, the output of a cell or console execution always causes
>>> the dataframe to run and get displayed for that immediate feedback. But,
>>> there is no change to Spark’s behavior because the action is run by the
>>> REPL, and only when a dataframe is a result of an execution in order to
>>> display it. Intermediate results wouldn’t be run, but that gives users a
>>> way to avoid too many executions and would still support method 
>>> 

Re: [DISCUSS] Spark SQL internal data: InternalRow or UnsafeRow?

2018-05-08 Thread Ryan Blue
Because the InternalRow representation is already in the v2 public API,
we’re already working on cleaning up classes and documenting the internal
representation. The JIRA issue is SPARK-23657 and my PR is #21242.

The confusion I want to clear up is what “correct” is. Should we
immediately translate to UnsafeRow before passing data to other operators,
or should we (eventually) fix the operators that don’t support InternalRow?
If the answer is that it is okay to pass InternalRow, then we can get a few
performance gains by avoiding copies in data sources (including v1
sources). Parquet copies every partitioned row twice to ensure it is
UnsafeRow, even though this is the most expensive place to do it if there
are filters or another projection on top of the scan operator.

For the v2 API, I think the right thing to do is accept InternalRow and
handle conversion to unsafe in Spark. That way, we can defer conversion
until after filters have run and only add a conversion if there isn’t
already a projection that will do it.

Here’s the thread on github about this for background:
https://github.com/apache/spark/pull/21118#issuecomment-386647848

rb
​

On Tue, May 8, 2018 at 4:27 PM, Reynold Xin  wrote:

> IIRC we switched all internals to UnsafeRow for simplicity. It is easier
> to serialize UnsafeRows, compute hash codes, etc. At some point we had a
> bug with unioning two plans producing different types of rows, so we forced
> the conversion at input.
>
> Can't your "wish" be satisfied by having the public API producing the
> internals of UnsafeRow (without actually exposing UnsafeRow)?
>
>
> On Tue, May 8, 2018 at 4:16 PM Ryan Blue  wrote:
>
>> Is the goal to design an API so the consumers of the API can directly
>> produces what Spark expects internally, to cut down perf cost?
>>
>> No. That has already been done. The problem on the API side is that it
>> makes little sense to force implementers to create UnsafeRow when it almost
>> certainly causes them to simply use UnsafeProjection and copy it. If
>> we’re just making a copy and we can defer that copy to get better
>> performance, why would we make implementations handle it? Instead, I think
>> we should accept InternalRow from v2 data sources and copy to unsafe
>> when it makes sense to do so: after filters are run and only if there isn’t
>> another projection that will do it already.
>>
>> But I don’t want to focus on the v2 API for this. What I’m asking in this
>> thread is what the intent is for the SQL engine. Is this an accident that
>> nearly everything works with InternalRow? If we were to make a choice
>> here, should we mandate that rows passed into the SQL engine must be
>> UnsafeRow?
>>
>> Personally, I think it makes sense to say that everything should accept
>> InternalRow, but produce UnsafeRow, with the understanding that UnsafeRow
>> will usually perform better.
>>
>> rb
>> ​
>>
>> On Tue, May 8, 2018 at 4:09 PM, Reynold Xin  wrote:
>>
>>> What the internal operators do are strictly internal. To take one step
>>> back, is the goal to design an API so the consumers of the API can directly
>>> produces what Spark expects internally, to cut down perf cost?
>>>
>>>
>>> On Tue, May 8, 2018 at 1:22 PM Ryan Blue 
>>> wrote:
>>>
 While moving the new data source API to InternalRow, I noticed a few
 odd things:

- Spark scans always produce UnsafeRow, but that data is passed
around as InternalRow with explicit casts.
- Operators expect InternalRow and nearly all codegen works with
InternalRow (I’ve tested this with quite a few queries.)
- Spark uses unchecked casts

 
from InternalRow to UnsafeRow in places, assuming that data will be
unsafe, even though that isn’t what the type system guarantees.

 To me, it looks like the idea was to code SQL operators to the abstract
 InternalRow so we can swap out the implementation, but ended up with a
 general assumption that rows will always be unsafe. This is the worst of
 both options: we can’t actually rely on everything working with
 InternalRow but code must still use it, until it is inconvenient and
 an unchecked cast gets inserted.

 The main question I want to answer is this: *what data format should
 SQL use internally?* What was the intent when building catalyst?

 The v2 data source API depends on the answer, but I also found that
 this introduces a significant performance penalty in Parquet (and probably
 other formats). A quick check on one of our tables showed a 6% performance
 hit caused by unnecessary copies from InternalRow to UnsafeRow. So if
 we can guarantee that all operators should support 

Re: [DISCUSS] Spark SQL internal data: InternalRow or UnsafeRow?

2018-05-08 Thread Reynold Xin
IIRC we switched all internals to UnsafeRow for simplicity. It is easier to
serialize UnsafeRows, compute hash codes, etc. At some point we had a bug
with unioning two plans producing different types of rows, so we forced the
conversion at input.

Can't your "wish" be satisfied by having the public API producing the
internals of UnsafeRow (without actually exposing UnsafeRow)?


On Tue, May 8, 2018 at 4:16 PM Ryan Blue  wrote:

> Is the goal to design an API so the consumers of the API can directly
> produces what Spark expects internally, to cut down perf cost?
>
> No. That has already been done. The problem on the API side is that it
> makes little sense to force implementers to create UnsafeRow when it almost
> certainly causes them to simply use UnsafeProjection and copy it. If
> we’re just making a copy and we can defer that copy to get better
> performance, why would we make implementations handle it? Instead, I think
> we should accept InternalRow from v2 data sources and copy to unsafe when
> it makes sense to do so: after filters are run and only if there isn’t
> another projection that will do it already.
>
> But I don’t want to focus on the v2 API for this. What I’m asking in this
> thread is what the intent is for the SQL engine. Is this an accident that
> nearly everything works with InternalRow? If we were to make a choice
> here, should we mandate that rows passed into the SQL engine must be
> UnsafeRow?
>
> Personally, I think it makes sense to say that everything should accept
> InternalRow, but produce UnsafeRow, with the understanding that UnsafeRow
> will usually perform better.
>
> rb
> ​
>
> On Tue, May 8, 2018 at 4:09 PM, Reynold Xin  wrote:
>
>> What the internal operators do are strictly internal. To take one step
>> back, is the goal to design an API so the consumers of the API can directly
>> produces what Spark expects internally, to cut down perf cost?
>>
>>
>> On Tue, May 8, 2018 at 1:22 PM Ryan Blue 
>> wrote:
>>
>>> While moving the new data source API to InternalRow, I noticed a few odd
>>> things:
>>>
>>>- Spark scans always produce UnsafeRow, but that data is passed
>>>around as InternalRow with explicit casts.
>>>- Operators expect InternalRow and nearly all codegen works with
>>>InternalRow (I’ve tested this with quite a few queries.)
>>>- Spark uses unchecked casts
>>>
>>> 
>>>from InternalRow to UnsafeRow in places, assuming that data will be
>>>unsafe, even though that isn’t what the type system guarantees.
>>>
>>> To me, it looks like the idea was to code SQL operators to the abstract
>>> InternalRow so we can swap out the implementation, but ended up with a
>>> general assumption that rows will always be unsafe. This is the worst of
>>> both options: we can’t actually rely on everything working with
>>> InternalRow but code must still use it, until it is inconvenient and an
>>> unchecked cast gets inserted.
>>>
>>> The main question I want to answer is this: *what data format should
>>> SQL use internally?* What was the intent when building catalyst?
>>>
>>> The v2 data source API depends on the answer, but I also found that this
>>> introduces a significant performance penalty in Parquet (and probably other
>>> formats). A quick check on one of our tables showed a 6% performance hit
>>> caused by unnecessary copies from InternalRow to UnsafeRow. So if we
>>> can guarantee that all operators should support InternalRow, then there
>>> is an easy performance win that also simplifies the v2 data source API.
>>>
>>> rb
>>> ​
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>>>
>>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>


Re: [DISCUSS] Spark SQL internal data: InternalRow or UnsafeRow?

2018-05-08 Thread Ryan Blue
Is the goal to design an API so the consumers of the API can directly
produces what Spark expects internally, to cut down perf cost?

No. That has already been done. The problem on the API side is that it
makes little sense to force implementers to create UnsafeRow when it almost
certainly causes them to simply use UnsafeProjection and copy it. If we’re
just making a copy and we can defer that copy to get better performance,
why would we make implementations handle it? Instead, I think we should
accept InternalRow from v2 data sources and copy to unsafe when it makes
sense to do so: after filters are run and only if there isn’t another
projection that will do it already.

But I don’t want to focus on the v2 API for this. What I’m asking in this
thread is what the intent is for the SQL engine. Is this an accident that
nearly everything works with InternalRow? If we were to make a choice here,
should we mandate that rows passed into the SQL engine must be UnsafeRow?

Personally, I think it makes sense to say that everything should accept
InternalRow, but produce UnsafeRow, with the understanding that UnsafeRow
will usually perform better.

rb
​

On Tue, May 8, 2018 at 4:09 PM, Reynold Xin  wrote:

> What the internal operators do are strictly internal. To take one step
> back, is the goal to design an API so the consumers of the API can directly
> produces what Spark expects internally, to cut down perf cost?
>
>
> On Tue, May 8, 2018 at 1:22 PM Ryan Blue 
> wrote:
>
>> While moving the new data source API to InternalRow, I noticed a few odd
>> things:
>>
>>- Spark scans always produce UnsafeRow, but that data is passed
>>around as InternalRow with explicit casts.
>>- Operators expect InternalRow and nearly all codegen works with
>>InternalRow (I’ve tested this with quite a few queries.)
>>- Spark uses unchecked casts
>>
>> 
>>from InternalRow to UnsafeRow in places, assuming that data will be
>>unsafe, even though that isn’t what the type system guarantees.
>>
>> To me, it looks like the idea was to code SQL operators to the abstract
>> InternalRow so we can swap out the implementation, but ended up with a
>> general assumption that rows will always be unsafe. This is the worst of
>> both options: we can’t actually rely on everything working with
>> InternalRow but code must still use it, until it is inconvenient and an
>> unchecked cast gets inserted.
>>
>> The main question I want to answer is this: *what data format should SQL
>> use internally?* What was the intent when building catalyst?
>>
>> The v2 data source API depends on the answer, but I also found that this
>> introduces a significant performance penalty in Parquet (and probably other
>> formats). A quick check on one of our tables showed a 6% performance hit
>> caused by unnecessary copies from InternalRow to UnsafeRow. So if we can
>> guarantee that all operators should support InternalRow, then there is
>> an easy performance win that also simplifies the v2 data source API.
>>
>> rb
>> ​
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>


-- 
Ryan Blue
Software Engineer
Netflix


Re: [DISCUSS] Spark SQL internal data: InternalRow or UnsafeRow?

2018-05-08 Thread Reynold Xin
What the internal operators do is strictly internal. To take one step
back, is the goal to design an API so that consumers of the API can directly
produce what Spark expects internally, to cut down perf cost?


On Tue, May 8, 2018 at 1:22 PM Ryan Blue  wrote:

> While moving the new data source API to InternalRow, I noticed a few odd
> things:
>
>- Spark scans always produce UnsafeRow, but that data is passed around
>as InternalRow with explicit casts.
>- Operators expect InternalRow and nearly all codegen works with
>InternalRow (I’ve tested this with quite a few queries.)
>- Spark uses unchecked casts
>
> 
>from InternalRow to UnsafeRow in places, assuming that data will be
>unsafe, even though that isn’t what the type system guarantees.
>
> To me, it looks like the idea was to code SQL operators to the abstract
> InternalRow so we can swap out the implementation, but ended up with a
> general assumption that rows will always be unsafe. This is the worst of
> both options: we can’t actually rely on everything working with
> InternalRow but code must still use it, until it is inconvenient and an
> unchecked cast gets inserted.
>
> The main question I want to answer is this: *what data format should SQL
> use internally?* What was the intent when building catalyst?
>
> The v2 data source API depends on the answer, but I also found that this
> introduces a significant performance penalty in Parquet (and probably other
> formats). A quick check on one of our tables showed a 6% performance hit
> caused by unnecessary copies from InternalRow to UnsafeRow. So if we can
> guarantee that all operators should support InternalRow, then there is an
> easy performance win that also simplifies the v2 data source API.
>
> rb
> ​
> --
> Ryan Blue
> Software Engineer
> Netflix
>


Re: eager execution and debuggability

2018-05-08 Thread Ryan Blue
I've opened SPARK-24215 to track this.

On Tue, May 8, 2018 at 3:58 PM, Reynold Xin  wrote:

> Yup. Sounds great. This is something simple Spark can do and provide huge
> value to the end users.
>
>
> On Tue, May 8, 2018 at 3:53 PM Ryan Blue  wrote:
>
>> Would be great if it is something more turn-key.
>>
>> We can easily add the __repr__ and _repr_html_ methods and behavior to
>> PySpark classes. We could also add a configuration property to determine
>> whether the dataset evaluation is eager or not. That would make it turn-key
>> for anyone running PySpark in Jupyter.
>>
>> For JVM languages, we could also add a dependency on jvm-repr and do the
>> same thing.
>>
>> rb
>> ​
>>
>> On Tue, May 8, 2018 at 3:47 PM, Reynold Xin  wrote:
>>
>>> s/underestimated/overestimated/
>>>
>>> On Tue, May 8, 2018 at 3:44 PM Reynold Xin  wrote:
>>>
 Marco,

 There is understanding how Spark works, and there is finding bugs early
 in their own program. One can perfectly understand how Spark works and
 still find it valuable to get feedback asap, and that's why we built eager
 analysis in the first place.

 Also I'm afraid you've significantly underestimated the level of
 technical sophistication of users. In many cases they struggle to get
 anything to work, and performance optimization of their programs is
 secondary to getting things working. As John Ousterhout says, "the greatest
 performance improvement of all is when a system goes from not-working to
 working".

 I really like Ryan's approach. Would be great if it is something more
 turn-key.






 On Tue, May 8, 2018 at 2:35 PM Marco Gaido 
 wrote:

> I am not sure how this is useful. For students, it is important to
> understand how Spark works. This can be critical in many decision they 
> have
> to take (whether and what to cache for instance) in order to have
> performant Spark application. Creating a eager execution probably can help
> them having something running more easily, but let them also using Spark
> knowing less about how it works, thus they are likely to write worse
> application and to have more problems in debugging any kind of problem
> which may later (in production) occur (therefore affecting their 
> experience
> with the tool).
>
> Moreover, as Ryan also mentioned, there are tools/ways to force the
> execution, helping in the debugging phase. So they can achieve without a
> big effort the same result, but with a big difference: they are aware of
> what is really happening, which may help them later.
>
> Thanks,
> Marco
>
> 2018-05-08 21:37 GMT+02:00 Ryan Blue :
>
>> At Netflix, we use Jupyter notebooks and consoles for interactive
>> sessions. For anyone interested, this mode of interaction is really easy 
>> to
>> add in Jupyter and PySpark. You would just define a different
>> *repr_html* or *repr* method for Dataset that runs a take(10) or
>> take(100) and formats the result.
>>
>> That way, the output of a cell or console execution always causes the
>> dataframe to run and get displayed for that immediate feedback. But, 
>> there
>> is no change to Spark’s behavior because the action is run by the REPL, 
>> and
>> only when a dataframe is a result of an execution in order to display it.
>> Intermediate results wouldn’t be run, but that gives users a way to avoid
>> too many executions and would still support method chaining in the
>> dataframe API (which would be horrible with an aggressive execution 
>> model).
>>
>> There are ways to do this in JVM languages as well if you are using a
>> Scala or Java interpreter (see jvm-repr
>> ). This is actually what we do
>> in our Spark-based SQL interpreter to display results.
>>
>> rb
>> ​
>>
>> On Tue, May 8, 2018 at 12:05 PM, Koert Kuipers 
>> wrote:
>>
>>> yeah we run into this all the time with new hires. they will send
>>> emails explaining there is an error in the .write operation and they are
>>> debugging the writing to disk, focusing on that piece of code :)
>>>
>>> unrelated, but another frequent cause for confusion is cascading
>>> errors. like the FetchFailedException. they will be debugging the 
>>> reducer
>>> task not realizing the error happened before that, and the
>>> FetchFailedException is not the root cause.
>>>
>>>
>>> On Tue, May 8, 2018 at 2:52 PM, Reynold Xin 
>>> wrote:
>>>
 Similar to the thread yesterday about improving ML/DL integration,
 I'm sending another email on what I've learned 

Re: eager execution and debuggability

2018-05-08 Thread Reynold Xin
Yup. Sounds great. This is something simple Spark can do and provide huge
value to the end users.


On Tue, May 8, 2018 at 3:53 PM Ryan Blue  wrote:

> Would be great if it is something more turn-key.
>
> We can easily add the __repr__ and _repr_html_ methods and behavior to
> PySpark classes. We could also add a configuration property to determine
> whether the dataset evaluation is eager or not. That would make it turn-key
> for anyone running PySpark in Jupyter.
>
> For JVM languages, we could also add a dependency on jvm-repr and do the
> same thing.
>
> rb
> ​
>
> On Tue, May 8, 2018 at 3:47 PM, Reynold Xin  wrote:
>
>> s/underestimated/overestimated/
>>
>> On Tue, May 8, 2018 at 3:44 PM Reynold Xin  wrote:
>>
>>> Marco,
>>>
>>> There is understanding how Spark works, and there is finding bugs early
>>> in their own program. One can perfectly understand how Spark works and
>>> still find it valuable to get feedback asap, and that's why we built eager
>>> analysis in the first place.
>>>
>>> Also I'm afraid you've significantly underestimated the level of
>>> technical sophistication of users. In many cases they struggle to get
>>> anything to work, and performance optimization of their programs is
>>> secondary to getting things working. As John Ousterhout says, "the greatest
>>> performance improvement of all is when a system goes from not-working to
>>> working".
>>>
>>> I really like Ryan's approach. Would be great if it is something more
>>> turn-key.
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Tue, May 8, 2018 at 2:35 PM Marco Gaido 
>>> wrote:
>>>
 I am not sure how this is useful. For students, it is important to
 understand how Spark works. This can be critical in many decision they have
 to take (whether and what to cache for instance) in order to have
 performant Spark application. Creating a eager execution probably can help
 them having something running more easily, but let them also using Spark
 knowing less about how it works, thus they are likely to write worse
 application and to have more problems in debugging any kind of problem
 which may later (in production) occur (therefore affecting their experience
 with the tool).

 Moreover, as Ryan also mentioned, there are tools/ways to force the
 execution, helping in the debugging phase. So they can achieve without a
 big effort the same result, but with a big difference: they are aware of
 what is really happening, which may help them later.

 Thanks,
 Marco

 2018-05-08 21:37 GMT+02:00 Ryan Blue :

> At Netflix, we use Jupyter notebooks and consoles for interactive
> sessions. For anyone interested, this mode of interaction is really easy 
> to
> add in Jupyter and PySpark. You would just define a different
> *repr_html* or *repr* method for Dataset that runs a take(10) or
> take(100) and formats the result.
>
> That way, the output of a cell or console execution always causes the
> dataframe to run and get displayed for that immediate feedback. But, there
> is no change to Spark’s behavior because the action is run by the REPL, 
> and
> only when a dataframe is a result of an execution in order to display it.
> Intermediate results wouldn’t be run, but that gives users a way to avoid
> too many executions and would still support method chaining in the
> dataframe API (which would be horrible with an aggressive execution 
> model).
>
> There are ways to do this in JVM languages as well if you are using a
> Scala or Java interpreter (see jvm-repr
> ). This is actually what we do
> in our Spark-based SQL interpreter to display results.
>
> rb
> ​
>
> On Tue, May 8, 2018 at 12:05 PM, Koert Kuipers 
> wrote:
>
>> yeah we run into this all the time with new hires. they will send
>> emails explaining there is an error in the .write operation and they are
>> debugging the writing to disk, focusing on that piece of code :)
>>
>> unrelated, but another frequent cause for confusion is cascading
>> errors. like the FetchFailedException. they will be debugging the reducer
>> task not realizing the error happened before that, and the
>> FetchFailedException is not the root cause.
>>
>>
>> On Tue, May 8, 2018 at 2:52 PM, Reynold Xin 
>> wrote:
>>
>>> Similar to the thread yesterday about improving ML/DL integration,
>>> I'm sending another email on what I've learned recently from Spark 
>>> users. I
>>> recently talked to some educators that have been teaching Spark in their
>>> (top-tier) university classes. They are some of the most important users
>>> for adoption because of the multiplicative effect they have 

Re: eager execution and debuggability

2018-05-08 Thread Ryan Blue
Would be great if it is something more turn-key.

We can easily add the __repr__ and _repr_html_ methods and behavior to
PySpark classes. We could also add a configuration property to determine
whether the dataset evaluation is eager or not. That would make it turn-key
for anyone running PySpark in Jupyter.
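
A minimal sketch of that idea for PySpark in Jupyter, monkey-patched here
purely for illustration (a real change would live on the DataFrame class and
be guarded by the configuration property mentioned above):

    from pyspark.sql import DataFrame

    def _eager_repr_html(df, n=10):
        # Run a small action and render the first rows as an HTML table so
        # Jupyter displays (and therefore executes) the dataframe eagerly.
        rows = df.take(n)
        header = "".join("<th>%s</th>" % c for c in df.columns)
        body = "".join(
            "<tr>" + "".join("<td>%s</td>" % v for v in row) + "</tr>"
            for row in rows)
        return "<table><tr>%s</tr>%s</table>" % (header, body)

    DataFrame._repr_html_ = _eager_repr_html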

For JVM languages, we could also add a dependency on jvm-repr and do the
same thing.

rb
​

On Tue, May 8, 2018 at 3:47 PM, Reynold Xin  wrote:

> s/underestimated/overestimated/
>
> On Tue, May 8, 2018 at 3:44 PM Reynold Xin  wrote:
>
>> Marco,
>>
>> There is understanding how Spark works, and there is finding bugs early
>> in their own program. One can perfectly understand how Spark works and
>> still find it valuable to get feedback asap, and that's why we built eager
>> analysis in the first place.
>>
>> Also I'm afraid you've significantly underestimated the level of
>> technical sophistication of users. In many cases they struggle to get
>> anything to work, and performance optimization of their programs is
>> secondary to getting things working. As John Ousterhout says, "the greatest
>> performance improvement of all is when a system goes from not-working to
>> working".
>>
>> I really like Ryan's approach. Would be great if it is something more
>> turn-key.
>>
>>
>>
>>
>>
>>
>> On Tue, May 8, 2018 at 2:35 PM Marco Gaido 
>> wrote:
>>
>>> I am not sure how this is useful. For students, it is important to
>>> understand how Spark works. This can be critical in many decision they have
>>> to take (whether and what to cache for instance) in order to have
>>> performant Spark application. Creating a eager execution probably can help
>>> them having something running more easily, but let them also using Spark
>>> knowing less about how it works, thus they are likely to write worse
>>> application and to have more problems in debugging any kind of problem
>>> which may later (in production) occur (therefore affecting their experience
>>> with the tool).
>>>
>>> Moreover, as Ryan also mentioned, there are tools/ways to force the
>>> execution, helping in the debugging phase. So they can achieve without a
>>> big effort the same result, but with a big difference: they are aware of
>>> what is really happening, which may help them later.
>>>
>>> Thanks,
>>> Marco
>>>
>>> 2018-05-08 21:37 GMT+02:00 Ryan Blue :
>>>
 At Netflix, we use Jupyter notebooks and consoles for interactive
 sessions. For anyone interested, this mode of interaction is really easy to
 add in Jupyter and PySpark. You would just define a different
 *repr_html* or *repr* method for Dataset that runs a take(10) or
 take(100) and formats the result.

 That way, the output of a cell or console execution always causes the
 dataframe to run and get displayed for that immediate feedback. But, there
 is no change to Spark’s behavior because the action is run by the REPL, and
 only when a dataframe is a result of an execution in order to display it.
 Intermediate results wouldn’t be run, but that gives users a way to avoid
 too many executions and would still support method chaining in the
 dataframe API (which would be horrible with an aggressive execution model).

 There are ways to do this in JVM languages as well if you are using a
 Scala or Java interpreter (see jvm-repr
 ). This is actually what we do in
 our Spark-based SQL interpreter to display results.

 rb
 ​

 On Tue, May 8, 2018 at 12:05 PM, Koert Kuipers 
 wrote:

> yeah we run into this all the time with new hires. they will send
> emails explaining there is an error in the .write operation and they are
> debugging the writing to disk, focusing on that piece of code :)
>
> unrelated, but another frequent cause for confusion is cascading
> errors. like the FetchFailedException. they will be debugging the reducer
> task not realizing the error happened before that, and the
> FetchFailedException is not the root cause.
>
>
> On Tue, May 8, 2018 at 2:52 PM, Reynold Xin 
> wrote:
>
>> Similar to the thread yesterday about improving ML/DL integration,
>> I'm sending another email on what I've learned recently from Spark 
>> users. I
>> recently talked to some educators that have been teaching Spark in their
>> (top-tier) university classes. They are some of the most important users
>> for adoption because of the multiplicative effect they have on the future
>> generation.
>>
>> To my surprise the single biggest ask they want is to enable eager
>> execution mode on all operations for teaching and debuggability:
>>
>> (1) Most of the students are relatively new to programming, and they
>> need multiple iterations to 

Re: eager execution and debuggability

2018-05-08 Thread Reynold Xin
s/underestimated/overestimated/

On Tue, May 8, 2018 at 3:44 PM Reynold Xin  wrote:

> Marco,
>
> There is understanding how Spark works, and there is finding bugs early in
> their own program. One can perfectly understand how Spark works and still
> find it valuable to get feedback asap, and that's why we built eager
> analysis in the first place.
>
> Also I'm afraid you've significantly underestimated the level of technical
> sophistication of users. In many cases they struggle to get anything to
> work, and performance optimization of their programs is secondary to
> getting things working. As John Ousterhout says, "the greatest performance
> improvement of all is when a system goes from not-working to working".
>
> I really like Ryan's approach. Would be great if it is something more
> turn-key.
>
>
>
>
>
>
> On Tue, May 8, 2018 at 2:35 PM Marco Gaido  wrote:
>
>> I am not sure how this is useful. For students, it is important to
>> understand how Spark works. This can be critical in many decision they have
>> to take (whether and what to cache for instance) in order to have
>> performant Spark application. Creating a eager execution probably can help
>> them having something running more easily, but let them also using Spark
>> knowing less about how it works, thus they are likely to write worse
>> application and to have more problems in debugging any kind of problem
>> which may later (in production) occur (therefore affecting their experience
>> with the tool).
>>
>> Moreover, as Ryan also mentioned, there are tools/ways to force the
>> execution, helping in the debugging phase. So they can achieve without a
>> big effort the same result, but with a big difference: they are aware of
>> what is really happening, which may help them later.
>>
>> Thanks,
>> Marco
>>
>> 2018-05-08 21:37 GMT+02:00 Ryan Blue :
>>
>>> At Netflix, we use Jupyter notebooks and consoles for interactive
>>> sessions. For anyone interested, this mode of interaction is really easy to
>>> add in Jupyter and PySpark. You would just define a different
>>> *repr_html* or *repr* method for Dataset that runs a take(10) or
>>> take(100) and formats the result.
>>>
>>> That way, the output of a cell or console execution always causes the
>>> dataframe to run and get displayed for that immediate feedback. But, there
>>> is no change to Spark’s behavior because the action is run by the REPL, and
>>> only when a dataframe is a result of an execution in order to display it.
>>> Intermediate results wouldn’t be run, but that gives users a way to avoid
>>> too many executions and would still support method chaining in the
>>> dataframe API (which would be horrible with an aggressive execution model).
>>>
>>> There are ways to do this in JVM languages as well if you are using a
>>> Scala or Java interpreter (see jvm-repr
>>> ). This is actually what we do in
>>> our Spark-based SQL interpreter to display results.
>>>
>>> rb
>>> ​
>>>
>>> On Tue, May 8, 2018 at 12:05 PM, Koert Kuipers 
>>> wrote:
>>>
 yeah we run into this all the time with new hires. they will send
 emails explaining there is an error in the .write operation and they are
 debugging the writing to disk, focusing on that piece of code :)

 unrelated, but another frequent cause for confusion is cascading
 errors. like the FetchFailedException. they will be debugging the reducer
 task not realizing the error happened before that, and the
 FetchFailedException is not the root cause.


 On Tue, May 8, 2018 at 2:52 PM, Reynold Xin 
 wrote:

> Similar to the thread yesterday about improving ML/DL integration, I'm
> sending another email on what I've learned recently from Spark users. I
> recently talked to some educators that have been teaching Spark in their
> (top-tier) university classes. They are some of the most important users
> for adoption because of the multiplicative effect they have on the future
> generation.
>
> To my surprise the single biggest ask they want is to enable eager
> execution mode on all operations for teaching and debuggability:
>
> (1) Most of the students are relatively new to programming, and they
> need multiple iterations to even get the most basic operation right. In
> these cases, in order to trigger an error, they would need to explicitly
> add actions, which is non-intuitive.
>
> (2) If they don't add explicit actions to every operation and there is
> a mistake, the error pops up somewhere later where an action is triggered.
> This is in a different position from the code that causes the problem, and
> difficult for students to correlate the two.
>
> I suspect in the real world a lot of Spark users also struggle in
> similar ways as these students. While eager 

Re: eager execution and debuggability

2018-05-08 Thread Reynold Xin
Marco,

There is understanding how Spark works, and there is finding bugs early in
one's own program. One can perfectly understand how Spark works and still
find it valuable to get feedback as soon as possible, and that's why we built
eager analysis in the first place.

Also I'm afraid you've significantly underestimated the level of technical
sophistication of users. In many cases they struggle to get anything to
work, and performance optimization of their programs is secondary to
getting things working. As John Ousterhout says, "the greatest performance
improvement of all is when a system goes from not-working to working".

I really like Ryan's approach. Would be great if it is something more
turn-key.






On Tue, May 8, 2018 at 2:35 PM Marco Gaido  wrote:

> I am not sure how this is useful. For students, it is important to
> understand how Spark works. This can be critical in many decision they have
> to take (whether and what to cache for instance) in order to have
> performant Spark application. Creating a eager execution probably can help
> them having something running more easily, but let them also using Spark
> knowing less about how it works, thus they are likely to write worse
> application and to have more problems in debugging any kind of problem
> which may later (in production) occur (therefore affecting their experience
> with the tool).
>
> Moreover, as Ryan also mentioned, there are tools/ways to force the
> execution, helping in the debugging phase. So they can achieve without a
> big effort the same result, but with a big difference: they are aware of
> what is really happening, which may help them later.
>
> Thanks,
> Marco
>
> 2018-05-08 21:37 GMT+02:00 Ryan Blue :
>
>> At Netflix, we use Jupyter notebooks and consoles for interactive
>> sessions. For anyone interested, this mode of interaction is really easy to
>> add in Jupyter and PySpark. You would just define a different *repr_html*
>> or *repr* method for Dataset that runs a take(10) or take(100) and
>> formats the result.
>>
>> That way, the output of a cell or console execution always causes the
>> dataframe to run and get displayed for that immediate feedback. But, there
>> is no change to Spark’s behavior because the action is run by the REPL, and
>> only when a dataframe is a result of an execution in order to display it.
>> Intermediate results wouldn’t be run, but that gives users a way to avoid
>> too many executions and would still support method chaining in the
>> dataframe API (which would be horrible with an aggressive execution model).
>>
>> There are ways to do this in JVM languages as well if you are using a
>> Scala or Java interpreter (see jvm-repr
>> ). This is actually what we do in
>> our Spark-based SQL interpreter to display results.
>>
>> rb
>> ​
>>
>> On Tue, May 8, 2018 at 12:05 PM, Koert Kuipers  wrote:
>>
>>> yeah we run into this all the time with new hires. they will send emails
>>> explaining there is an error in the .write operation and they are debugging
>>> the writing to disk, focusing on that piece of code :)
>>>
>>> unrelated, but another frequent cause for confusion is cascading errors.
>>> like the FetchFailedException. they will be debugging the reducer task not
>>> realizing the error happened before that, and the FetchFailedException is
>>> not the root cause.
>>>
>>>
>>> On Tue, May 8, 2018 at 2:52 PM, Reynold Xin  wrote:
>>>
 Similar to the thread yesterday about improving ML/DL integration, I'm
 sending another email on what I've learned recently from Spark users. I
 recently talked to some educators that have been teaching Spark in their
 (top-tier) university classes. They are some of the most important users
 for adoption because of the multiplicative effect they have on the future
 generation.

 To my surprise the single biggest ask they want is to enable eager
 execution mode on all operations for teaching and debuggability:

 (1) Most of the students are relatively new to programming, and they
 need multiple iterations to even get the most basic operation right. In
 these cases, in order to trigger an error, they would need to explicitly
 add actions, which is non-intuitive.

 (2) If they don't add explicit actions to every operation and there is
 a mistake, the error pops up somewhere later where an action is triggered.
 This is in a different position from the code that causes the problem, and
 difficult for students to correlate the two.

 I suspect in the real world a lot of Spark users also struggle in
 similar ways as these students. While eager execution is really not
 practical in big data, in learning environments or in development against
 small, sampled datasets it can be pretty helpful.










>>>
>>
>>
>> --
>> Ryan 

Re: eager execution and debuggability

2018-05-08 Thread Marco Gaido
I am not sure how useful this is. For students, it is important to
understand how Spark works. This can be critical in many decisions they have
to take (whether and what to cache, for instance) in order to write
performant Spark applications. An eager execution mode can probably help
them get something running more easily, but it also lets them use Spark
while knowing less about how it works, so they are likely to write worse
applications and to have more trouble debugging any kind of problem that
occurs later (in production), which would ultimately hurt their experience
with the tool.

Moreover, as Ryan also mentioned, there are tools and ways to force
execution that help in the debugging phase. So they can achieve the same
result without much effort, but with a big difference: they are aware of
what is really happening, which may help them later.

Thanks,
Marco

2018-05-08 21:37 GMT+02:00 Ryan Blue :

> At Netflix, we use Jupyter notebooks and consoles for interactive
> sessions. For anyone interested, this mode of interaction is really easy to
> add in Jupyter and PySpark. You would just define a different *repr_html*
> or *repr* method for Dataset that runs a take(10) or take(100) and
> formats the result.
>
> That way, the output of a cell or console execution always causes the
> dataframe to run and get displayed for that immediate feedback. But, there
> is no change to Spark’s behavior because the action is run by the REPL, and
> only when a dataframe is a result of an execution in order to display it.
> Intermediate results wouldn’t be run, but that gives users a way to avoid
> too many executions and would still support method chaining in the
> dataframe API (which would be horrible with an aggressive execution model).
>
> There are ways to do this in JVM languages as well if you are using a
> Scala or Java interpreter (see jvm-repr
> ). This is actually what we do in
> our Spark-based SQL interpreter to display results.
>
> rb
> ​
>
> On Tue, May 8, 2018 at 12:05 PM, Koert Kuipers  wrote:
>
>> yeah we run into this all the time with new hires. they will send emails
>> explaining there is an error in the .write operation and they are debugging
>> the writing to disk, focusing on that piece of code :)
>>
>> unrelated, but another frequent cause for confusion is cascading errors.
>> like the FetchFailedException. they will be debugging the reducer task not
>> realizing the error happened before that, and the FetchFailedException is
>> not the root cause.
>>
>>
>> On Tue, May 8, 2018 at 2:52 PM, Reynold Xin  wrote:
>>
>>> Similar to the thread yesterday about improving ML/DL integration, I'm
>>> sending another email on what I've learned recently from Spark users. I
>>> recently talked to some educators that have been teaching Spark in their
>>> (top-tier) university classes. They are some of the most important users
>>> for adoption because of the multiplicative effect they have on the future
>>> generation.
>>>
>>> To my surprise the single biggest ask they want is to enable eager
>>> execution mode on all operations for teaching and debuggability:
>>>
>>> (1) Most of the students are relatively new to programming, and they
>>> need multiple iterations to even get the most basic operation right. In
>>> these cases, in order to trigger an error, they would need to explicitly
>>> add actions, which is non-intuitive.
>>>
>>> (2) If they don't add explicit actions to every operation and there is a
>>> mistake, the error pops up somewhere later where an action is triggered.
>>> This is in a different position from the code that causes the problem, and
>>> difficult for students to correlate the two.
>>>
>>> I suspect in the real world a lot of Spark users also struggle in
>>> similar ways as these students. While eager execution is really not
>>> practical in big data, in learning environments or in development against
>>> small, sampled datasets it can be pretty helpful.
>>>
>>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>


[DISCUSS] Spark SQL internal data: InternalRow or UnsafeRow?

2018-05-08 Thread Ryan Blue
While moving the new data source API to InternalRow, I noticed a few odd
things:

   - Spark scans always produce UnsafeRow, but that data is passed around
   as InternalRow with explicit casts.
   - Operators expect InternalRow and nearly all codegen works with
   InternalRow (I’ve tested this with quite a few queries.)
   - Spark uses unchecked casts from InternalRow to UnsafeRow in places,
   assuming that data will be unsafe, even though that isn’t what the type
   system guarantees.
To me, it looks like the idea was to code SQL operators to the abstract
InternalRow so we can swap out the implementation, but ended up with a
general assumption that rows will always be unsafe. This is the worst of
both options: we can’t actually rely on everything working with InternalRow
but code must still use it, until it is inconvenient and an unchecked cast
gets inserted.

The main question I want to answer is this: *what data format should SQL
use internally?* What was the intent when building Catalyst?

The v2 data source API depends on the answer, but I also found that this
introduces a significant performance penalty in Parquet (and probably other
formats). A quick check on one of our tables showed a 6% performance hit
caused by unnecessary copies from InternalRow to UnsafeRow. So if we can
guarantee that all operators should support InternalRow, then there is an
easy performance win that also simplifies the v2 data source API.

rb
​
-- 
Ryan Blue
Software Engineer
Netflix


Re: eager execution and debuggability

2018-05-08 Thread Ryan Blue
At Netflix, we use Jupyter notebooks and consoles for interactive sessions.
For anyone interested, this mode of interaction is really easy to add in
Jupyter and PySpark. You would just define a different *repr_html* or *repr*
method for Dataset that runs a take(10) or take(100) and formats the result.
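
As a rough sketch of that idea (hypothetical code; it assumes a
Jupyter/IPython front end that picks up the _repr_html_ display protocol):

    # Hypothetical sketch: give PySpark DataFrames an HTML repr so that
    # evaluating one in a notebook cell triggers a small action (take(10))
    # and renders the first rows, surfacing errors at that cell.
    from pyspark.sql import DataFrame

    def _df_repr_html_(self, n=10):
        rows = self.take(n)  # small action: forces execution of the plan
        header = "".join("<th>{}</th>".format(c) for c in self.columns)
        body = "".join(
            "<tr>" + "".join("<td>{}</td>".format(v) for v in row) + "</tr>"
            for row in rows
        )
        return "<table><tr>{}</tr>{}</table>".format(header, body)

    # Monkey-patching is only for illustration; a notebook extension or
    # display hook would be a cleaner home for this.
    DataFrame._repr_html_ = _df_repr_html_

With something like that in place, a cell whose last expression is a
DataFrame displays its first rows immediately, while intermediate
assignments stay lazy.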

That way, the output of a cell or console execution always causes the
dataframe to run and get displayed for that immediate feedback. But, there
is no change to Spark’s behavior because the action is run by the REPL, and
only when a dataframe is a result of an execution in order to display it.
Intermediate results wouldn’t be run, but that gives users a way to avoid
too many executions and would still support method chaining in the
dataframe API (which would be horrible with an aggressive execution model).

There are ways to do this in JVM languages as well if you are using a Scala
or Java interpreter (see jvm-repr ).
This is actually what we do in our Spark-based SQL interpreter to display
results.

rb
​

On Tue, May 8, 2018 at 12:05 PM, Koert Kuipers  wrote:

> yeah we run into this all the time with new hires. they will send emails
> explaining there is an error in the .write operation and they are debugging
> the writing to disk, focusing on that piece of code :)
>
> unrelated, but another frequent cause for confusion is cascading errors.
> like the FetchFailedException. they will be debugging the reducer task not
> realizing the error happened before that, and the FetchFailedException is
> not the root cause.
>
>
> On Tue, May 8, 2018 at 2:52 PM, Reynold Xin  wrote:
>
>> Similar to the thread yesterday about improving ML/DL integration, I'm
>> sending another email on what I've learned recently from Spark users. I
>> recently talked to some educators that have been teaching Spark in their
>> (top-tier) university classes. They are some of the most important users
>> for adoption because of the multiplicative effect they have on the future
>> generation.
>>
>> To my surprise the single biggest ask they want is to enable eager
>> execution mode on all operations for teaching and debuggability:
>>
>> (1) Most of the students are relatively new to programming, and they need
>> multiple iterations to even get the most basic operation right. In these
>> cases, in order to trigger an error, they would need to explicitly add
>> actions, which is non-intuitive.
>>
>> (2) If they don't add explicit actions to every operation and there is a
>> mistake, the error pops up somewhere later where an action is triggered.
>> This is in a different position from the code that causes the problem, and
>> difficult for students to correlate the two.
>>
>> I suspect in the real world a lot of Spark users also struggle in similar
>> ways as these students. While eager execution is really not practical in
>> big data, in learning environments or in development against small, sampled
>> datasets it can be pretty helpful.
>>
>>
>


-- 
Ryan Blue
Software Engineer
Netflix


Re: eager execution and debuggability

2018-05-08 Thread Koert Kuipers
yeah we run into this all the time with new hires. they will send emails
explaining there is an error in the .write operation and they are debugging
the writing to disk, focusing on that piece of code :)

unrelated, but another frequent cause for confusion is cascading errors.
like the FetchFailedException. they will be debugging the reducer task not
realizing the error happened before that, and the FetchFailedException is
not the root cause.


On Tue, May 8, 2018 at 2:52 PM, Reynold Xin  wrote:

> Similar to the thread yesterday about improving ML/DL integration, I'm
> sending another email on what I've learned recently from Spark users. I
> recently talked to some educators that have been teaching Spark in their
> (top-tier) university classes. They are some of the most important users
> for adoption because of the multiplicative effect they have on the future
> generation.
>
> To my surprise the single biggest ask they want is to enable eager
> execution mode on all operations for teaching and debuggability:
>
> (1) Most of the students are relatively new to programming, and they need
> multiple iterations to even get the most basic operation right. In these
> cases, in order to trigger an error, they would need to explicitly add
> actions, which is non-intuitive.
>
> (2) If they don't add explicit actions to every operation and there is a
> mistake, the error pops up somewhere later where an action is triggered.
> This is in a different position from the code that causes the problem, and
> difficult for students to correlate the two.
>
> I suspect in the real world a lot of Spark users also struggle in similar
> ways as these students. While eager execution is really not practical in
> big data, in learning environments or in development against small, sampled
> datasets it can be pretty helpful.
>
>


eager execution and debuggability

2018-05-08 Thread Reynold Xin
Similar to the thread yesterday about improving ML/DL integration, I'm
sending another email on what I've learned recently from Spark users. I
recently talked to some educators that have been teaching Spark in their
(top-tier) university classes. They are some of the most important users
for adoption because of the multiplicative effect they have on the future
generation.

To my surprise the single biggest ask they want is to enable eager
execution mode on all operations for teaching and debuggability:

(1) Most of the students are relatively new to programming, and they need
multiple iterations to even get the most basic operation right. In these
cases, in order to trigger an error, they would need to explicitly add
actions, which is non-intuitive.

(2) If they don't add explicit actions to every operation and there is a
mistake, the error pops up somewhere later where an action is triggered.
This is in a different position from the code that causes the problem, and
difficult for students to correlate the two.
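
As a tiny illustration of (2) (hypothetical snippet, assuming an active
SparkSession bound to spark):

    from pyspark.sql.functions import udf

    broken = udf(lambda x: 1 / 0)              # the bug is introduced here...
    df = spark.range(5).select(broken("id"))   # ...but nothing fails yet (lazy)
    df.count()                                 # the error only surfaces at the action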

I suspect in the real world a lot of Spark users also struggle in similar
ways as these students. While eager execution is really not practical in
big data, in learning environments or in development against small, sampled
datasets it can be pretty helpful.


Re: Integrating ML/DL frameworks with Spark

2018-05-08 Thread Nan Zhu
.how I skipped the last part

On Tue, May 8, 2018 at 11:16 AM, Reynold Xin  wrote:

> Yes, Nan, totally agree. To be on the same page, that's exactly what I
> wrote wasn't it?
>
> On Tue, May 8, 2018 at 11:14 AM Nan Zhu  wrote:
>
>> besides that, one of the things which is needed by multiple frameworks is
>> to schedule tasks in a single wave
>>
>> i.e.
>>
>> if some frameworks like xgboost/mxnet requires 50 parallel workers, Spark
>> is desired to provide a capability to ensure that either we run 50 tasks at
>> once, or we should quit the complete application/job after some timeout
>> period
>>
>> Best,
>>
>> Nan
>>
>> On Tue, May 8, 2018 at 11:10 AM, Reynold Xin  wrote:
>>
>>> I think that's what Xiangrui was referring to. Instead of retrying a
>>> single task, retry the entire stage, and the entire stage of tasks need to
>>> be scheduled all at once.
>>>
>>>
>>> On Tue, May 8, 2018 at 8:53 AM Shivaram Venkataraman <
>>> shiva...@eecs.berkeley.edu> wrote:
>>>

>
>>- Fault tolerance and execution model: Spark assumes fine-grained
>>task recovery, i.e. if something fails, only that task is rerun. This
>>doesn’t match the execution model of distributed ML/DL frameworks 
>> that are
>>typically MPI-based, and rerunning a single task would lead to the 
>> entire
>>system hanging. A whole stage needs to be re-run.
>>
>> This is not only useful for integrating with 3rd-party frameworks,
> but also useful for scaling MLlib algorithms. One of my earliest attempts
> in Spark MLlib was to implement All-Reduce primitive (SPARK-1485
> ). But we ended up
> with some compromised solutions. With the new execution model, we can set
> up a hybrid cluster and do all-reduce properly.
>
>
 Is there a particular new execution model you are referring to or do we
 plan to investigate a new execution model ?  For the MPI-like model, we
 also need gang scheduling (i.e. schedule all tasks at once or none of them)
 and I dont think we have support for that in the scheduler right now.

>
>> --
>
> Xiangrui Meng
>
> Software Engineer
>
> Databricks Inc. [image: http://databricks.com]
> 
>


>>


Re: Integrating ML/DL frameworks with Spark

2018-05-08 Thread Reynold Xin
Yes, Nan, totally agree. To be on the same page, that's exactly what I
wrote wasn't it?

On Tue, May 8, 2018 at 11:14 AM Nan Zhu  wrote:

> besides that, one of the things which is needed by multiple frameworks is
> to schedule tasks in a single wave
>
> i.e.
>
> if some frameworks like xgboost/mxnet requires 50 parallel workers, Spark
> is desired to provide a capability to ensure that either we run 50 tasks at
> once, or we should quit the complete application/job after some timeout
> period
>
> Best,
>
> Nan
>
> On Tue, May 8, 2018 at 11:10 AM, Reynold Xin  wrote:
>
>> I think that's what Xiangrui was referring to. Instead of retrying a
>> single task, retry the entire stage, and the entire stage of tasks need to
>> be scheduled all at once.
>>
>>
>> On Tue, May 8, 2018 at 8:53 AM Shivaram Venkataraman <
>> shiva...@eecs.berkeley.edu> wrote:
>>
>>>

>- Fault tolerance and execution model: Spark assumes fine-grained
>task recovery, i.e. if something fails, only that task is rerun. This
>doesn’t match the execution model of distributed ML/DL frameworks that 
> are
>typically MPI-based, and rerunning a single task would lead to the 
> entire
>system hanging. A whole stage needs to be re-run.
>
> This is not only useful for integrating with 3rd-party frameworks, but
 also useful for scaling MLlib algorithms. One of my earliest attempts in
 Spark MLlib was to implement All-Reduce primitive (SPARK-1485
 ). But we ended up
 with some compromised solutions. With the new execution model, we can set
 up a hybrid cluster and do all-reduce properly.


>>> Is there a particular new execution model you are referring to or do we
>>> plan to investigate a new execution model ?  For the MPI-like model, we
>>> also need gang scheduling (i.e. schedule all tasks at once or none of them)
>>> and I dont think we have support for that in the scheduler right now.
>>>

> --

 Xiangrui Meng

 Software Engineer

 Databricks Inc. [image: http://databricks.com] 

>>>
>>>
>


Re: Integrating ML/DL frameworks with Spark

2018-05-08 Thread Nan Zhu
Besides that, one of the things needed by multiple frameworks is the
ability to schedule tasks in a single wave,

i.e.

if a framework like xgboost/mxnet requires 50 parallel workers, Spark
should provide a capability to ensure that we either run all 50 tasks at
once, or quit the complete application/job after some timeout period.

Best,

Nan

On Tue, May 8, 2018 at 11:10 AM, Reynold Xin  wrote:

> I think that's what Xiangrui was referring to. Instead of retrying a
> single task, retry the entire stage, and the entire stage of tasks need to
> be scheduled all at once.
>
>
> On Tue, May 8, 2018 at 8:53 AM Shivaram Venkataraman <
> shiva...@eecs.berkeley.edu> wrote:
>
>>
>>>
- Fault tolerance and execution model: Spark assumes fine-grained
task recovery, i.e. if something fails, only that task is rerun. This
doesn’t match the execution model of distributed ML/DL frameworks that 
 are
typically MPI-based, and rerunning a single task would lead to the 
 entire
system hanging. A whole stage needs to be re-run.

 This is not only useful for integrating with 3rd-party frameworks, but
>>> also useful for scaling MLlib algorithms. One of my earliest attempts in
>>> Spark MLlib was to implement All-Reduce primitive (SPARK-1485
>>> ). But we ended up
>>> with some compromised solutions. With the new execution model, we can set
>>> up a hybrid cluster and do all-reduce properly.
>>>
>>>
>> Is there a particular new execution model you are referring to or do we
>> plan to investigate a new execution model ?  For the MPI-like model, we
>> also need gang scheduling (i.e. schedule all tasks at once or none of them)
>> and I dont think we have support for that in the scheduler right now.
>>
>>>
 --
>>>
>>> Xiangrui Meng
>>>
>>> Software Engineer
>>>
>>> Databricks Inc. [image: http://databricks.com] 
>>>
>>
>>


Re: Integrating ML/DL frameworks with Spark

2018-05-08 Thread Reynold Xin
I think that's what Xiangrui was referring to. Instead of retrying a single
task, retry the entire stage, and the entire stage of tasks need to be
scheduled all at once.


On Tue, May 8, 2018 at 8:53 AM Shivaram Venkataraman <
shiva...@eecs.berkeley.edu> wrote:

>
>>
>>>- Fault tolerance and execution model: Spark assumes fine-grained
>>>task recovery, i.e. if something fails, only that task is rerun. This
>>>doesn’t match the execution model of distributed ML/DL frameworks that 
>>> are
>>>typically MPI-based, and rerunning a single task would lead to the entire
>>>system hanging. A whole stage needs to be re-run.
>>>
>>> This is not only useful for integrating with 3rd-party frameworks, but
>> also useful for scaling MLlib algorithms. One of my earliest attempts in
>> Spark MLlib was to implement All-Reduce primitive (SPARK-1485
>> ). But we ended up
>> with some compromised solutions. With the new execution model, we can set
>> up a hybrid cluster and do all-reduce properly.
>>
>>
> Is there a particular new execution model you are referring to or do we
> plan to investigate a new execution model ?  For the MPI-like model, we
> also need gang scheduling (i.e. schedule all tasks at once or none of them)
> and I dont think we have support for that in the scheduler right now.
>
>>
>>> --
>>
>> Xiangrui Meng
>>
>> Software Engineer
>>
>> Databricks Inc. [image: http://databricks.com] 
>>
>
>


Re: Integrating ML/DL frameworks with Spark

2018-05-08 Thread Naveen Swamy
I am a committer on the MXNet project and very interested in working on
integrating it with Spark.
I am wondering how training would proceed in the case of:
1) training on one host with multiple GPUs -- I don't know if Spark's
capabilities can be leveraged here;
2) distributed training with data parallelism -- how can we leverage
Spark's map-reduce model to fit distributed training? The model of
execution here is more iterative in nature.

Please let me know.

Thanks, Naveen



On Tue, May 8, 2018 at 8:53 AM, Shivaram Venkataraman <
shiva...@eecs.berkeley.edu> wrote:

>
>>
>>>- Fault tolerance and execution model: Spark assumes fine-grained
>>>task recovery, i.e. if something fails, only that task is rerun. This
>>>doesn’t match the execution model of distributed ML/DL frameworks that 
>>> are
>>>typically MPI-based, and rerunning a single task would lead to the entire
>>>system hanging. A whole stage needs to be re-run.
>>>
>>> This is not only useful for integrating with 3rd-party frameworks, but
>> also useful for scaling MLlib algorithms. One of my earliest attempts in
>> Spark MLlib was to implement All-Reduce primitive (SPARK-1485
>> ). But we ended up
>> with some compromised solutions. With the new execution model, we can set
>> up a hybrid cluster and do all-reduce properly.
>>
>>
> Is there a particular new execution model you are referring to or do we
> plan to investigate a new execution model ?  For the MPI-like model, we
> also need gang scheduling (i.e. schedule all tasks at once or none of them)
> and I dont think we have support for that in the scheduler right now.
>
>>
>>> --
>>
>> Xiangrui Meng
>>
>> Software Engineer
>>
>> Databricks Inc. [image: http://databricks.com] 
>>
>
>


Re: Identifying specific persisted DataFrames via getPersistentRDDs()

2018-05-08 Thread Mark Hamstra
If I am understanding you correctly, you're just saying that the problem is
that you know what you want to keep, not what you want to throw away, and
that there is no unpersist DataFrames call based on that what-to-keep
information.

On Tue, May 8, 2018 at 6:00 AM, Nicholas Chammas  wrote:

> I certainly can, but the problem I’m facing is that of how best to track
> all the DataFrames I no longer want to persist.
>
> I create and persist various DataFrames throughout my pipeline. Spark is
> already tracking all this for me, and exposing some of that tracking
> information via getPersistentRDDs(). So when I arrive at a point in my
> program where I know, “I only need this DataFrame going forward”, I want to
> be able to tell Spark “Please unpersist everything except this one
> DataFrame”. If I cannot leverage the information about persisted DataFrames
> that Spark is already tracking, then the alternative is for me to carefully
> track and unpersist DataFrames when I no longer need them.
>
> I suppose the problem is similar at a high level to garbage collection.
> Tracking and freeing DataFrames manually is analogous to malloc and free;
> and full automation would be Spark automatically unpersisting DataFrames
> when they were no longer referenced or needed. I’m looking for an
> in-between solution that lets me leverage some of the persistence tracking
> in Spark so I don’t have to do it all myself.
>
> Does this make more sense now, from a use case perspective as well as from
> a desired API perspective?
> ​
>
> On Thu, May 3, 2018 at 10:26 PM Reynold Xin  wrote:
>
>> Why do you need the underlying RDDs? Can't you just unpersist the
>> dataframes that you don't need?
>>
>>
>> On Mon, Apr 30, 2018 at 8:17 PM Nicholas Chammas <
>> nicholas.cham...@gmail.com> wrote:
>>
>>> This seems to be an underexposed part of the API. My use case is this: I
>>> want to unpersist all DataFrames except a specific few. I want to do this
>>> because I know at a specific point in my pipeline that I have a handful of
>>> DataFrames that I need, and everything else is no longer needed.
>>>
>>> The problem is that there doesn’t appear to be a way to identify
>>> specific DataFrames (or rather, their underlying RDDs) via
>>> getPersistentRDDs(), which is the only way I’m aware of to ask Spark
>>> for all currently persisted RDDs:
>>>
>>> >>> a = spark.range(10).persist()
>>> >>> a.rdd.id()
>>> 8
>>> >>> list(spark.sparkContext._jsc.getPersistentRDDs().items())
>>> [(3, JavaObject id=o36)]
>>>
>>> As you can see, the id of the persisted RDD, 8, doesn’t match the id
>>> returned by getPersistentRDDs(), 3. So I can’t go through the RDDs
>>> returned by getPersistentRDDs() and know which ones I want to keep.
>>>
>>> id() itself appears to be an undocumented method of the RDD API, and in
>>> PySpark getPersistentRDDs() is buried behind the Java sub-objects
>>> , so I know I’m
>>> reaching here. But is there a way to do what I want in PySpark without
>>> manually tracking everything I’ve persisted myself?
>>>
>>> And more broadly speaking, do we want to add additional APIs, or
>>> formalize currently undocumented APIs like id(), to make this use case
>>> possible?
>>>
>>> Nick
>>> ​
>>>
>>


Re: Integrating ML/DL frameworks with Spark

2018-05-08 Thread Shivaram Venkataraman
>
>
>
>>- Fault tolerance and execution model: Spark assumes fine-grained
>>task recovery, i.e. if something fails, only that task is rerun. This
>>doesn’t match the execution model of distributed ML/DL frameworks that are
>>typically MPI-based, and rerunning a single task would lead to the entire
>>system hanging. A whole stage needs to be re-run.
>>
>> This is not only useful for integrating with 3rd-party frameworks, but
> also useful for scaling MLlib algorithms. One of my earliest attempts in
> Spark MLlib was to implement All-Reduce primitive (SPARK-1485
> ). But we ended up with
> some compromised solutions. With the new execution model, we can set up a
> hybrid cluster and do all-reduce properly.
>
>
Is there a particular new execution model you are referring to, or do we
plan to investigate a new execution model? For the MPI-like model, we
also need gang scheduling (i.e. schedule all tasks at once or none of them)
and I don't think we have support for that in the scheduler right now.

>
>> --
>
> Xiangrui Meng
>
> Software Engineer
>
> Databricks Inc. [image: http://databricks.com] 
>


Re: Documenting the various DataFrame/SQL join types

2018-05-08 Thread Reynold Xin
Would be great to document. Probably best with examples.

On Tue, May 8, 2018 at 6:13 AM Nicholas Chammas 
wrote:

> The documentation for DataFrame.join()
> 
> lists all the join types we support:
>
>- inner
>- cross
>- outer
>- full
>- full_outer
>- left
>- left_outer
>- right
>- right_outer
>- left_semi
>- left_anti
>
> Some of these join types are also listed on the SQL Programming Guide
> 
> .
>
> Is it obvious to everyone what all these different join types are? For
> example, I had never heard of a LEFT ANTI join until stumbling on it in the
> PySpark docs. It’s quite handy! But I had to experiment with it a bit just
> to understand what it does.
>
> I think it would be a good service to our users if we either documented
> these join types ourselves clearly, or provided a link to an external
> resource that documented them sufficiently. I’m happy to file a JIRA about
> this and do the work itself. It would be great if the documentation could
> be expressed as a series of simple doc tests, but brief prose describing
> how each join works would still be valuable.
>
> Does this seem worthwhile to folks here? And does anyone want to offer
> guidance on how best to provide this kind of documentation so that it’s
> easy to find by users, regardless of the language they’re using?
>
> Nick
> ​
>


Documenting the various DataFrame/SQL join types

2018-05-08 Thread Nicholas Chammas
The documentation for DataFrame.join()

lists all the join types we support:

   - inner
   - cross
   - outer
   - full
   - full_outer
   - left
   - left_outer
   - right
   - right_outer
   - left_semi
   - left_anti

Some of these join types are also listed on the SQL Programming Guide

.

Is it obvious to everyone what all these different join types are? For
example, I had never heard of a LEFT ANTI join until stumbling on it in the
PySpark docs. It’s quite handy! But I had to experiment with it a bit just
to understand what it does.
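
As a quick, hypothetical illustration with toy data, here is roughly how
left_semi and left_anti behave:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    customers = spark.createDataFrame(
        [(1, "alice"), (2, "bob"), (3, "carol")], ["id", "name"])
    orders = spark.createDataFrame(
        [(1, 9.99), (1, 5.00), (3, 2.50)], ["customer_id", "amount"])

    # left_semi: left-side rows that have at least one match on the right;
    # right-side columns are not included in the result.
    customers.join(
        orders, customers.id == orders.customer_id, "left_semi").show()
    # rows for alice (id 1) and carol (id 3); row order may vary

    # left_anti: left-side rows that have *no* match on the right.
    customers.join(
        orders, customers.id == orders.customer_id, "left_anti").show()
    # a single row for bob (id 2)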

I think it would be a good service to our users if we either documented
these join types ourselves clearly, or provided a link to an external
resource that documented them sufficiently. I’m happy to file a JIRA about
this and do the work itself. It would be great if the documentation could
be expressed as a series of simple doc tests, but brief prose describing
how each join works would still be valuable.

Does this seem worthwhile to folks here? And does anyone want to offer
guidance on how best to provide this kind of documentation so that it’s
easy to find by users, regardless of the language they’re using?

Nick
​


Re: Identifying specific persisted DataFrames via getPersistentRDDs()

2018-05-08 Thread Nicholas Chammas
I certainly can, but the problem I’m facing is that of how best to track
all the DataFrames I no longer want to persist.

I create and persist various DataFrames throughout my pipeline. Spark is
already tracking all this for me, and exposing some of that tracking
information via getPersistentRDDs(). So when I arrive at a point in my
program where I know, “I only need this DataFrame going forward”, I want to
be able to tell Spark “Please unpersist everything except this one
DataFrame”. If I cannot leverage the information about persisted DataFrames
that Spark is already tracking, then the alternative is for me to carefully
track and unpersist DataFrames when I no longer need them.

I suppose the problem is similar at a high level to garbage collection.
Tracking and freeing DataFrames manually is analogous to malloc and free;
and full automation would be Spark automatically unpersisting DataFrames
when they were no longer referenced or needed. I’m looking for an
in-between solution that lets me leverage some of the persistence tracking
in Spark so I don’t have to do it all myself.

Does this make more sense now, from a use case perspective as well as from
a desired API perspective?
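
For reference, a minimal sketch of that manual bookkeeping (hypothetical
helper names; it relies on plain object identity to compare DataFrames):

    # Hypothetical helpers for tracking persisted DataFrames by hand.
    # Membership uses default object identity, which is enough as long as
    # the same DataFrame reference is carried through the pipeline.
    _persisted = set()

    def persist_tracked(df):
        _persisted.add(df)
        return df.persist()

    def unpersist_all_except(*keep):
        for df in list(_persisted):
            if df not in keep:
                df.unpersist()
                _persisted.discard(df)

Calling persist_tracked(df) in place of df.persist(), and then
unpersist_all_except(final_df) at the checkpoint, covers the use case, but
only by doing all the bookkeeping by hand.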
​

On Thu, May 3, 2018 at 10:26 PM Reynold Xin  wrote:

> Why do you need the underlying RDDs? Can't you just unpersist the
> dataframes that you don't need?
>
>
> On Mon, Apr 30, 2018 at 8:17 PM Nicholas Chammas <
> nicholas.cham...@gmail.com> wrote:
>
>> This seems to be an underexposed part of the API. My use case is this: I
>> want to unpersist all DataFrames except a specific few. I want to do this
>> because I know at a specific point in my pipeline that I have a handful of
>> DataFrames that I need, and everything else is no longer needed.
>>
>> The problem is that there doesn’t appear to be a way to identify specific
>> DataFrames (or rather, their underlying RDDs) via getPersistentRDDs(),
>> which is the only way I’m aware of to ask Spark for all currently persisted
>> RDDs:
>>
>> >>> a = spark.range(10).persist()
>> >>> a.rdd.id()
>> 8
>> >>> list(spark.sparkContext._jsc.getPersistentRDDs().items())
>> [(3, JavaObject id=o36)]
>>
>> As you can see, the id of the persisted RDD, 8, doesn’t match the id
>> returned by getPersistentRDDs(), 3. So I can’t go through the RDDs
>> returned by getPersistentRDDs() and know which ones I want to keep.
>>
>> id() itself appears to be an undocumented method of the RDD API, and in
>> PySpark getPersistentRDDs() is buried behind the Java sub-objects
>> , so I know I’m
>> reaching here. But is there a way to do what I want in PySpark without
>> manually tracking everything I’ve persisted myself?
>>
>> And more broadly speaking, do we want to add additional APIs, or
>> formalize currently undocumented APIs like id(), to make this use case
>> possible?
>>
>> Nick
>> ​
>>
>


Re: Integrating ML/DL frameworks with Spark

2018-05-08 Thread Jörn Franke
Hi,

You misunderstood me. I meant exactly that Spark should be aware of them, 
so I agree with you. The point is to also have the YARN GPU/FPGA scheduling 
as an option alongside a potential Spark GPU/FPGA scheduler.

For the other proposal - yes, the interfaces are slow, but one has to think 
about which part needs to be improved for optimal performance: the ML 
framework, Spark, or both. My gut feeling is both.

Best regards

> On 8. May 2018, at 07:11, Reynold Xin  wrote:
> 
> I don't think it's sufficient to have them in YARN (or any other services) 
> without Spark aware of them. If Spark is not aware of them, then there is no 
> way to really efficiently utilize these accelerators when you run anything 
> that require non-accelerators (which is almost 100% of the cases in real 
> world workloads).
> 
> For the other two, the point is not to implement all the ML/DL algorithms in 
> Spark, but make Spark integrate well with ML/DL frameworks. Otherwise you 
> will have the problems I described (super low performance when exchanging 
> data between Spark and ML/DL frameworks, and hanging issues with MPI-based 
> programs).
> 
> 
>> On Mon, May 7, 2018 at 10:05 PM Jörn Franke  wrote:
>> Hadoop / Yarn 3.1 added GPU scheduling. 3.2 is planned to add FPGA 
>> scheduling, so it might be worth to have the last point generic that not 
>> only the Spark scheduler, but all supported schedulers can use GPU.
>> 
>> For the other 2 points I just wonder if it makes sense to address this in 
>> the ml frameworks themselves or in Spark.
>> 
>>> On 8. May 2018, at 06:59, Xiangrui Meng  wrote:
>>> 
>>> Thanks Reynold for summarizing the offline discussion! I added a few 
>>> comments inline. -Xiangrui
>>> 
 On Mon, May 7, 2018 at 5:37 PM Reynold Xin  wrote:
 Hi all,
 
 Xiangrui and I were discussing with a heavy Apache Spark user last week on 
 their experiences integrating machine learning (and deep learning) 
 frameworks with Spark and some of their pain points. Couple things were 
 obvious and I wanted to share our learnings with the list.
 
 (1) Most organizations already use Spark for data plumbing and want to be 
 able to run their ML part of the stack on Spark as well (not necessarily 
 re-implementing all the algorithms but by integrating various frameworks 
 like tensorflow, mxnet with Spark).
 
 (2) The integration is however painful, from the systems perspective:
 
 Performance: data exchange between Spark and other frameworks are slow, 
 because UDFs across process boundaries (with native code) are slow. This 
 works much better now with Pandas UDFs (given a lot of the ML/DL 
 frameworks are in Python). However, there might be some low hanging fruit 
 gaps here.
>>> The Arrow support behind Pands UDFs can be reused to exchange data with 
>>> other frameworks. And one possibly performance improvement is to support 
>>> pipelining when supplying data to other frameworks. For example, while 
>>> Spark is pumping data from external sources into TensorFlow, TensorFlow 
>>> starts the computation on GPUs. This would significant improve speed and 
>>> resource utilization.
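
As a concrete illustration of that exchange path, a minimal scalar Pandas
UDF sketch (hypothetical column names; assumes Spark 2.3+ with pyarrow
installed) looks like this:

    from pyspark.sql.functions import pandas_udf, PandasUDFType

    @pandas_udf("double", PandasUDFType.SCALAR)
    def normalize(v):
        # v arrives as a pandas Series backed by an Arrow batch; this is
        # where a batch could be handed to an ML/DL framework. Here we
        # just standardize the column to show the vectorized call pattern.
        return (v - v.mean()) / v.std()

    # df.withColumn("score_norm", normalize("score")) then evaluates the
    # Python function on Arrow record batches instead of one row at a time.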
 Fault tolerance and execution model: Spark assumes fine-grained task 
 recovery, i.e. if something fails, only that task is rerun. This doesn’t 
 match the execution model of distributed ML/DL frameworks that are 
 typically MPI-based, and rerunning a single task would lead to the entire 
 system hanging. A whole stage needs to be re-run.
>>> This is not only useful for integrating with 3rd-party frameworks, but also 
>>> useful for scaling MLlib algorithms. One of my earliest attempts in Spark 
>>> MLlib was to implement All-Reduce primitive (SPARK-1485). But we ended up 
>>> with some compromised solutions. With the new execution model, we can set 
>>> up a hybrid cluster and do all-reduce properly.
>>>  
 Accelerator-aware scheduling: The DL frameworks leverage GPUs and 
 sometimes FPGAs as accelerators for speedup, and Spark’s scheduler isn’t 
 aware of those resources, leading to either over-utilizing the 
 accelerators or under-utilizing the CPUs.
 
 The good thing is that none of these seem very difficult to address (and 
 we have already made progress on one of them). Xiangrui has graciously 
 accepted the challenge to come up with solutions and SPIP to these.
 
>>> 
>>> I will do more home work, exploring existing JIRAs or creating new JIRAs 
>>> for the proposal. We'd like to hear your feedback and past efforts along 
>>> those directions if they were not fully captured by our JIRA.
>>>  
 Xiangrui - please also chime in if I didn’t capture everything. 
 
 
>>> -- 
>>> Xiangrui Meng
>>> Software Engineer
>>> Databricks Inc.