Re: [DISCUSS] SPIP: APIs for Table Metadata Operations

2018-09-04 Thread Russell Spitzer
They are based on a physical column; the column is real. The function just
happens to exist only in the data source.

For example

SELECT ttl(a), ttl(b) FROM ks.tab
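
For illustration, if the Option 1 mini-expression idea discussed further down
this thread were also used for read-side function pushdown, the example above
might surface to the source roughly as follows (a self-contained toy sketch;
none of these names are a committed API):

// Toy stand-ins for the Option 1 classes proposed later in this thread.
sealed trait Expression
case class NamedFunction(name: String, args: Seq[Expression]) extends Expression
case class ColumnReference(name: String) extends Expression

// SELECT ttl(a), ttl(b) FROM ks.tab could be handed to the source as:
val projections = Seq(
  NamedFunction("ttl", ColumnReference("a") :: Nil),
  NamedFunction("ttl", ColumnReference("b") :: Nil))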

On Tue, Sep 4, 2018 at 11:16 PM Reynold Xin  wrote:

> Russell your special columns wouldn’t actually work with option 1 because
> Spark would have to fail them in analysis without an actual physical
> column.
>
> On Tue, Sep 4, 2018 at 9:12 PM Russell Spitzer 
> wrote:
>
>> I'm a big fan of 1 as well. I had to implement something similar using
>> custom expressions and it was a bit more work than it should be. In
>> particular our use case is that columns have certain metadata (ttl,
>> writetime) which exist not as separate columns but as special values which
>> can be surfaced.
>>
>> I still don't have a good solution for the same thing at write-time
>> though since the problem is a bit asymmetric for us. While you can read a
>> metadata from any particular cell, on write you specify it for the whole
>> row.
>>
>> On Tue, Sep 4, 2018 at 11:04 PM Ryan Blue 
>> wrote:
>>
>>> Thanks for posting the summary. I'm strongly in favor of option 1.
>>>
>>> I think that API footprint is fairly small, but worth it. Not only does
>>> it make sources easier to implement by handling parsing, it also makes
>>> sources more reliable because Spark handles validation the same way across
>>> sources.
>>>
>>> A good example is making sure that the referenced columns exist in the
>>> table, which should be done using the case sensitivity of the analyzer.
>>> Spark would pass normalized column names that match the case of the
>>> declared columns to ensure that there isn't a problem if Spark is case
>>> insensitive but the source doesn't implement it. And the source wouldn't
>>> have to know about Spark's case sensitivity settings at all.
>>>
>>> On Tue, Sep 4, 2018 at 7:46 PM Reynold Xin  wrote:
>>>
 Ryan, Michael and I discussed this offline today. Some notes here:

 His use case is to support partitioning data by derived columns, rather
 than physical columns, because he didn't want his users to keep adding the
 "date" column when in reality they are purely derived from some timestamp
 column. We reached consensus on this is a great use case and something we
 should support.

 We are still debating how to do this at API level. Two options:

 *Option 1.* Create a smaller surfaced, parallel Expression library,
 and use that for specifying partition columns. The bare minimum class
 hierarchy would look like:

 trait Expression

 class NamedFunction(name: String, args: Seq[Expression]) extends
 Expression

 class Literal(value: Any) extends Expression

 class ColumnReference(name: String) extends Expression

 These classes don't define how the expressions are evaluated, and it'd
 be up to the data sources to interpret them. As an example, for a table
 partitioned by date(ts), Spark would pass the following to the underlying
 ds:

 NamedFunction("date", ColumnReference("timestamp") :: Nil)


 *Option 2.* Spark passes strings over to the data sources. For the
 above example, Spark simply passes "date(ts)" as a string over.


 The pros/cons of 1 vs 2 are basically the inverse of each other. Option
 1 creates more rigid structure, with extra complexity in API design. Option
 2 is less structured but more flexible. Option 1 gives Spark the
 opportunity to enforce column references are valid (but not the actual
 function names), whereas option 2 would be up to the data sources to
 validate.



 On Wed, Aug 15, 2018 at 2:27 PM Ryan Blue  wrote:

> I think I found a good solution to the problem of using Expression in
> the TableCatalog API and in the DeleteSupport API.
>
> For DeleteSupport, there is already a stable and public subset of
> Expression named Filter that can be used to pass filters. The reason why
> DeleteSupport would use Expression is to support more complex expressions
> like to_date(ts) = '2018-08-15' that are translated to ts >=
> 15343164 AND ts < 15344028. But, this can be done in
> Spark instead of the data sources so I think DeleteSupport should use
> Filter instead. I updated the DeleteSupport PR #21308
>  with these changes.
>
> Also, I agree that the DataSourceV2 API should also not expose
> Expression, so I opened SPARK-25127 to track removing
> SupportsPushDownCatalystFilter
> .
>
> For TableCatalog, I took a similar approach instead of introducing a
> parallel Expression API. Instead, I created a PartitionTransform API (like
> Filter) that communicates the transformation function, function parameters
> like num buckets, and column references. I updated the TableCatalog
> PR

Re: [DISCUSS] SPIP: APIs for Table Metadata Operations

2018-09-04 Thread Reynold Xin
Russell, your special columns wouldn’t actually work with option 1, because
Spark would have to fail them in analysis without an actual physical
column.

On Tue, Sep 4, 2018 at 9:12 PM Russell Spitzer 
wrote:

> I'm a big fan of 1 as well. I had to implement something similar using
> custom expressions and it was a bit more work than it should be. In
> particular our use case is that columns have certain metadata (ttl,
> writetime) which exist not as separate columns but as special values which
> can be surfaced.
>
> I still don't have a good solution for the same thing at write-time though
> since the problem is a bit asymmetric for us. While you can read a metadata
> from any particular cell, on write you specify it for the whole row.
>
> On Tue, Sep 4, 2018 at 11:04 PM Ryan Blue 
> wrote:
>
>> Thanks for posting the summary. I'm strongly in favor of option 1.
>>
>> I think that API footprint is fairly small, but worth it. Not only does
>> it make sources easier to implement by handling parsing, it also makes
>> sources more reliable because Spark handles validation the same way across
>> sources.
>>
>> A good example is making sure that the referenced columns exist in the
>> table, which should be done using the case sensitivity of the analyzer.
>> Spark would pass normalized column names that match the case of the
>> declared columns to ensure that there isn't a problem if Spark is case
>> insensitive but the source doesn't implement it. And the source wouldn't
>> have to know about Spark's case sensitivity settings at all.
>>
>> On Tue, Sep 4, 2018 at 7:46 PM Reynold Xin  wrote:
>>
>>> Ryan, Michael and I discussed this offline today. Some notes here:
>>>
>>> His use case is to support partitioning data by derived columns, rather
>>> than physical columns, because he didn't want his users to keep adding the
>>> "date" column when in reality they are purely derived from some timestamp
>>> column. We reached consensus on this is a great use case and something we
>>> should support.
>>>
>>> We are still debating how to do this at API level. Two options:
>>>
>>> *Option 1.* Create a smaller surfaced, parallel Expression library, and
>>> use that for specifying partition columns. The bare minimum class hierarchy
>>> would look like:
>>>
>>> trait Expression
>>>
>>> class NamedFunction(name: String, args: Seq[Expression]) extends
>>> Expression
>>>
>>> class Literal(value: Any) extends Expression
>>>
>>> class ColumnReference(name: String) extends Expression
>>>
>>> These classes don't define how the expressions are evaluated, and it'd
>>> be up to the data sources to interpret them. As an example, for a table
>>> partitioned by date(ts), Spark would pass the following to the underlying
>>> ds:
>>>
>>> NamedFunction("date", ColumnReference("timestamp") :: Nil)
>>>
>>>
>>> *Option 2.* Spark passes strings over to the data sources. For the
>>> above example, Spark simply passes "date(ts)" as a string over.
>>>
>>>
>>> The pros/cons of 1 vs 2 are basically the inverse of each other. Option
>>> 1 creates more rigid structure, with extra complexity in API design. Option
>>> 2 is less structured but more flexible. Option 1 gives Spark the
>>> opportunity to enforce column references are valid (but not the actual
>>> function names), whereas option 2 would be up to the data sources to
>>> validate.
>>>
>>>
>>>
>>> On Wed, Aug 15, 2018 at 2:27 PM Ryan Blue  wrote:
>>>
 I think I found a good solution to the problem of using Expression in
 the TableCatalog API and in the DeleteSupport API.

 For DeleteSupport, there is already a stable and public subset of
 Expression named Filter that can be used to pass filters. The reason why
 DeleteSupport would use Expression is to support more complex expressions
 like to_date(ts) = '2018-08-15' that are translated to ts >=
 15343164 AND ts < 15344028. But, this can be done in
 Spark instead of the data sources so I think DeleteSupport should use
 Filter instead. I updated the DeleteSupport PR #21308
  with these changes.

 Also, I agree that the DataSourceV2 API should also not expose
 Expression, so I opened SPARK-25127 to track removing
 SupportsPushDownCatalystFilter
 .

 For TableCatalog, I took a similar approach instead of introducing a
 parallel Expression API. Instead, I created a PartitionTransform API (like
 Filter) that communicates the transformation function, function parameters
 like num buckets, and column references. I updated the TableCatalog PR
 #21306  to use
 PartitionTransform instead of Expression and I updated the text of the SPIP
 doc
 
 .

 I also raised a concern about needing to wai

Re: [DISCUSS] SPIP: APIs for Table Metadata Operations

2018-09-04 Thread Russell Spitzer
I'm a big fan of 1 as well. I had to implement something similar using
custom expressions and it was a bit more work than it should be. In
particular our use case is that columns have certain metadata (ttl,
writetime) which exist not as separate columns but as special values which
can be surfaced.

I still don't have a good solution for the same thing at write time, though,
since the problem is a bit asymmetric for us. While you can read the metadata
from any particular cell, on write you specify it for the whole row.

On Tue, Sep 4, 2018 at 11:04 PM Ryan Blue  wrote:

> Thanks for posting the summary. I'm strongly in favor of option 1.
>
> I think that API footprint is fairly small, but worth it. Not only does it
> make sources easier to implement by handling parsing, it also makes sources
> more reliable because Spark handles validation the same way across sources.
>
> A good example is making sure that the referenced columns exist in the
> table, which should be done using the case sensitivity of the analyzer.
> Spark would pass normalized column names that match the case of the
> declared columns to ensure that there isn't a problem if Spark is case
> insensitive but the source doesn't implement it. And the source wouldn't
> have to know about Spark's case sensitivity settings at all.
>
> On Tue, Sep 4, 2018 at 7:46 PM Reynold Xin  wrote:
>
>> Ryan, Michael and I discussed this offline today. Some notes here:
>>
>> His use case is to support partitioning data by derived columns, rather
>> than physical columns, because he didn't want his users to keep adding the
>> "date" column when in reality they are purely derived from some timestamp
>> column. We reached consensus on this is a great use case and something we
>> should support.
>>
>> We are still debating how to do this at API level. Two options:
>>
>> *Option 1.* Create a smaller surfaced, parallel Expression library, and
>> use that for specifying partition columns. The bare minimum class hierarchy
>> would look like:
>>
>> trait Expression
>>
>> class NamedFunction(name: String, args: Seq[Expression]) extends
>> Expression
>>
>> class Literal(value: Any) extends Expression
>>
>> class ColumnReference(name: String) extends Expression
>>
>> These classes don't define how the expressions are evaluated, and it'd be
>> up to the data sources to interpret them. As an example, for a table
>> partitioned by date(ts), Spark would pass the following to the underlying
>> ds:
>>
>> NamedFunction("date", ColumnReference("timestamp") :: Nil)
>>
>>
>> *Option 2.* Spark passes strings over to the data sources. For the above
>> example, Spark simply passes "date(ts)" as a string over.
>>
>>
>> The pros/cons of 1 vs 2 are basically the inverse of each other. Option 1
>> creates more rigid structure, with extra complexity in API design. Option 2
>> is less structured but more flexible. Option 1 gives Spark the opportunity
>> to enforce column references are valid (but not the actual function names),
>> whereas option 2 would be up to the data sources to validate.
>>
>>
>>
>> On Wed, Aug 15, 2018 at 2:27 PM Ryan Blue  wrote:
>>
>>> I think I found a good solution to the problem of using Expression in
>>> the TableCatalog API and in the DeleteSupport API.
>>>
>>> For DeleteSupport, there is already a stable and public subset of
>>> Expression named Filter that can be used to pass filters. The reason why
>>> DeleteSupport would use Expression is to support more complex expressions
>>> like to_date(ts) = '2018-08-15' that are translated to ts >=
>>> 15343164 AND ts < 15344028. But, this can be done in
>>> Spark instead of the data sources so I think DeleteSupport should use
>>> Filter instead. I updated the DeleteSupport PR #21308
>>>  with these changes.
>>>
>>> Also, I agree that the DataSourceV2 API should also not expose
>>> Expression, so I opened SPARK-25127 to track removing
>>> SupportsPushDownCatalystFilter
>>> .
>>>
>>> For TableCatalog, I took a similar approach instead of introducing a
>>> parallel Expression API. Instead, I created a PartitionTransform API (like
>>> Filter) that communicates the transformation function, function parameters
>>> like num buckets, and column references. I updated the TableCatalog PR
>>> #21306  to use
>>> PartitionTransform instead of Expression and I updated the text of the SPIP
>>> doc
>>> 
>>> .
>>>
>>> I also raised a concern about needing to wait for Spark to add support
>>> for new expressions (now partition transforms). To get around this, I added
>>> an apply transform that passes the name of a function and an input
>>> column. That way, users can still pass transforms that Spark doesn’t know
>>> about by name to data sources: apply("source_function", "colName")

Re: [DISCUSS] SPIP: APIs for Table Metadata Operations

2018-09-04 Thread Ryan Blue
Thanks for posting the summary. I'm strongly in favor of option 1.

I think that the API footprint is fairly small, but worth it. Not only does it
make sources easier to implement by handling parsing, it also makes sources
more reliable because Spark handles validation the same way across sources.

A good example is making sure that the referenced columns exist in the
table, which should be done using the case sensitivity of the analyzer.
Spark would pass normalized column names that match the case of the
declared columns to ensure that there isn't a problem if Spark is case
insensitive but the source doesn't implement it. And the source wouldn't
have to know about Spark's case sensitivity settings at all.
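
A sketch of the kind of normalization described above (a hypothetical helper,
not an existing Spark API):

// Resolve a user-supplied column reference against the table's declared
// columns using Spark's case-sensitivity setting, and hand the source the
// declared spelling; the source never needs to see the setting itself.
def normalizeColumn(ref: String, declared: Seq[String], caseSensitive: Boolean): String =
  declared
    .find(col => if (caseSensitive) col == ref else col.equalsIgnoreCase(ref))
    .getOrElse(throw new IllegalArgumentException(s"Column not found: $ref"))

// normalizeColumn("TS", Seq("ts", "value"), caseSensitive = false)  // returns "ts"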

On Tue, Sep 4, 2018 at 7:46 PM Reynold Xin  wrote:

> Ryan, Michael and I discussed this offline today. Some notes here:
>
> His use case is to support partitioning data by derived columns, rather
> than physical columns, because he didn't want his users to keep adding the
> "date" column when in reality they are purely derived from some timestamp
> column. We reached consensus on this is a great use case and something we
> should support.
>
> We are still debating how to do this at API level. Two options:
>
> *Option 1.* Create a smaller surfaced, parallel Expression library, and
> use that for specifying partition columns. The bare minimum class hierarchy
> would look like:
>
> trait Expression
>
> class NamedFunction(name: String, args: Seq[Expression]) extends Expression
>
> class Literal(value: Any) extends Expression
>
> class ColumnReference(name: String) extends Expression
>
> These classes don't define how the expressions are evaluated, and it'd be
> up to the data sources to interpret them. As an example, for a table
> partitioned by date(ts), Spark would pass the following to the underlying
> ds:
>
> NamedFunction("date", ColumnReference("timestamp") :: Nil)
>
>
> *Option 2.* Spark passes strings over to the data sources. For the above
> example, Spark simply passes "date(ts)" as a string over.
>
>
> The pros/cons of 1 vs 2 are basically the inverse of each other. Option 1
> creates more rigid structure, with extra complexity in API design. Option 2
> is less structured but more flexible. Option 1 gives Spark the opportunity
> to enforce column references are valid (but not the actual function names),
> whereas option 2 would be up to the data sources to validate.
>
>
>
> On Wed, Aug 15, 2018 at 2:27 PM Ryan Blue  wrote:
>
>> I think I found a good solution to the problem of using Expression in the
>> TableCatalog API and in the DeleteSupport API.
>>
>> For DeleteSupport, there is already a stable and public subset of
>> Expression named Filter that can be used to pass filters. The reason why
>> DeleteSupport would use Expression is to support more complex expressions
>> like to_date(ts) = '2018-08-15' that are translated to ts >=
>> 15343164 AND ts < 15344028. But, this can be done in
>> Spark instead of the data sources so I think DeleteSupport should use
>> Filter instead. I updated the DeleteSupport PR #21308
>>  with these changes.
>>
>> Also, I agree that the DataSourceV2 API should also not expose
>> Expression, so I opened SPARK-25127 to track removing
>> SupportsPushDownCatalystFilter
>> .
>>
>> For TableCatalog, I took a similar approach instead of introducing a
>> parallel Expression API. Instead, I created a PartitionTransform API (like
>> Filter) that communicates the transformation function, function parameters
>> like num buckets, and column references. I updated the TableCatalog PR
>> #21306  to use
>> PartitionTransform instead of Expression and I updated the text of the SPIP
>> doc
>> 
>> .
>>
>> I also raised a concern about needing to wait for Spark to add support
>> for new expressions (now partition transforms). To get around this, I added
>> an apply transform that passes the name of a function and an input
>> column. That way, users can still pass transforms that Spark doesn’t know
>> about by name to data sources: apply("source_function", "colName").
>>
>> Please have a look at the updated pull requests and SPIP doc and comment!
>>
>> rb
>>
>

-- 
Ryan Blue
Software Engineer
Netflix
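
As an illustration of the PartitionTransform idea in the quoted message above,
a rough sketch of what such a hierarchy might look like (illustrative names
only; the actual classes are in PR #21306 and may differ):

sealed trait PartitionTransform
case class IdentityTransform(column: String) extends PartitionTransform
case class BucketTransform(numBuckets: Int, column: String) extends PartitionTransform
// Escape hatch for transforms Spark doesn't know about yet:
case class ApplyTransform(functionName: String, column: String) extends PartitionTransform

// apply("source_function", "colName") from the message above:
val custom = ApplyTransform("source_function", "colName")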


Re: Select top (100) percent equivalent in spark

2018-09-04 Thread Wenchen Fan
+ Liang-Chi and Herman,

I think getting the top N records is a common requirement. For now we
guarantee it via the `TakeOrderedAndProject` operator. However, this
operator may not be used if the
spark.sql.execution.topKSortFallbackThreshold config has a small value.

Shall we reconsider
https://github.com/apache/spark/commit/5c27b0d4f8d378bd7889d26fb358f478479b9996
? Or do we not expect users to set a small value for
spark.sql.execution.topKSortFallbackThreshold?
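
For reference, a minimal sketch of the two idioms discussed in this thread
(standard DataFrame and RDD APIs; assumes a local SparkSession):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().master("local[*]").appName("topN").getOrCreate()
import spark.implicits._

val df = Seq(5, 3, 9, 1, 7).toDF("value")

// DataFrame: ORDER BY + LIMIT, normally planned as TakeOrderedAndProject.
val top3 = df.orderBy(col("value").desc).limit(3)

// RDD: top(n) with an implicit Ordering, as in the RDD suggestion quoted below.
val top3Rdd = df.as[Int].rdd.top(3)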


On Wed, Sep 5, 2018 at 11:24 AM Chetan Khatri 
wrote:

> Thanks
>
> On Wed 5 Sep, 2018, 2:15 AM Russell Spitzer, 
> wrote:
>
>> RDD: Top
>>
>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD@top(num:Int)(implicitord:Ordering[T]):Array[T
>> ]
>> Which is pretty much what Sean suggested
>>
>> For Dataframes I think doing a order and limit would be equivalent after
>> optimizations.
>>
>> On Tue, Sep 4, 2018 at 2:28 PM Sean Owen  wrote:
>>
>>> Sort and take head(n)?
>>>
>>> On Tue, Sep 4, 2018 at 12:07 PM Chetan Khatri <
>>> chetan.opensou...@gmail.com> wrote:
>>>
 Dear Spark dev, anything equivalent in spark ?

>>>


Re: data source api v2 refactoring

2018-09-04 Thread Wenchen Fan
I'm switching to another Gmail account; let's see if it still gets
dropped this time.

Hi Ryan,

I'm thinking about the write path and feel the abstraction should be the
same.

We still have logical and physical writes, and the table can create
different logical writes based on how we want to write, e.g., append, delete,
replaceWhere, etc.

One thing I'm not sure about is the WriteConfig. With the WriteConfig, the
API would look like
trait Table {
  WriteConfig newAppendWriteConfig();

  WriteConfig newDeleteWriteConfig(deleteExprs);

  LogicalWrite newLogicalWrite(writeConfig);
}

Without WriteConfig, the API looks like
trait Table {
  LogicalWrite newAppendWrite();

  LogicalWrite newDeleteWrite(deleteExprs);
}


It looks to me that the API is simpler without WriteConfig. What do you
think?

Thanks,
Wenchen
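
To make the comparison concrete, here is a self-contained toy of the second
(no WriteConfig) shape; all names are placeholders from this thread, not a
committed API:

trait LogicalWrite
trait Table {
  def newAppendWrite(): LogicalWrite
  def newDeleteWrite(deleteExprs: Seq[String]): LogicalWrite
}

// A hypothetical source: each kind of logical write carries what it needs,
// so no separate WriteConfig object is required.
case class AppendWrite(path: String) extends LogicalWrite
case class DeleteWrite(path: String, deleteExprs: Seq[String]) extends LogicalWrite

class FileTable(path: String) extends Table {
  override def newAppendWrite(): LogicalWrite = AppendWrite(path)
  override def newDeleteWrite(deleteExprs: Seq[String]): LogicalWrite =
    DeleteWrite(path, deleteExprs)
}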

On Wed, Sep 5, 2018 at 4:24 AM Ryan Blue  wrote:

> Latest from Wenchen in case it was dropped.
>
> -- Forwarded message -
> From: Wenchen Fan 
> Date: Mon, Sep 3, 2018 at 6:16 AM
> Subject: Re: data source api v2 refactoring
> To: 
> Cc: Ryan Blue , Reynold Xin , <
> dev@spark.apache.org>
>
>
> Hi Mridul,
>
> I'm not sure what's going on, my email was CC'ed to the dev list.
>
>
> Hi Ryan,
>
> The logical and physical scan idea sounds good. To add more color
> to Jungtaek's question, both micro-batch and continuous mode have
> the logical and physical scan, but there is a difference: for micro-batch
> mode, a physical scan outputs data for one epoch, but it's not true for
> continuous mode.
>
> I'm not sure if it's necessary to include streaming epoch in the API
> abstraction, for features like metrics reporting.
>
> On Sun, Sep 2, 2018 at 12:31 PM Mridul Muralidharan 
> wrote:
>
>>
>> Is it only me or are all others getting Wenchen’s mails ? (Obviously Ryan
>> did :-) )
>> I did not see it in the mail thread I received or in archives ... [1]
>> Wondering which other senders were getting dropped (if yes).
>>
>> Regards
>> Mridul
>>
>> [1]
>> http://apache-spark-developers-list.1001551.n3.nabble.com/data-source-api-v2-refactoring-td24848.html
>>
>>
>> On Sat, Sep 1, 2018 at 8:58 PM Ryan Blue 
>> wrote:
>>
>>> Thanks for clarifying, Wenchen. I think that's what I expected.
>>>
>>> As for the abstraction, here's the way that I think about it: there are
>>> two important parts of a scan: the definition of what will be read, and
>>> task sets that actually perform the read. In batch, there's one definition
>>> of the scan and one task set so it makes sense that there's one scan object
>>> that encapsulates both of these concepts. For streaming, we need to
>>> separate the two into the definition of what will be read (the stream or
>>> streaming read) and the task sets that are run (scans). That way, the
>>> streaming read behaves like a factory for scans, producing scans that
>>> handle the data either in micro-batches or using continuous tasks.
>>>
>>> To address Jungtaek's question, I think that this does work with
>>> continuous. In continuous mode, the query operators keep running and send
>>> data to one another directly. The API still needs a streaming read layer
>>> because it may still produce more than one continuous scan. That would
>>> happen when the underlying source changes and Spark needs to reconfigure. I
>>> think the example here is when partitioning in a Kafka topic changes and
>>> Spark needs to re-map Kafka partitions to continuous tasks.
>>>
>>> rb
>>>
>>> On Fri, Aug 31, 2018 at 5:12 PM Wenchen Fan 
>>> wrote:
>>>
 Hi Ryan,

 Sorry I may use a wrong wording. The pushdown is done with ScanConfig,
 which is not table/stream/scan, but something between them. The table
 creates ScanConfigBuilder, and table creates stream/scan with ScanConfig.
 For streaming source, stream is the one to take care of the pushdown
 result. For batch source, it's the scan.

 It's a little tricky because stream is an abstraction for streaming
 source only. Better ideas are welcome!

>>>
 On Sat, Sep 1, 2018 at 7:26 AM Ryan Blue  wrote:

> Thanks, Reynold!
>
> I think your API sketch looks great. I appreciate having the Table
> level in the abstraction to plug into as well. I think this makes it clear
> what everything does, particularly having the Stream level that represents
> a configured (by ScanConfig) streaming read and can act as a factory for
> individual batch scans or for continuous scans.
>
> Wenchen, I'm not sure what you mean by doing pushdown at the table
> level. It seems to mean that pushdown is specific to a batch scan or
> streaming read, which seems to be what you're saying as well. Wouldn't the
> pushdown happen to create a ScanConfig, which is then used as Reynold
> suggests? Looking forward to seeing this PR when you get it posted. Thanks
> for all of your work on this!
>
> rb
>
> On Fri, Aug 31, 2018 at 3:52 PM Wenchen Fan 
> wrote:
>
>> Than

Re: Select top (100) percent equivalent in spark

2018-09-04 Thread Chetan Khatri
Thanks

On Wed 5 Sep, 2018, 2:15 AM Russell Spitzer, 
wrote:

> RDD: Top
>
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD@top(num:Int)(implicitord:Ordering[T]):Array[T
> ]
> Which is pretty much what Sean suggested
>
> For Dataframes I think doing a order and limit would be equivalent after
> optimizations.
>
> On Tue, Sep 4, 2018 at 2:28 PM Sean Owen  wrote:
>
>> Sort and take head(n)?
>>
>> On Tue, Sep 4, 2018 at 12:07 PM Chetan Khatri <
>> chetan.opensou...@gmail.com> wrote:
>>
>>> Dear Spark dev, anything equivalent in spark ?
>>>
>>


Re: [DISCUSS] SPIP: APIs for Table Metadata Operations

2018-09-04 Thread Reynold Xin
Ryan, Michael and I discussed this offline today. Some notes here:

His use case is to support partitioning data by derived columns, rather
than physical columns, because he didn't want his users to keep adding a
"date" column when in reality it is purely derived from some timestamp
column. We reached consensus that this is a great use case and something we
should support.

We are still debating how to do this at the API level. Two options:

*Option 1.* Create a smaller-surface, parallel Expression library, and use
that for specifying partition columns. The bare minimum class hierarchy
would look like:

trait Expression

class NamedFunction(name: String, args: Seq[Expression]) extends Expression

class Literal(value: Any) extends Expression

class ColumnReference(name: String) extends Expression

These classes don't define how the expressions are evaluated, and it'd be
up to the data sources to interpret them. As an example, for a table
partitioned by date(ts), Spark would pass the following to the underlying
data source:

NamedFunction("date", ColumnReference("timestamp") :: Nil)


*Option 2.* Spark passes strings over to the data sources. For the above
example, Spark simply passes "date(ts)" as a string over.


The pros/cons of 1 vs 2 are basically the inverse of each other. Option 1
creates a more rigid structure, with extra complexity in API design. Option 2
is less structured but more flexible. Option 1 gives Spark the opportunity
to enforce that column references are valid (but not the actual function
names), whereas with option 2 validation would be entirely up to the data
sources.
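
As a rough illustration of what option 1 could look like from the data
source's side (a sketch only; the class names come from the proposal above,
everything else is assumed):

sealed trait Expression
case class NamedFunction(name: String, args: Seq[Expression]) extends Expression
case class Literal(value: Any) extends Expression
case class ColumnReference(name: String) extends Expression

// A hypothetical source interpreting the partition expression for date(ts).
// Spark has already validated the column references; the source only needs
// to recognize its own function names.
def partitionValue(expr: Expression, row: Map[String, Any]): Any = expr match {
  case ColumnReference(name) => row(name)
  case Literal(value) => value
  case NamedFunction("date", Seq(arg)) =>
    // e.g. truncate an epoch-millis timestamp to whole days
    partitionValue(arg, row).asInstanceOf[Long] / 86400000L
  case NamedFunction(other, _) =>
    throw new IllegalArgumentException(s"Unsupported transform: $other")
}

// partitionValue(NamedFunction("date", ColumnReference("timestamp") :: Nil),
//                Map("timestamp" -> 1535760000000L))  // => 17775 (days)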



On Wed, Aug 15, 2018 at 2:27 PM Ryan Blue  wrote:

> I think I found a good solution to the problem of using Expression in the
> TableCatalog API and in the DeleteSupport API.
>
> For DeleteSupport, there is already a stable and public subset of
> Expression named Filter that can be used to pass filters. The reason why
> DeleteSupport would use Expression is to support more complex expressions
> like to_date(ts) = '2018-08-15' that are translated to ts >=
> 15343164 AND ts < 15344028. But, this can be done in
> Spark instead of the data sources so I think DeleteSupport should use
> Filter instead. I updated the DeleteSupport PR #21308
>  with these changes.
>
> Also, I agree that the DataSourceV2 API should also not expose Expression,
> so I opened SPARK-25127 to track removing SupportsPushDownCatalystFilter
> .
>
> For TableCatalog, I took a similar approach instead of introducing a
> parallel Expression API. Instead, I created a PartitionTransform API (like
> Filter) that communicates the transformation function, function parameters
> like num buckets, and column references. I updated the TableCatalog PR
> #21306  to use
> PartitionTransform instead of Expression and I updated the text of the SPIP
> doc
> 
> .
>
> I also raised a concern about needing to wait for Spark to add support for
> new expressions (now partition transforms). To get around this, I added an
> apply transform that passes the name of a function and an input column.
> That way, users can still pass transforms that Spark doesn’t know about by
> name to data sources: apply("source_function", "colName").
>
> Please have a look at the updated pull requests and SPIP doc and comment!
>
> rb
>


Re: [Feedback Requested] SPARK-25299: Using Distributed Storage for Persisting Shuffle Data

2018-09-04 Thread Matt Cheah
Yuanjian, Thanks for sharing your progress! I was wondering if there was any 
prototype code that we could read to get an idea of what the implementation 
looks like? We can evaluate the design together and also benchmark workloads 
from across the community – that is, we can collect more data from more Spark 
users.

 

That experience would be greatly appreciated in the discussion.

 

-Matt Cheah

 

From: Yuanjian Li 
Date: Friday, August 31, 2018 at 8:29 PM
To: Matt Cheah 
Cc: Spark dev list 
Subject: Re: [Feedback Requested] SPARK-25299: Using Distributed Storage for 
Persisting Shuffle Data

 

Hi Matt,

Thanks for the great document and proposal. I want to +1 the reliable
shuffle data idea and give some feedback.

I think a reliable shuffle service based on DFS is necessary for Spark,
especially when running Spark jobs in an unstable environment. For example,
when Spark is deployed alongside online services, Spark executors can be
killed at any time. The current stage retry strategy then makes the job many
times slower than a normal job.

We (Baidu Inc.) actually solved this problem with a stable shuffle service
over Hadoop, and we are now integrating Spark with this shuffle service. The
POC work is expected to be done in October. We'll post more benchmarks and
detailed results at that time. I'm still reading your discussion document and
am happy to give more feedback in the doc.

 

Thanks,

Yuanjian Li

 

On Sat, Sep 1, 2018 at 8:42 AM, Matt Cheah wrote:

Hi everyone,

 

I filed SPARK-25299 [issues.apache.org] to promote discussion on how we can 
improve the shuffle operation in Spark. The basic premise is to discuss the 
ways we can leverage distributed storage to improve the reliability and 
isolation of Spark’s shuffle architecture.

 

A few designs and a full problem statement are outlined in this architecture 
discussion document [docs.google.com].

 

This is a complex problem and it would be great to get feedback from the 
community about the right direction to take this work in. Note that we have not 
yet committed to a specific implementation and architecture – there’s a lot 
that needs to be discussed for this improvement, so we hope to get as much 
input as possible before moving forward with a design.

 

Please feel free to leave comments and suggestions on the JIRA ticket or on the 
discussion document.

 

Thank you!

 

-Matt Cheah





Re: Select top (100) percent equivalent in spark

2018-09-04 Thread Russell Spitzer
RDD: Top
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD@top(num:Int)(implicitord:Ordering[T]):Array[T
]
Which is pretty much what Sean suggested

For Dataframes I think doing a order and limit would be equivalent after
optimizations.

On Tue, Sep 4, 2018 at 2:28 PM Sean Owen  wrote:

> Sort and take head(n)?
>
> On Tue, Sep 4, 2018 at 12:07 PM Chetan Khatri 
> wrote:
>
>> Dear Spark dev, anything equivalent in spark ?
>>
>


Fwd: data source api v2 refactoring

2018-09-04 Thread Ryan Blue
Latest from Wenchen in case it was dropped.

-- Forwarded message -
From: Wenchen Fan 
Date: Mon, Sep 3, 2018 at 6:16 AM
Subject: Re: data source api v2 refactoring
To: 
Cc: Ryan Blue , Reynold Xin , <
dev@spark.apache.org>


Hi Mridul,

I'm not sure what's going on, my email was CC'ed to the dev list.


Hi Ryan,

The logical and physical scan idea sounds good. To add more color
to Jungtaek's question, both micro-batch and continuous mode have
logical and physical scans, but there is a difference: in micro-batch
mode a physical scan outputs data for one epoch, but that's not true for
continuous mode.

I'm not sure whether it's necessary to include the streaming epoch in the API
abstraction, for features like metrics reporting.
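
A rough, self-contained sketch of the layering discussed in this thread (the
table creates a ScanConfigBuilder, pushdown lands in the ScanConfig, a batch
table produces a scan directly, and a streaming read acts as a factory for
scans); the names come from the discussion and are not a committed API:

trait ScanConfig
trait ScanConfigBuilder { def build(): ScanConfig }

// One task set that actually reads data.
trait Scan

// A configured streaming read: produces a new Scan per micro-batch, or a new
// continuous Scan after the source has to be reconfigured.
trait StreamingRead { def newScan(): Scan }

trait Table {
  def newScanConfigBuilder(): ScanConfigBuilder
  def newBatchScan(config: ScanConfig): Scan
  def newStreamingRead(config: ScanConfig): StreamingRead
}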

On Sun, Sep 2, 2018 at 12:31 PM Mridul Muralidharan 
wrote:

>
> Is it only me or are all others getting Wenchen’s mails ? (Obviously Ryan
> did :-) )
> I did not see it in the mail thread I received or in archives ... [1]
> Wondering which other senders were getting dropped (if yes).
>
> Regards
> Mridul
>
> [1]
> http://apache-spark-developers-list.1001551.n3.nabble.com/data-source-api-v2-refactoring-td24848.html
>
>
> On Sat, Sep 1, 2018 at 8:58 PM Ryan Blue 
> wrote:
>
>> Thanks for clarifying, Wenchen. I think that's what I expected.
>>
>> As for the abstraction, here's the way that I think about it: there are
>> two important parts of a scan: the definition of what will be read, and
>> task sets that actually perform the read. In batch, there's one definition
>> of the scan and one task set so it makes sense that there's one scan object
>> that encapsulates both of these concepts. For streaming, we need to
>> separate the two into the definition of what will be read (the stream or
>> streaming read) and the task sets that are run (scans). That way, the
>> streaming read behaves like a factory for scans, producing scans that
>> handle the data either in micro-batches or using continuous tasks.
>>
>> To address Jungtaek's question, I think that this does work with
>> continuous. In continuous mode, the query operators keep running and send
>> data to one another directly. The API still needs a streaming read layer
>> because it may still produce more than one continuous scan. That would
>> happen when the underlying source changes and Spark needs to reconfigure. I
>> think the example here is when partitioning in a Kafka topic changes and
>> Spark needs to re-map Kafka partitions to continuous tasks.
>>
>> rb
>>
>> On Fri, Aug 31, 2018 at 5:12 PM Wenchen Fan 
>> wrote:
>>
>>> Hi Ryan,
>>>
>>> Sorry I may use a wrong wording. The pushdown is done with ScanConfig,
>>> which is not table/stream/scan, but something between them. The table
>>> creates ScanConfigBuilder, and table creates stream/scan with ScanConfig.
>>> For streaming source, stream is the one to take care of the pushdown
>>> result. For batch source, it's the scan.
>>>
>>> It's a little tricky because stream is an abstraction for streaming
>>> source only. Better ideas are welcome!
>>>
>>
>>> On Sat, Sep 1, 2018 at 7:26 AM Ryan Blue  wrote:
>>>
 Thanks, Reynold!

 I think your API sketch looks great. I appreciate having the Table
 level in the abstraction to plug into as well. I think this makes it clear
 what everything does, particularly having the Stream level that represents
 a configured (by ScanConfig) streaming read and can act as a factory for
 individual batch scans or for continuous scans.

 Wenchen, I'm not sure what you mean by doing pushdown at the table
 level. It seems to mean that pushdown is specific to a batch scan or
 streaming read, which seems to be what you're saying as well. Wouldn't the
 pushdown happen to create a ScanConfig, which is then used as Reynold
 suggests? Looking forward to seeing this PR when you get it posted. Thanks
 for all of your work on this!

 rb

 On Fri, Aug 31, 2018 at 3:52 PM Wenchen Fan 
 wrote:

> Thank Reynold for writing this and starting the discussion!
>
> Data source v2 was started with batch only, so we didn't pay much
> attention to the abstraction and just follow the v1 API. Now we are
> designing the streaming API and catalog integration, the abstraction
> becomes super important.
>
> I like this proposed abstraction and have successfully prototyped it
> to make sure it works.
>
> During prototyping, I have to work around the issue that the current
> streaming engine does query optimization/planning for each micro batch.
> With this abstraction, the operator pushdown is only applied once
> per-query. In my prototype, I do the physical planning up front to get the
> pushdown result, and
> add a logical linking node that wraps the resulting physical plan node
> for the data source, and then swap that logical linking node into the
> logical plan for each batch. In the future we should just let the 
> 

Re: Select top (100) percent equivalent in spark

2018-09-04 Thread Sean Owen
Sort and take head(n)?

On Tue, Sep 4, 2018 at 12:07 PM Chetan Khatri 
wrote:

> Dear Spark dev, anything equivalent in spark ?
>


Select top (100) percent equivalent in spark

2018-09-04 Thread Chetan Khatri
Dear Spark dev, anything equivalent in spark ?


[ML] Setting Non-Transform Params for a Pipeline & PipelineModel

2018-09-04 Thread Aleksander Eskilson
In a nutshell, what I'd like to do is instantiate a Pipeline (or an
extension class of Pipeline) with metadata that is copied to the
PipelineModel when fitted, and can be read again when the fitted model is
loaded by another consumer. These params are specific to the PipelineModel
more than to any particular Transform or the Estimator declared as part of the
Pipeline; the intent is that the PipelineModel params can be read by
a downstream consumer of the loaded model, but the value that the params
should take will only be known to the creator of the Pipeline/trainer of
the PipelineModel.

It seems that Pipeline and PipelineModel support the Params interface, like
Transform and Estimator do. It seems I can extend Pipeline to a custom
class MyPipeline, where the constructor could enforce that my metadata
Params are set. However, when the Pipeline is *fit*, the resultant
PipelineModel doesn't seem to include the original CustomPipeline's params,
only params from the individual Transform steps.

From a read of the code, it seems that the *fit* method will copy over the
Stages to the PipelineModel, and those will be persisted (along with the
Stages' Params) during *write*, *but* any Params belonging to the Pipeline
are not copied to the PipelineModel (as only Stages are considered during
copy, not the ParamMap of the Pipeline) [1].

Is this a correct read of the flow here? That a CustomPipeline extension of
Pipeline with member Params does not get those non-Transform Params copied
into the fitted PipelineModel?

If so, would a feature enhancement including Pipeline-specific Params being
copyable into the fitted PipelineModel be considered acceptable?

Or should there be another way to include metadata *about* the Pipeline
such that the metadata is copyable to the fitted PipelineModel, and able to
be persisted with PipelineModel *write* and read again with PipelineModel
*load*? My first attempt at this has been to extend the Pipeline class
itself with member params, but this doesn't seem to do the trick given how
Params are actually copied only for Stages between Pipeline and the fitted
PipelineModel.

It occurs to me I could write a custom *withMetadata* transform Stage which
would really just be an identity function but with the desired Params built
in, and that those Params would get copied with the other Stages, but as
discussed at the top, this particular use-case for metadata isn't about any
particular Transform, but more about metadata for the whole Pipeline.

Alek

[1] --
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala#L135
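
A minimal sketch of the identity withMetadata stage mentioned above, assuming
the standard Transformer API (the param names are hypothetical, and persistence
would additionally need DefaultParamsWritable plus a companion
DefaultParamsReadable):

import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.{Param, ParamMap}
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.types.StructType

// Identity stage whose only purpose is to carry pipeline-level metadata params.
class WithMetadata(override val uid: String) extends Transformer {
  def this() = this(Identifiable.randomUID("withMetadata"))

  val modelOwner = new Param[String](this, "modelOwner", "who trained this model")
  def setModelOwner(value: String): this.type = set(modelOwner, value)
  def getModelOwner: String = $(modelOwner)

  override def transform(dataset: Dataset[_]): DataFrame = dataset.toDF() // identity
  override def transformSchema(schema: StructType): StructType = schema
  override def copy(extra: ParamMap): WithMetadata = defaultCopy(extra)
}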


Re: data source api v2 refactoring

2018-09-04 Thread Marcelo Vanzin
Same here, I don't see anything from Wenchen... just replies to him.
On Sat, Sep 1, 2018 at 9:31 PM Mridul Muralidharan  wrote:
>
>
> Is it only me or are all others getting Wenchen’s mails ? (Obviously Ryan did 
> :-) )
> I did not see it in the mail thread I received or in archives ... [1] 
> Wondering which other senders were getting dropped (if yes).
>
> Regards
> Mridul
>
> [1] 
> http://apache-spark-developers-list.1001551.n3.nabble.com/data-source-api-v2-refactoring-td24848.html
>
>
> On Sat, Sep 1, 2018 at 8:58 PM Ryan Blue  wrote:
>>
>> Thanks for clarifying, Wenchen. I think that's what I expected.
>>
>> As for the abstraction, here's the way that I think about it: there are two 
>> important parts of a scan: the definition of what will be read, and task 
>> sets that actually perform the read. In batch, there's one definition of the 
>> scan and one task set so it makes sense that there's one scan object that 
>> encapsulates both of these concepts. For streaming, we need to separate the 
>> two into the definition of what will be read (the stream or streaming read) 
>> and the task sets that are run (scans). That way, the streaming read behaves 
>> like a factory for scans, producing scans that handle the data either in 
>> micro-batches or using continuous tasks.
>>
>> To address Jungtaek's question, I think that this does work with continuous. 
>> In continuous mode, the query operators keep running and send data to one 
>> another directly. The API still needs a streaming read layer because it may 
>> still produce more than one continuous scan. That would happen when the 
>> underlying source changes and Spark needs to reconfigure. I think the 
>> example here is when partitioning in a Kafka topic changes and Spark needs 
>> to re-map Kafka partitions to continuous tasks.
>>
>> rb
>>
>> On Fri, Aug 31, 2018 at 5:12 PM Wenchen Fan  wrote:
>>>
>>> Hi Ryan,
>>>
>>> Sorry I may use a wrong wording. The pushdown is done with ScanConfig, 
>>> which is not table/stream/scan, but something between them. The table 
>>> creates ScanConfigBuilder, and table creates stream/scan with ScanConfig. 
>>> For streaming source, stream is the one to take care of the pushdown 
>>> result. For batch source, it's the scan.
>>>
>>> It's a little tricky because stream is an abstraction for streaming source 
>>> only. Better ideas are welcome!
>>>
>>>
>>> On Sat, Sep 1, 2018 at 7:26 AM Ryan Blue  wrote:

 Thanks, Reynold!

 I think your API sketch looks great. I appreciate having the Table level 
 in the abstraction to plug into as well. I think this makes it clear what 
 everything does, particularly having the Stream level that represents a 
 configured (by ScanConfig) streaming read and can act as a factory for 
 individual batch scans or for continuous scans.

 Wenchen, I'm not sure what you mean by doing pushdown at the table level. 
 It seems to mean that pushdown is specific to a batch scan or streaming 
 read, which seems to be what you're saying as well. Wouldn't the pushdown 
 happen to create a ScanConfig, which is then used as Reynold suggests? 
 Looking forward to seeing this PR when you get it posted. Thanks for all 
 of your work on this!

 rb

 On Fri, Aug 31, 2018 at 3:52 PM Wenchen Fan  wrote:
>
> Thank Reynold for writing this and starting the discussion!
>
> Data source v2 was started with batch only, so we didn't pay much 
> attention to the abstraction and just follow the v1 API. Now we are 
> designing the streaming API and catalog integration, the abstraction 
> becomes super important.
>
> I like this proposed abstraction and have successfully prototyped it to 
> make sure it works.
>
> During prototyping, I have to work around the issue that the current 
> streaming engine does query optimization/planning for each micro batch. 
> With this abstraction, the operator pushdown is only applied once 
> per-query. In my prototype, I do the physical planning up front to get 
> the pushdown result, and
> add a logical linking node that wraps the resulting physical plan node 
> for the data source, and then swap that logical linking node into the 
> logical plan for each batch. In the future we should just let the 
> streaming engine do query optimization/planning only once.
>
> About pushdown, I think we should do it at the table level. The table 
> should create a new pushdow handler to apply operator pushdowm for each 
> scan/stream, and create the scan/stream with the pushdown result. The 
> rationale is, a table should have the same pushdown behavior regardless 
> the scan node.
>
> Thanks,
> Wenchen
>
>
>
>
>
> On Fri, Aug 31, 2018 at 2:00 PM Reynold Xin  wrote:
>>
>> I spent some time last week looking at the current data source v2 apis, 
>> and I thought we should be a 

Re: Nightly Builds in the docs (in spark-nightly/spark-master-bin/latest? Can't seem to find it)

2018-09-04 Thread shane knapp
the docs and publishing builds need some attention...  i was planning on
looking into this after the 2.4 cut and the ubuntu port is a little further
along.

see:
https://amplab.cs.berkeley.edu/jenkins/label/spark-docs/
https://amplab.cs.berkeley.edu/jenkins/label/spark-packaging/
https://amplab.cs.berkeley.edu/jenkins/label/spark-release/

these are all tied to amp-jenkins-worker-01, and use some ruby packages
that i haven't recreated yet on the ubuntu worker(s).  the build configs
are...  scary.  and fragile.  and not set up by myself.

shane

On Fri, Aug 31, 2018 at 1:37 PM, Marcelo Vanzin  wrote:

> I think there still might be an active job publishing stuff. Here's a
> pretty recent build from master:
>
> https://dist.apache.org/repos/dist/dev/spark/2.4.0-SNAPSHOT-
> 2018_08_31_12_02-32da87d-docs/_site/index.html
>
> But it seems only docs are being published, which makes me think it's
> those builds that Shane mentioned in a recent e-mail.
>
> On Fri, Aug 31, 2018 at 1:29 PM Sean Owen  wrote:
> >
> > There are some builds there, but they're not recent:
> >
> > https://people.apache.org/~pwendell/spark-nightly/
> >
> > We can either get the jobs running again, or just knock this on the head
> and remove it.
> >
> > Anyone know how to get it running again and want to? I have a feeling
> Shane knows if anyone. Or does anyone know if we even need these at this
> point? if nobody has complained in about a year, unlikely.
> >
> > On Fri, Aug 31, 2018 at 3:15 PM Cody Koeninger 
> wrote:
> >>
> >> Just got a question about this on the user list as well.
> >>
> >> Worth removing that link to pwendell's directory from the docs?
> >>
> >> On Sun, Jan 21, 2018 at 12:13 PM, Jacek Laskowski 
> wrote:
> >> > Hi,
> >> >
> >> > http://spark.apache.org/developer-tools.html#nightly-builds reads:
> >> >
> >> >> Spark nightly packages are available at:
> >> >> Latest master build:
> >> >> https://people.apache.org/~pwendell/spark-nightly/spark-
> master-bin/latest
> >> >
> >> > but the URL gives 404. Is this intended?
> >> >
> >> > Pozdrawiam,
> >> > Jacek Laskowski
> >> > 
> >> > https://about.me/JacekLaskowski
> >> > Mastering Spark SQL https://bit.ly/mastering-spark-sql
> >> > Spark Structured Streaming https://bit.ly/spark-structured-streaming
> >> > Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
> >> > Follow me at https://twitter.com/jaceklaskowski
> >>
> >> -
> >> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
> >>
>
>
> --
> Marcelo
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>


-- 
Shane Knapp
UC Berkeley EECS Research / RISELab Staff Technical Lead
https://rise.cs.berkeley.edu


Re: Jenkins automatic disabling service - who and why?

2018-09-04 Thread shane knapp
fyi, i haven't upgraded jenkins in a couple of years...  (yeah, i know...
it's on my todo list).

i'm just assuming that it's an artifact of old PRs going 'stale' somehow,
but since that's not mentioned anywhere in the plugin docs i wouldn't bet
good money on that.  :)

On Mon, Sep 3, 2018 at 11:02 AM, Felix Cheung 
wrote:

> I think Jenkins asked again after there was an upgrade - seems to happen
> when the PR has been open for a few months.
>
>
> --
> *From:* Hyukjin Kwon 
> *Sent:* Monday, September 3, 2018 7:05 AM
> *To:* Sean Owen
> *Cc:* dev
> *Subject:* Re: Jenkins automatic disabling service - who and why?
>
> > Looks like for some reason Jenkins first asked for admin approval well
> after the discussion started.
>
> Oh, for the specific PR, Felix approved a Jenkins test. Jenkins asked this
> before. Somehow, after a few weeks(?), Jenkins asked it again. It happened
> globally in multiple PRs.
> It's not shown because when we type "ok to test", the Jenkins prompt
> goes away.
>
> On Mon, Sep 3, 2018 at 8:54 PM, Hyukjin Kwon wrote:
>
>> Not a big deal, but it has been a few months since I saw this, and I'm
>> wondering why it suddenly started asking for Jenkins admin verification at a
>> certain point.
>>
>> I had a small argument about pinging stuff related to this and I failed
>> to give the reasons. So I was simply wondering why and who.
>>
>>
>>
>> On Mon, 3 Sep 2018, 8:04 pm Sean Owen,  wrote:
>>
>>> I'm not sure if anything changed. What is the particular issue here?
>>> Looks like for some reason Jenkins first asked for admin approval well
>>> after the discussion started. Nobody asked for a test after that. Can you
>>> not trigger it from the web app UI?
>>>
>>> On Mon, Sep 3, 2018, 1:54 AM Hyukjin Kwon  wrote:
>>>
 Hi all,

 I lately noticed we started to block Jenkins tests in old PRs. For
 instance, see https://github.com/apache/spark/pull/18447
 I don't explicitly object to this idea, but may I at least ask who started
 this and why?
 Is it for notification purposes or to save resources? Did I miss some
 discussion about this?




-- 
Shane Knapp
UC Berkeley EECS Research / RISELab Staff Technical Lead
https://rise.cs.berkeley.edu


Re: Spark JIRA tags clarification and management

2018-09-04 Thread Kazuaki Ishizaki
Of course, we would like to eliminate all of the following tags

"flanky" or "flankytest"

Kazuaki Ishizaki



From:   Hyukjin Kwon 
To: dev 
Cc: Xiao Li , Wenchen Fan 
Date:   2018/09/04 14:20
Subject:Re: Spark JIRA tags clarification and management



Thanks, Reynold.

+Adding Xiao and Wenchen, whom I have often seen using tags.

Do you have some tags you think we should document as well?

On Tue, Sep 4, 2018 at 9:27 AM, Reynold Xin wrote:
The most common ones we do are:

releasenotes

correctness



On Mon, Sep 3, 2018 at 6:23 PM Hyukjin Kwon  wrote:
Thanks, Felix and Reynold. Would you guys mind if I ask this of anyone who
uses the tags frequently? Frankly, I don't use the tags often.

On Tue, Sep 4, 2018 at 2:04 AM, Felix Cheung wrote:
+1, good idea.
There are a few for organizing, but some are also critical to the release
process, like releasenotes. It would be good to clarify.


From: Reynold Xin 
Sent: Sunday, September 2, 2018 11:50 PM
To: Hyukjin Kwon
Cc: dev
Subject: Re: Spark JIRA tags clarification and management 
 
It would be great to document the common ones.

On Sun, Sep 2, 2018 at 11:49 PM Hyukjin Kwon  wrote:
Hi all, 

I lately noticed tags are often used to classify JIRAs. I was thinking we
had better explicitly document which tags are used and explain what each tag
means. For instance, we documented "Contributing to JIRA Maintenance" at
https://spark.apache.org/contributing.html before (thanks, Sean Owen) -
this helps me a lot in managing JIRAs, and it is a good standard, at
least for me, for taking action.

This doesn't necessarily mean we should clarify everything, but it might be
good to document the tags that are used often.

We can leave this within committers' scope as well, if that's preferred - I
don't have a strong opinion on this. My point is: can we clarify this in
the contributing guide so that we can reduce the maintenance cost?